risingwave_storage/hummock/sstable/
backward_sstable_iterator.rsuse std::cmp::Ordering::{Equal, Less};
use std::sync::Arc;
use foyer::CacheContext;
use risingwave_hummock_sdk::key::FullKey;
use crate::hummock::iterator::{Backward, HummockIterator, ValueMeta};
use crate::hummock::sstable::SstableIteratorReadOptions;
use crate::hummock::value::HummockValue;
use crate::hummock::{
BlockIterator, HummockResult, SstableIteratorType, SstableStoreRef, TableHolder,
};
use crate::monitor::StoreLocalStatistic;
pub struct BackwardSstableIterator {
block_iter: Option<BlockIterator>,
cur_idx: usize,
sst: TableHolder,
sstable_store: SstableStoreRef,
stats: StoreLocalStatistic,
}
impl BackwardSstableIterator {
pub fn new(sstable: TableHolder, sstable_store: SstableStoreRef) -> Self {
Self {
block_iter: None,
cur_idx: sstable.meta.block_metas.len() - 1,
sst: sstable,
sstable_store,
stats: StoreLocalStatistic::default(),
}
}
async fn seek_idx(
&mut self,
idx: isize,
seek_key: Option<FullKey<&[u8]>>,
) -> HummockResult<()> {
if idx >= self.sst.block_count() as isize || idx < 0 {
self.block_iter = None;
} else {
let block = self
.sstable_store
.get(
&self.sst,
idx as usize,
crate::hummock::CachePolicy::Fill(CacheContext::Default),
&mut self.stats,
)
.await?;
let mut block_iter = BlockIterator::new(block);
if let Some(key) = seek_key {
block_iter.seek_le(key);
} else {
block_iter.seek_to_last();
}
self.block_iter = Some(block_iter);
self.cur_idx = idx as usize;
}
Ok(())
}
}
impl HummockIterator for BackwardSstableIterator {
type Direction = Backward;
async fn next(&mut self) -> HummockResult<()> {
self.stats.total_key_count += 1;
let block_iter = self.block_iter.as_mut().expect("no block iter");
if block_iter.try_prev() {
Ok(())
} else {
self.seek_idx(self.cur_idx as isize - 1, None).await
}
}
fn key(&self) -> FullKey<&[u8]> {
self.block_iter.as_ref().expect("no block iter").key()
}
fn value(&self) -> HummockValue<&[u8]> {
let raw_value = self.block_iter.as_ref().expect("no block iter").value();
HummockValue::from_slice(raw_value).expect("decode error")
}
fn is_valid(&self) -> bool {
self.block_iter.as_ref().map_or(false, |i| i.is_valid())
}
async fn rewind(&mut self) -> HummockResult<()> {
self.seek_idx(self.sst.block_count() as isize - 1, None)
.await
}
async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
let block_idx = self
.sst
.meta
.block_metas
.partition_point(|block_meta| {
let ord = FullKey::decode(&block_meta.smallest_key).cmp(&key);
ord == Less || ord == Equal
})
.saturating_sub(1); let block_idx = block_idx as isize;
self.seek_idx(block_idx, Some(key)).await?;
if !self.is_valid() {
self.seek_idx(block_idx - 1, None).await?;
}
Ok(())
}
fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
stats.add(&self.stats)
}
fn value_meta(&self) -> ValueMeta {
ValueMeta {
object_id: Some(self.sst.id),
block_id: Some(self.cur_idx as _),
}
}
}
impl SstableIteratorType for BackwardSstableIterator {
fn create(
sstable: TableHolder,
sstable_store: SstableStoreRef,
_: Arc<SstableIteratorReadOptions>,
) -> Self {
BackwardSstableIterator::new(sstable, sstable_store)
}
}
#[cfg(test)]
mod tests {
use itertools::Itertools;
use rand::prelude::*;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::test_epoch;
use super::*;
use crate::assert_bytes_eq;
use crate::hummock::iterator::test_utils::mock_sstable_store;
use crate::hummock::test_utils::{
default_builder_opt_for_test, gen_default_test_sstable, test_key_of, test_value_of,
TEST_KEYS_COUNT,
};
#[tokio::test]
async fn test_backward_sstable_iterator() {
let sstable_store = mock_sstable_store().await;
let handle =
gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
.await;
assert!(handle.meta.block_metas.len() > 10);
let mut sstable_iter = BackwardSstableIterator::new(handle, sstable_store);
let mut cnt = TEST_KEYS_COUNT;
sstable_iter.rewind().await.unwrap();
while sstable_iter.is_valid() {
cnt -= 1;
let key = sstable_iter.key();
let value = sstable_iter.value();
assert_eq!(key, test_key_of(cnt).to_ref());
assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
sstable_iter.next().await.unwrap();
}
assert_eq!(cnt, 0);
}
#[tokio::test]
async fn test_backward_sstable_seek() {
let sstable_store = mock_sstable_store().await;
let sstable =
gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
.await;
assert!(sstable.meta.block_metas.len() > 10);
let mut sstable_iter = BackwardSstableIterator::new(sstable, sstable_store);
let mut all_key_to_test = (0..TEST_KEYS_COUNT).collect_vec();
let mut rng = thread_rng();
all_key_to_test.shuffle(&mut rng);
for i in all_key_to_test {
sstable_iter.seek(test_key_of(i).to_ref()).await.unwrap();
let key = sstable_iter.key();
assert_eq!(key, test_key_of(i).to_ref());
}
sstable_iter
.seek(test_key_of(TEST_KEYS_COUNT - 500).to_ref())
.await
.unwrap();
for i in (0..TEST_KEYS_COUNT - 500 + 1).rev() {
let key = sstable_iter.key();
assert_eq!(key, test_key_of(i).to_ref(), "key index:{}", i);
sstable_iter.next().await.unwrap();
}
assert!(!sstable_iter.is_valid());
let largest_key = FullKey::for_test(
TableId::default(),
[
VirtualNode::ZERO.to_be_bytes().as_slice(),
format!("key_zzzz_{:05}", 0).as_bytes(),
]
.concat(),
test_epoch(1),
);
sstable_iter.seek(largest_key.to_ref()).await.unwrap();
let key = sstable_iter.key();
assert_eq!(key, test_key_of(TEST_KEYS_COUNT - 1).to_ref());
let smallest_key = FullKey::for_test(
TableId::default(),
[
VirtualNode::ZERO.to_be_bytes().as_slice(),
format!("key_aaaa_{:05}", 0).as_bytes(),
]
.concat(),
test_epoch(1),
);
sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
assert!(!sstable_iter.is_valid());
for idx in (1..TEST_KEYS_COUNT).rev() {
sstable_iter
.seek(
FullKey::for_test(
TableId::default(),
[
VirtualNode::ZERO.to_be_bytes().as_slice(),
format!("key_test_{:05}", idx * 2 - 1).as_bytes(),
]
.concat(),
0,
)
.to_ref(),
)
.await
.unwrap();
let key = sstable_iter.key();
assert_eq!(key, test_key_of(idx - 1).to_ref());
sstable_iter.next().await.unwrap();
}
assert!(!sstable_iter.is_valid());
}
}