risingwave_storage/hummock/iterator/
concat_inner.rsuse std::cmp::Ordering::{Equal, Greater, Less};
use std::sync::Arc;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use crate::hummock::iterator::{
DirectionEnum, HummockIterator, HummockIteratorDirection, ValueMeta,
};
use crate::hummock::sstable::SstableIteratorReadOptions;
use crate::hummock::value::HummockValue;
use crate::hummock::{HummockResult, SstableIteratorType, SstableStoreRef};
use crate::monitor::StoreLocalStatistic;
fn smallest_key(sstable_info: &SstableInfo) -> &[u8] {
&sstable_info.key_range.left
}
fn largest_key(sstable_info: &SstableInfo) -> &[u8] {
&sstable_info.key_range.right
}
pub struct ConcatIteratorInner<TI: SstableIteratorType> {
sstable_iter: Option<TI>,
cur_idx: usize,
tables: Vec<SstableInfo>,
sstable_store: SstableStoreRef,
stats: StoreLocalStatistic,
read_options: Arc<SstableIteratorReadOptions>,
}
impl<TI: SstableIteratorType> ConcatIteratorInner<TI> {
pub fn new(
tables: Vec<SstableInfo>,
sstable_store: SstableStoreRef,
read_options: Arc<SstableIteratorReadOptions>,
) -> Self {
Self {
sstable_iter: None,
cur_idx: 0,
tables,
sstable_store,
stats: StoreLocalStatistic::default(),
read_options,
}
}
async fn seek_idx(
&mut self,
idx: usize,
seek_key: Option<FullKey<&[u8]>>,
) -> HummockResult<()> {
if idx >= self.tables.len() {
if let Some(old_iter) = self.sstable_iter.take() {
old_iter.collect_local_statistic(&mut self.stats);
}
self.cur_idx = self.tables.len();
} else {
let table = self
.sstable_store
.sstable(&self.tables[idx], &mut self.stats)
.await?;
let mut sstable_iter =
TI::create(table, self.sstable_store.clone(), self.read_options.clone());
if let Some(key) = seek_key {
sstable_iter.seek(key).await?;
} else {
sstable_iter.rewind().await?;
}
if let Some(old_iter) = self.sstable_iter.take() {
old_iter.collect_local_statistic(&mut self.stats);
}
self.sstable_iter = Some(sstable_iter);
self.cur_idx = idx;
}
Ok(())
}
}
impl<TI: SstableIteratorType> HummockIterator for ConcatIteratorInner<TI> {
type Direction = TI::Direction;
async fn next(&mut self) -> HummockResult<()> {
let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
sstable_iter.next().await?;
if sstable_iter.is_valid() {
Ok(())
} else {
let mut table_idx = self.cur_idx + 1;
while !self.is_valid() && table_idx < self.tables.len() {
self.seek_idx(table_idx, None).await?;
table_idx += 1;
}
Ok(())
}
}
fn key(&self) -> FullKey<&[u8]> {
self.sstable_iter.as_ref().expect("no table iter").key()
}
fn value(&self) -> HummockValue<&[u8]> {
self.sstable_iter.as_ref().expect("no table iter").value()
}
fn is_valid(&self) -> bool {
self.sstable_iter.as_ref().map_or(false, |i| i.is_valid())
}
async fn rewind(&mut self) -> HummockResult<()> {
self.seek_idx(0, None).await?;
let mut table_idx = 1;
while !self.is_valid() && table_idx < self.tables.len() {
self.seek_idx(table_idx, None).await?;
table_idx += 1;
}
Ok(())
}
async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
let mut table_idx = self
.tables
.partition_point(|table| match Self::Direction::direction() {
DirectionEnum::Forward => {
let ord = FullKey::decode(smallest_key(table)).cmp(&key);
ord == Less || ord == Equal
}
DirectionEnum::Backward => {
let ord = FullKey::decode(largest_key(table)).cmp(&key);
ord == Greater || (ord == Equal && !table.key_range.right_exclusive)
}
})
.saturating_sub(1); self.seek_idx(table_idx, Some(key)).await?;
table_idx += 1;
while !self.is_valid() && table_idx < self.tables.len() {
self.seek_idx(table_idx, None).await?;
table_idx += 1;
}
Ok(())
}
fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
stats.add(&self.stats);
if let Some(iter) = &self.sstable_iter {
iter.collect_local_statistic(stats);
}
}
fn value_meta(&self) -> ValueMeta {
self.sstable_iter
.as_ref()
.expect("no table iter")
.value_meta()
}
}