use std::ops::Bound::*;
use bytes::Bytes;
use more_asserts::debug_assert_le;
use risingwave_hummock_sdk::key::{FullKey, UserKey, UserKeyRange};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
use crate::hummock::iterator::{Backward, HummockIterator};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::value::HummockValue;
use crate::hummock::HummockResult;
use crate::monitor::StoreLocalStatistic;
pub struct BackwardUserIterator<I: HummockIterator<Direction = Backward>> {
iterator: I,
just_met_new_key: bool,
last_key: FullKey<Bytes>,
last_val: Bytes,
last_delete: bool,
out_of_range: bool,
key_range: UserKeyRange,
read_epoch: HummockEpoch,
min_epoch: HummockEpoch,
_version: Option<PinnedVersion>,
stats: StoreLocalStatistic,
}
impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
pub fn new(
iterator: I,
key_range: UserKeyRange,
read_epoch: u64,
min_epoch: u64,
version: Option<PinnedVersion>,
) -> Self {
Self {
iterator,
out_of_range: false,
key_range,
just_met_new_key: false,
last_key: FullKey::default(),
last_val: Bytes::new(),
last_delete: true,
read_epoch,
min_epoch,
stats: StoreLocalStatistic::default(),
_version: version,
}
}
fn out_of_range(&self, key: UserKey<&[u8]>) -> bool {
match &self.key_range.0 {
Included(begin_key) => key < begin_key.as_ref(),
Excluded(begin_key) => key <= begin_key.as_ref(),
Unbounded => false,
}
}
fn reset(&mut self) {
self.last_key = FullKey::default();
self.just_met_new_key = false;
self.last_delete = true;
self.out_of_range = false;
}
pub async fn next(&mut self) -> HummockResult<()> {
if !self.iterator.is_valid() {
self.last_delete = true;
return Ok(());
}
while self.iterator.is_valid() {
let full_key = self.iterator.key();
let epoch = full_key.epoch_with_gap.pure_epoch();
let key = &full_key.user_key;
if epoch > self.min_epoch && epoch <= self.read_epoch {
if self.just_met_new_key {
self.last_key = full_key.copy_into();
self.just_met_new_key = false;
if self.out_of_range(self.last_key.user_key.as_ref()) {
self.out_of_range = true;
break;
}
} else if self.last_key.user_key.as_ref() != *key {
if !self.last_delete {
self.just_met_new_key = true;
self.stats.processed_key_count += 1;
return Ok(());
} else {
self.last_key = full_key.copy_into();
if self.out_of_range(self.last_key.user_key.as_ref()) {
self.out_of_range = true;
break;
}
}
} else {
self.stats.skip_multi_version_key_count += 1;
}
match self.iterator.value() {
HummockValue::Put(val) => {
self.last_key = full_key.copy_into();
self.last_val = Bytes::copy_from_slice(val);
self.last_delete = false;
}
HummockValue::Delete => {
self.last_delete = true;
}
}
}
self.iterator.next().await?;
}
Ok(()) }
pub fn key(&self) -> FullKey<&[u8]> {
assert!(self.is_valid());
self.last_key.to_ref()
}
pub fn value(&self) -> &[u8] {
assert!(self.is_valid());
&self.last_val
}
pub async fn rewind(&mut self) -> HummockResult<()> {
match &self.key_range.1 {
Included(end_key) | Excluded(end_key) => {
let full_key = FullKey {
user_key: end_key.as_ref(),
epoch_with_gap: EpochWithGap::new_min_epoch(),
};
self.iterator.seek(full_key).await?;
}
Unbounded => self.iterator.rewind().await?,
};
self.reset();
self.next().await?;
if let Excluded(end_key) = &self.key_range.1
&& self.is_valid()
&& self.key().user_key == end_key.as_ref()
{
self.next().await?;
}
Ok(())
}
pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
let seek_key = match &self.key_range.1 {
Included(end_key) | Excluded(end_key) => {
let end_key = end_key.as_ref();
if end_key < user_key {
end_key
} else {
user_key
}
}
Unbounded => user_key,
};
let full_key = FullKey {
user_key: seek_key,
epoch_with_gap: EpochWithGap::new_min_epoch(),
};
self.iterator.seek(full_key).await?;
self.reset();
self.next().await?;
if let Excluded(end_key) = &self.key_range.1
&& self.is_valid()
&& self.key().user_key == end_key.as_ref()
{
debug_assert_le!(end_key.as_ref(), user_key);
self.next().await?;
}
Ok(())
}
pub fn is_valid(&self) -> bool {
let has_enough_input = self.iterator.is_valid() || !self.last_delete;
has_enough_input && (!self.out_of_range)
}
pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
stats.add(&self.stats);
self.iterator.collect_local_statistic(stats);
}
}
#[cfg(test)]
impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
pub(crate) fn for_test(iterator: I, key_range: UserKeyRange) -> Self {
Self::new(iterator, key_range, HummockEpoch::MAX, 0, None)
}
pub(crate) fn with_min_epoch(
iterator: I,
key_range: UserKeyRange,
min_epoch: HummockEpoch,
) -> Self {
Self::new(iterator, key_range, HummockEpoch::MAX, min_epoch, None)
}
}
#[cfg(test)]
mod tests {
use std::cmp::Reverse;
use std::collections::BTreeMap;
use std::ops::Bound::{self, *};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::{test_epoch, EpochExt};
use risingwave_hummock_sdk::key::prev_key;
use super::*;
use crate::hummock::iterator::test_utils::{
default_builder_opt_for_test, gen_iterator_test_sstable_base,
gen_iterator_test_sstable_from_kv_pair, gen_iterator_test_sstable_with_incr_epoch,
iterator_test_bytes_key_of, iterator_test_bytes_key_of_epoch,
iterator_test_bytes_user_key_of, iterator_test_user_key_of, iterator_test_value_of,
mock_sstable_store, TEST_KEYS_COUNT,
};
use crate::hummock::iterator::MergeIterator;
use crate::hummock::test_utils::gen_test_sstable;
use crate::hummock::{BackwardSstableIterator, SstableStoreRef, TableHolder};
#[tokio::test]
async fn test_backward_user_basic() {
let sstable_store = mock_sstable_store().await;
let table0 = gen_iterator_test_sstable_base(
0,
default_builder_opt_for_test(),
|x| x * 3 + 1,
sstable_store.clone(),
TEST_KEYS_COUNT,
)
.await;
let table1 = gen_iterator_test_sstable_base(
1,
default_builder_opt_for_test(),
|x| x * 3 + 2,
sstable_store.clone(),
TEST_KEYS_COUNT,
)
.await;
let table2 = gen_iterator_test_sstable_base(
2,
default_builder_opt_for_test(),
|x| x * 3 + 3,
sstable_store.clone(),
TEST_KEYS_COUNT,
)
.await;
let backward_iters = vec![
BackwardSstableIterator::new(table1, sstable_store.clone()),
BackwardSstableIterator::new(table2, sstable_store.clone()),
BackwardSstableIterator::new(table0, sstable_store),
];
let mi = MergeIterator::new(backward_iters);
let mut ui = BackwardUserIterator::for_test(mi, (Unbounded, Unbounded));
let mut i = 3 * TEST_KEYS_COUNT;
ui.rewind().await.unwrap();
while ui.is_valid() {
let key = ui.key();
let val = ui.value();
assert_eq!(key, iterator_test_bytes_key_of(i).to_ref());
assert_eq!(val, iterator_test_value_of(i).as_slice());
i -= 1;
ui.next().await.unwrap();
if i == 0 {
assert!(!ui.is_valid());
break;
}
}
}
#[tokio::test]
async fn test_backward_user_seek() {
let sstable_store = mock_sstable_store().await;
let table0 = gen_iterator_test_sstable_base(
0,
default_builder_opt_for_test(),
|x| x * 3 + 1,
sstable_store.clone(),
TEST_KEYS_COUNT,
)
.await;
let table1 = gen_iterator_test_sstable_base(
1,
default_builder_opt_for_test(),
|x| x * 3 + 2,
sstable_store.clone(),
TEST_KEYS_COUNT,
)
.await;
let table2 = gen_iterator_test_sstable_base(
2,
default_builder_opt_for_test(),
|x| x * 3 + 3,
sstable_store.clone(),
TEST_KEYS_COUNT,
)
.await;
let backward_iters = vec![
BackwardSstableIterator::new(table0, sstable_store.clone()),
BackwardSstableIterator::new(table1, sstable_store.clone()),
BackwardSstableIterator::new(table2, sstable_store),
];
let bmi = MergeIterator::new(backward_iters);
let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, Unbounded));
bui.seek(iterator_test_user_key_of(0).as_ref())
.await
.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(TEST_KEYS_COUNT + 4).as_ref())
.await
.unwrap();
let k = bui.key();
let v = bui.value();
assert_eq!(v, iterator_test_value_of(TEST_KEYS_COUNT + 4).as_slice());
assert_eq!(k, iterator_test_bytes_key_of(TEST_KEYS_COUNT + 4).to_ref());
bui.seek(iterator_test_user_key_of(2 * TEST_KEYS_COUNT + 5).as_ref())
.await
.unwrap();
let k = bui.key();
let v = bui.value();
assert_eq!(
v,
iterator_test_value_of(2 * TEST_KEYS_COUNT + 5).as_slice()
);
assert_eq!(
k,
iterator_test_bytes_key_of(2 * TEST_KEYS_COUNT + 5).to_ref()
);
bui.seek(iterator_test_user_key_of(3 * TEST_KEYS_COUNT).as_ref())
.await
.unwrap();
let k = bui.key();
let v = bui.value();
assert_eq!(v, iterator_test_value_of(3 * TEST_KEYS_COUNT).as_slice());
assert_eq!(k, iterator_test_bytes_key_of(3 * TEST_KEYS_COUNT).to_ref());
}
#[tokio::test]
async fn test_backward_user_delete() {
let sstable_store = mock_sstable_store().await;
let kv_pairs = vec![
(1, 300, HummockValue::delete()),
(2, 100, HummockValue::put(iterator_test_value_of(2))),
];
let table0 =
gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
let kv_pairs = vec![
(1, 400, HummockValue::put(iterator_test_value_of(1))),
(2, 200, HummockValue::delete()),
];
let table1 =
gen_iterator_test_sstable_from_kv_pair(1, kv_pairs, sstable_store.clone()).await;
let backward_iters = vec![
BackwardSstableIterator::new(table0, sstable_store.clone()),
BackwardSstableIterator::new(table1, sstable_store),
];
let bmi = MergeIterator::new(backward_iters);
let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, Unbounded));
bui.rewind().await.unwrap();
let k = bui.key();
let v = bui.value();
assert_eq!(k, iterator_test_bytes_key_of_epoch(1, 400).to_ref());
assert_eq!(v, iterator_test_value_of(1).as_slice());
bui.next().await.unwrap();
assert!(!bui.is_valid());
}
#[tokio::test]
async fn test_backward_user_range_inclusive() {
let sstable_store = mock_sstable_store().await;
let kv_pairs = vec![
(0, 200, HummockValue::delete()),
(0, 100, HummockValue::put(iterator_test_value_of(0))),
(1, 200, HummockValue::put(iterator_test_value_of(1))),
(1, 100, HummockValue::delete()),
(2, 400, HummockValue::delete()),
(2, 300, HummockValue::put(iterator_test_value_of(2))),
(2, 200, HummockValue::delete()),
(2, 100, HummockValue::put(iterator_test_value_of(2))),
(3, 100, HummockValue::put(iterator_test_value_of(3))),
(5, 200, HummockValue::delete()),
(5, 100, HummockValue::put(iterator_test_value_of(5))),
(6, 100, HummockValue::put(iterator_test_value_of(6))),
(7, 300, HummockValue::put(iterator_test_value_of(7))),
(7, 200, HummockValue::delete()),
(7, 100, HummockValue::put(iterator_test_value_of(7))),
(8, 100, HummockValue::put(iterator_test_value_of(8))),
];
let sstable =
gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)];
let bmi = MergeIterator::new(backward_iters);
let begin_key = Included(iterator_test_bytes_user_key_of(2));
let end_key = Included(iterator_test_bytes_user_key_of(7));
let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key));
bui.rewind().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 300).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(8).as_ref())
.await
.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 300).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(7).as_ref())
.await
.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 300).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(2).as_ref())
.await
.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(1).as_ref())
.await
.unwrap();
assert!(!bui.is_valid());
}
#[tokio::test]
async fn test_backward_user_range() {
let sstable_store = mock_sstable_store().await;
let kv_pairs = vec![
(0, 200, HummockValue::delete()),
(0, 100, HummockValue::put(iterator_test_value_of(0))),
(1, 200, HummockValue::put(iterator_test_value_of(1))),
(1, 100, HummockValue::delete()),
(2, 300, HummockValue::put(iterator_test_value_of(2))),
(2, 200, HummockValue::delete()),
(2, 100, HummockValue::delete()),
(3, 100, HummockValue::put(iterator_test_value_of(3))),
(5, 200, HummockValue::delete()),
(5, 100, HummockValue::put(iterator_test_value_of(5))),
(6, 100, HummockValue::put(iterator_test_value_of(6))),
(7, 100, HummockValue::put(iterator_test_value_of(7))),
(8, 100, HummockValue::put(iterator_test_value_of(8))),
];
let sstable =
gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
let backward_iters = vec![BackwardSstableIterator::new(
sstable.clone(),
sstable_store.clone(),
)];
let bmi = MergeIterator::new(backward_iters);
let begin_key = Excluded(iterator_test_bytes_user_key_of(2));
let end_key = Included(iterator_test_bytes_user_key_of(7));
let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key));
bui.rewind().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(8).as_ref())
.await
.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(7).as_ref())
.await
.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(2).as_ref())
.await
.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(1).as_ref())
.await
.unwrap();
assert!(!bui.is_valid());
let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)];
let bmi = MergeIterator::new(backward_iters);
let begin_key = Excluded(iterator_test_bytes_user_key_of(2));
let end_key = Excluded(iterator_test_bytes_user_key_of(7));
let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key));
bui.rewind().await.unwrap();
assert!(bui.is_valid());
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.seek(iterator_test_user_key_of(7).as_ref())
.await
.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.seek(iterator_test_user_key_of(5).as_ref())
.await
.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
}
#[tokio::test]
async fn test_backward_user_range_to_inclusive() {
let sstable_store = mock_sstable_store().await;
let kv_pairs = vec![
(0, 200, HummockValue::delete()),
(0, 100, HummockValue::put(iterator_test_value_of(0))),
(1, 200, HummockValue::put(iterator_test_value_of(1))),
(1, 100, HummockValue::delete()),
(2, 300, HummockValue::put(iterator_test_value_of(2))),
(2, 200, HummockValue::delete()),
(2, 100, HummockValue::delete()),
(3, 100, HummockValue::put(iterator_test_value_of(3))),
(5, 200, HummockValue::delete()),
(5, 100, HummockValue::put(iterator_test_value_of(5))),
(6, 100, HummockValue::put(iterator_test_value_of(6))),
(7, 200, HummockValue::delete()),
(7, 100, HummockValue::put(iterator_test_value_of(7))),
(8, 100, HummockValue::put(iterator_test_value_of(8))),
];
let sstable =
gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
let backward_iters = vec![BackwardSstableIterator::new(
sstable.clone(),
sstable_store.clone(),
)];
let bmi = MergeIterator::new(backward_iters);
let end_key = Included(iterator_test_bytes_user_key_of(7));
let mut bui = BackwardUserIterator::for_test(
bmi,
(Included(iterator_test_bytes_user_key_of(2)), end_key),
);
bui.rewind().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(7).as_ref())
.await
.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(6).as_ref())
.await
.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(0).as_ref())
.await
.unwrap();
assert!(!bui.is_valid());
let end_key = Excluded(iterator_test_bytes_user_key_of(6));
let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)];
let bmi = MergeIterator::new(backward_iters);
let mut bui = BackwardUserIterator::for_test(
bmi,
(Excluded(iterator_test_bytes_user_key_of(2)), end_key),
);
bui.seek(iterator_test_user_key_of(6).as_ref())
.await
.unwrap();
assert!(bui.is_valid());
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(7).as_ref())
.await
.unwrap();
assert!(bui.is_valid());
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
}
#[tokio::test]
async fn test_backward_user_range_from() {
let sstable_store = mock_sstable_store().await;
let kv_pairs = vec![
(0, 200, HummockValue::delete()),
(0, 100, HummockValue::put(iterator_test_value_of(0))),
(1, 200, HummockValue::put(iterator_test_value_of(1))),
(1, 100, HummockValue::delete()),
(2, 300, HummockValue::put(iterator_test_value_of(2))),
(2, 200, HummockValue::delete()),
(2, 100, HummockValue::delete()),
(3, 100, HummockValue::put(iterator_test_value_of(3))),
(5, 200, HummockValue::delete()),
(5, 100, HummockValue::put(iterator_test_value_of(5))),
(6, 100, HummockValue::put(iterator_test_value_of(6))),
(7, 200, HummockValue::delete()),
(7, 100, HummockValue::put(iterator_test_value_of(7))),
(8, 100, HummockValue::put(iterator_test_value_of(8))),
];
let handle =
gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
let backward_iters = vec![BackwardSstableIterator::new(handle, sstable_store)];
let bmi = MergeIterator::new(backward_iters);
let begin_key = Included(iterator_test_bytes_user_key_of(2));
let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, Unbounded));
bui.rewind().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(2).as_ref())
.await
.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(5).as_ref())
.await
.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(8).as_ref())
.await
.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
bui.seek(iterator_test_user_key_of(9).as_ref())
.await
.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
bui.next().await.unwrap();
assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
bui.next().await.unwrap();
assert!(!bui.is_valid());
}
fn key_from_num(num: usize) -> UserKey<Vec<u8>> {
let width = 20;
UserKey::for_test(
TableId::default(),
format!("{:0width$}", num, width = width)
.as_bytes()
.to_vec(),
)
}
#[allow(clippy::mutable_key_type)]
async fn chaos_test_case(
handle: TableHolder,
start_bound: Bound<UserKey<Bytes>>,
end_bound: Bound<UserKey<Bytes>>,
truth: &ChaosTestTruth,
sstable_store: SstableStoreRef,
) {
let start_key = match &start_bound {
Bound::Included(b) => {
UserKey::for_test(b.table_id, Bytes::from(prev_key(&b.table_key.clone())))
}
Bound::Excluded(b) => b.clone(),
Unbounded => key_from_num(0).into_bytes(),
};
let end_key = match &end_bound {
Bound::Included(b) => b.clone(),
Unbounded => key_from_num(999999999999).into_bytes(),
_ => unimplemented!(),
};
let backward_iters = vec![BackwardSstableIterator::new(handle, sstable_store)];
let bmi = MergeIterator::new(backward_iters);
let mut bui = BackwardUserIterator::for_test(bmi, (start_bound, end_bound));
let num_puts: usize = truth
.iter()
.map(|(key, inserts)| {
if *key > end_key || *key <= start_key {
return 0;
}
match inserts.first_key_value().unwrap().1 {
HummockValue::Put(_) => 1,
HummockValue::Delete => 0,
}
})
.reduce(|accum, item| accum + item)
.unwrap();
let mut num_kvs = 0;
bui.rewind().await.unwrap();
for (key, value) in truth.iter().rev() {
if *key > end_key || *key <= start_key {
continue;
}
let (_, value) = value.first_key_value().unwrap();
if let HummockValue::Delete = value {
continue;
}
assert!(bui.is_valid(), "num_kvs:{}", num_kvs);
assert_eq!(bui.key().user_key, key.as_ref(), "num_kvs:{}", num_kvs);
if let HummockValue::Put(bytes) = &value {
assert_eq!(bui.value(), bytes, "num_kvs:{}", num_kvs);
}
bui.next().await.unwrap();
num_kvs += 1;
}
assert!(!bui.is_valid());
assert_eq!(num_kvs, num_puts);
}
type ChaosTestTruth =
BTreeMap<UserKey<Bytes>, BTreeMap<Reverse<HummockEpoch>, HummockValue<Bytes>>>;
async fn generate_chaos_test_data() -> (usize, TableHolder, ChaosTestTruth, SstableStoreRef) {
let mut rng = thread_rng();
#[allow(clippy::mutable_key_type)]
let mut truth: ChaosTestTruth = BTreeMap::new();
let mut prev_key_number: usize = 1;
let number_of_keys = 5000;
for _ in 0..number_of_keys {
let key: usize = rng.gen_range(prev_key_number..=(prev_key_number + 10));
prev_key_number = key + 1;
let key_bytes = key_from_num(key).into_bytes();
let mut prev_time = 500;
let num_updates = rng.gen_range(1..10usize);
for _ in 0..num_updates {
let time: HummockEpoch = test_epoch(rng.gen_range(prev_time..=(prev_time + 1000)));
let is_delete = rng.gen_range(0..=1usize) < 1usize;
match is_delete {
true => {
truth
.entry(key_bytes.clone())
.or_default()
.insert(Reverse(time), HummockValue::delete());
}
false => {
let value_size = rng.gen_range(100..=200);
let value: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(value_size)
.map(char::from)
.collect();
truth
.entry(key_bytes.clone())
.or_default()
.insert(Reverse(time), HummockValue::put(Bytes::from(value)));
}
}
prev_time = time.next_epoch();
}
}
let sstable_store = mock_sstable_store().await;
let sst = gen_test_sstable(
default_builder_opt_for_test(),
0,
truth.iter().flat_map(|(key, inserts)| {
inserts.iter().map(|(time, value)| {
let full_key = FullKey {
user_key: key.clone(),
epoch_with_gap: EpochWithGap::new_from_epoch(time.0),
};
(full_key, value.clone())
})
}),
sstable_store.clone(),
)
.await;
(prev_key_number, sst, truth, sstable_store)
}
#[tokio::test]
async fn test_backward_user_chaos_unbounded_unbounded() {
let (_prev_key_number, sst, truth, sstable_store) = generate_chaos_test_data().await;
let repeat = 20;
for _ in 0..repeat {
chaos_test_case(
sst.clone(),
Unbounded,
Unbounded,
&truth,
sstable_store.clone(),
)
.await;
}
}
#[tokio::test]
async fn test_backward_user_chaos_unbounded_included() {
let (prev_key_number, sst, truth, sstable_store) = generate_chaos_test_data().await;
let repeat = 20;
for _ in 0..repeat {
let mut rng = thread_rng();
let end_key: usize = rng.gen_range(2..=prev_key_number);
let end_key_bytes = key_from_num(end_key).into_bytes();
chaos_test_case(
sst.clone(),
Unbounded,
Included(end_key_bytes.clone()),
&truth,
sstable_store.clone(),
)
.await;
}
}
#[tokio::test]
async fn test_backward_user_chaos_included_unbounded() {
let (prev_key_number, sst, truth, sstable_store) = generate_chaos_test_data().await;
let repeat = 20;
for _ in 0..repeat {
let mut rng = thread_rng();
let end_key: usize = rng.gen_range(2..=prev_key_number);
let begin_key: usize = rng.gen_range(1..=end_key);
let begin_key_bytes = key_from_num(begin_key).into_bytes();
chaos_test_case(
sst.clone(),
Included(begin_key_bytes.clone()),
Unbounded,
&truth,
sstable_store.clone(),
)
.await;
}
}
#[tokio::test]
async fn test_backward_user_chaos_excluded_unbounded() {
let (prev_key_number, sst, truth, sstable_store) = generate_chaos_test_data().await;
let repeat = 20;
for _ in 0..repeat {
let mut rng = thread_rng();
let end_key: usize = rng.gen_range(2..=prev_key_number);
let begin_key: usize = rng.gen_range(1..=end_key);
let begin_key_bytes = key_from_num(begin_key).into_bytes();
chaos_test_case(
sst.clone(),
Excluded(begin_key_bytes.clone()),
Unbounded,
&truth,
sstable_store.clone(),
)
.await;
}
}
#[tokio::test]
async fn test_backward_user_chaos_included_included() {
let (prev_key_number, sst, truth, sstable_store) = generate_chaos_test_data().await;
let repeat = 20;
for _ in 0..repeat {
let mut rng = thread_rng();
let end_key: usize = rng.gen_range(2..=prev_key_number);
let end_key_bytes = key_from_num(end_key).into_bytes();
let begin_key: usize = rng.gen_range(1..=end_key);
let begin_key_bytes = key_from_num(begin_key).into_bytes();
chaos_test_case(
sst.clone(),
Included(begin_key_bytes.clone()),
Included(end_key_bytes.clone()),
&truth,
sstable_store.clone(),
)
.await;
}
}
#[tokio::test]
async fn test_backward_user_chaos_excluded_included() {
let (prev_key_number, sst, truth, sstable_store) = generate_chaos_test_data().await;
let repeat = 20;
for _ in 0..repeat {
let mut rng = thread_rng();
let end_key: usize = rng.gen_range(2..=prev_key_number);
let end_key_bytes = key_from_num(end_key).into_bytes();
let begin_key: usize = rng.gen_range(1..=end_key);
let begin_key_bytes = key_from_num(begin_key).into_bytes();
chaos_test_case(
sst.clone(),
Excluded(begin_key_bytes),
Included(end_key_bytes),
&truth,
sstable_store.clone(),
)
.await;
}
}
#[tokio::test]
async fn test_min_epoch() {
let sstable_store = mock_sstable_store().await;
let table0 = gen_iterator_test_sstable_with_incr_epoch(
0,
default_builder_opt_for_test(),
|x| x * 3,
sstable_store.clone(),
TEST_KEYS_COUNT,
1,
)
.await;
let backward_iters = vec![BackwardSstableIterator::new(table0, sstable_store)];
let min_count = (TEST_KEYS_COUNT / 5) as u64;
let min_epoch = test_epoch(min_count);
let mi = MergeIterator::new(backward_iters);
let mut ui = BackwardUserIterator::with_min_epoch(mi, (Unbounded, Unbounded), min_epoch);
ui.rewind().await.unwrap();
let mut i = 0;
while ui.is_valid() {
let key = ui.key();
let key_epoch = key.epoch_with_gap.pure_epoch();
assert!(key_epoch > min_epoch);
i += 1;
ui.next().await.unwrap();
}
let expect_count = TEST_KEYS_COUNT - min_count as usize;
assert_eq!(i, expect_count);
}
}