use std::cmp::Ordering;
use std::collections::HashSet;
use std::sync::atomic::AtomicU64;
use std::sync::{atomic, Arc};
use std::time::Instant;
use await_tree::InstrumentAwait;
use fail::fail_point;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::KeyComparator;
use crate::hummock::block_stream::BlockDataStream;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult};
use crate::monitor::StoreLocalStatistic;
const PROGRESS_KEY_INTERVAL: usize = 100;
pub struct SstableStreamIterator {
sstable_store: SstableStoreRef,
block_metas: Vec<BlockMeta>,
block_stream: Option<BlockDataStream>,
block_iter: Option<BlockIterator>,
block_idx: usize,
stats_ptr: Arc<AtomicU64>,
sstable_info: SstableInfo,
sstable_table_ids: HashSet<StateTableId>,
task_progress: Arc<TaskProgress>,
io_retry_times: usize,
max_io_retry_times: usize,
key_range_left: FullKey<Vec<u8>>,
key_range_right: FullKey<Vec<u8>>,
key_range_right_exclusive: bool,
}
impl SstableStreamIterator {
pub fn new(
block_metas: Vec<BlockMeta>,
sstable_info: SstableInfo,
stats: &StoreLocalStatistic,
task_progress: Arc<TaskProgress>,
sstable_store: SstableStoreRef,
max_io_retry_times: usize,
) -> Self {
let sstable_table_ids = HashSet::from_iter(sstable_info.table_ids.iter().cloned());
let block_metas = filter_block_metas(
&block_metas,
&sstable_table_ids,
sstable_info.key_range.clone(),
);
let key_range_left = FullKey::decode(&sstable_info.key_range.left).to_vec();
let key_range_right = FullKey::decode(&sstable_info.key_range.right).to_vec();
let key_range_right_exclusive = sstable_info.key_range.right_exclusive;
Self {
block_stream: None,
block_iter: None,
block_metas,
block_idx: 0,
stats_ptr: stats.remote_io_time.clone(),
sstable_table_ids,
sstable_info,
sstable_store,
task_progress,
io_retry_times: 0,
max_io_retry_times,
key_range_left,
key_range_right,
key_range_right_exclusive,
}
}
async fn create_stream(&mut self) -> HummockResult<()> {
let block_stream = self
.sstable_store
.get_stream_for_blocks(
self.sstable_info.object_id,
&self.block_metas[self.block_idx..],
)
.verbose_instrument_await("stream_iter_get_stream")
.await?;
self.block_stream = Some(block_stream);
Ok(())
}
async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
while let Some(block_iter) = self.block_iter.as_mut() {
if self
.sstable_table_ids
.contains(&block_iter.table_id().table_id)
{
return Ok(());
} else {
self.next_block().await?;
}
}
Ok(())
}
pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
self.next_block().await?;
let seek_key = if let Some(seek_key) = seek_key {
if seek_key.cmp(&self.key_range_left.to_ref()).is_lt() {
Some(self.key_range_left.to_ref())
} else {
Some(seek_key)
}
} else {
Some(self.key_range_left.to_ref())
};
if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
block_iter.seek(seek_key);
if !block_iter.is_valid() {
self.next_block().await?;
}
}
self.prune_from_valid_block_iter().await?;
Ok(())
}
async fn next_block(&mut self) -> HummockResult<()> {
let now = Instant::now();
let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
});
if self.block_idx < self.block_metas.len() {
loop {
let ret = match &mut self.block_stream {
Some(block_stream) => block_stream.next_block().await,
None => {
self.create_stream().await?;
continue;
}
};
match ret {
Ok(Some(block)) => {
let mut block_iter =
BlockIterator::new(BlockHolder::from_owned_block(block));
block_iter.seek_to_first();
self.block_idx += 1;
self.block_iter = Some(block_iter);
return Ok(());
}
Ok(None) => break,
Err(e) => {
if !e.is_object_error() || !self.need_recreate_io_stream() {
return Err(e);
}
self.block_stream.take();
self.io_retry_times += 1;
fail_point!("create_stream_err");
tracing::warn!(
"retry create stream for sstable {} times, sstinfo={}",
self.io_retry_times,
self.sst_debug_info()
);
}
}
}
}
self.block_idx = self.block_metas.len();
self.block_iter = None;
Ok(())
}
pub async fn next(&mut self) -> HummockResult<()> {
if !self.is_valid() {
return Ok(());
}
let block_iter = self.block_iter.as_mut().expect("no block iter");
block_iter.next();
if !block_iter.is_valid() {
self.next_block().await?;
self.prune_from_valid_block_iter().await?;
}
if !self.is_valid() {
return Ok(());
}
let key = self
.block_iter
.as_ref()
.unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
.key();
if self.exceed_key_range_right(key) {
self.block_iter = None;
}
Ok(())
}
pub fn key(&self) -> FullKey<&[u8]> {
let key = self
.block_iter
.as_ref()
.unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
.key();
assert!(
!self.exceed_key_range_left(key),
"key {:?} key_range_left {:?}",
key,
self.key_range_left.to_ref()
);
assert!(
!self.exceed_key_range_right(key),
"key {:?} key_range_right {:?} key_range_right_exclusive {}",
key,
self.key_range_right.to_ref(),
self.key_range_right_exclusive
);
key
}
pub fn value(&self) -> HummockValue<&[u8]> {
let raw_value = self
.block_iter
.as_ref()
.unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
.value();
HummockValue::from_slice(raw_value)
.unwrap_or_else(|_| panic!("decode error sstinfo={}", self.sst_debug_info()))
}
pub fn is_valid(&self) -> bool {
self.block_iter.as_ref().map_or(false, |i| i.is_valid())
}
fn sst_debug_info(&self) -> String {
format!(
"object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
self.sstable_info.object_id,
self.sstable_info.sst_id,
self.sstable_info.meta_offset,
self.sstable_info.table_ids
)
}
fn need_recreate_io_stream(&self) -> bool {
self.io_retry_times < self.max_io_retry_times
}
fn exceed_key_range_left(&self, key: FullKey<&[u8]>) -> bool {
key.cmp(&self.key_range_left.to_ref()).is_lt()
}
fn exceed_key_range_right(&self, key: FullKey<&[u8]>) -> bool {
if self.key_range_right_exclusive {
key.cmp(&self.key_range_right.to_ref()).is_ge()
} else {
key.cmp(&self.key_range_right.to_ref()).is_gt()
}
}
}
impl Drop for SstableStreamIterator {
fn drop(&mut self) {
self.task_progress.dec_num_pending_read_io()
}
}
pub struct ConcatSstableIterator {
key_range: KeyRange,
sstable_iter: Option<SstableStreamIterator>,
cur_idx: usize,
sstables: Vec<SstableInfo>,
existing_table_ids: HashSet<StateTableId>,
sstable_store: SstableStoreRef,
stats: StoreLocalStatistic,
task_progress: Arc<TaskProgress>,
max_io_retry_times: usize,
}
impl ConcatSstableIterator {
pub fn new(
existing_table_ids: Vec<StateTableId>,
sst_infos: Vec<SstableInfo>,
key_range: KeyRange,
sstable_store: SstableStoreRef,
task_progress: Arc<TaskProgress>,
max_io_retry_times: usize,
) -> Self {
Self {
key_range,
sstable_iter: None,
cur_idx: 0,
sstables: sst_infos,
existing_table_ids: HashSet::from_iter(existing_table_ids),
sstable_store,
task_progress,
stats: StoreLocalStatistic::default(),
max_io_retry_times,
}
}
#[cfg(test)]
pub fn for_test(
existing_table_ids: Vec<StateTableId>,
sst_infos: Vec<SstableInfo>,
key_range: KeyRange,
sstable_store: SstableStoreRef,
) -> Self {
Self::new(
existing_table_ids,
sst_infos,
key_range,
sstable_store,
Arc::new(TaskProgress::default()),
0,
)
}
async fn seek_idx(
&mut self,
idx: usize,
seek_key: Option<FullKey<&[u8]>>,
) -> HummockResult<()> {
self.sstable_iter.take();
let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
{
(Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
Ordering::Greater => Some(seek_key),
},
(Some(seek_key), true) => Some(seek_key),
(None, true) => None,
(None, false) => Some(FullKey::decode(&self.key_range.left)),
};
self.cur_idx = idx;
while self.cur_idx < self.sstables.len() {
let table_info = &self.sstables[self.cur_idx];
let mut found = table_info
.table_ids
.iter()
.any(|table_id| self.existing_table_ids.contains(table_id));
if !found {
self.cur_idx += 1;
seek_key = None;
continue;
}
let sstable = self
.sstable_store
.sstable(table_info, &mut self.stats)
.verbose_instrument_await("stream_iter_sstable")
.await?;
let filter_key_range = match seek_key {
Some(seek_key) => {
KeyRange::new(seek_key.encode().into(), self.key_range.right.clone())
}
None => self.key_range.clone(),
};
let block_metas = filter_block_metas(
&sstable.meta.block_metas,
&self.existing_table_ids,
filter_key_range,
);
if block_metas.is_empty() {
found = false;
} else {
self.task_progress.inc_num_pending_read_io();
let mut sstable_iter = SstableStreamIterator::new(
block_metas,
table_info.clone(),
&self.stats,
self.task_progress.clone(),
self.sstable_store.clone(),
self.max_io_retry_times,
);
sstable_iter.seek(seek_key).await?;
if sstable_iter.is_valid() {
self.sstable_iter = Some(sstable_iter);
} else {
found = false;
}
}
if found {
return Ok(());
} else {
self.cur_idx += 1;
seek_key = None;
}
}
Ok(())
}
}
impl HummockIterator for ConcatSstableIterator {
type Direction = Forward;
async fn next(&mut self) -> HummockResult<()> {
let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
sstable_iter.next().await?;
if sstable_iter.is_valid() {
Ok(())
} else {
self.seek_idx(self.cur_idx + 1, None).await?;
Ok(())
}
}
fn key(&self) -> FullKey<&[u8]> {
self.sstable_iter.as_ref().expect("no table iter").key()
}
fn value(&self) -> HummockValue<&[u8]> {
self.sstable_iter.as_ref().expect("no table iter").value()
}
fn is_valid(&self) -> bool {
self.sstable_iter.as_ref().map_or(false, |i| i.is_valid())
}
async fn rewind(&mut self) -> HummockResult<()> {
self.seek_idx(0, None).await
}
async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
let seek_key = if self.key_range.left.is_empty() {
key
} else {
match key.cmp(&FullKey::decode(&self.key_range.left)) {
Ordering::Less | Ordering::Equal => FullKey::decode(&self.key_range.left),
Ordering::Greater => key,
}
};
let table_idx = self.sstables.partition_point(|table| {
let max_sst_key = &table.key_range.right;
FullKey::decode(max_sst_key).cmp(&seek_key) == Ordering::Less
});
self.seek_idx(table_idx, Some(key)).await
}
fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
stats.add(&self.stats)
}
fn value_meta(&self) -> ValueMeta {
let iter = self.sstable_iter.as_ref().expect("no table iter");
assert!(iter.block_idx >= 1);
ValueMeta {
object_id: Some(iter.sstable_info.object_id),
block_id: Some(iter.block_idx as u64 - 1),
}
}
}
pub struct MonitoredCompactorIterator<I> {
inner: I,
task_progress: Arc<TaskProgress>,
processed_key_num: usize,
}
impl<I: HummockIterator<Direction = Forward>> MonitoredCompactorIterator<I> {
pub fn new(inner: I, task_progress: Arc<TaskProgress>) -> Self {
Self {
inner,
task_progress,
processed_key_num: 0,
}
}
}
impl<I: HummockIterator<Direction = Forward>> HummockIterator for MonitoredCompactorIterator<I> {
type Direction = Forward;
async fn next(&mut self) -> HummockResult<()> {
self.inner.next().await?;
self.processed_key_num += 1;
if self.processed_key_num % PROGRESS_KEY_INTERVAL == 0 {
self.task_progress
.inc_progress_key(PROGRESS_KEY_INTERVAL as _);
}
Ok(())
}
fn key(&self) -> FullKey<&[u8]> {
self.inner.key()
}
fn value(&self) -> HummockValue<&[u8]> {
self.inner.value()
}
fn is_valid(&self) -> bool {
self.inner.is_valid()
}
async fn rewind(&mut self) -> HummockResult<()> {
self.processed_key_num = 0;
self.inner.rewind().await?;
Ok(())
}
async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
self.processed_key_num = 0;
self.inner.seek(key).await?;
Ok(())
}
fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
self.inner.collect_local_statistic(stats)
}
fn value_meta(&self) -> ValueMeta {
self.inner.value_meta()
}
}
pub(crate) fn filter_block_metas(
block_metas: &Vec<BlockMeta>,
existing_table_ids: &HashSet<u32>,
key_range: KeyRange,
) -> Vec<BlockMeta> {
if block_metas.is_empty() {
return vec![];
}
let mut start_index = if key_range.left.is_empty() {
0
} else {
block_metas
.partition_point(|block| {
KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key)
!= Ordering::Less
})
.saturating_sub(1)
};
let mut end_index = if key_range.right.is_empty() {
block_metas.len()
} else {
let ret = block_metas.partition_point(|block| {
KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right)
!= Ordering::Greater
});
if ret == 0 {
return vec![];
}
ret
}
.saturating_sub(1);
while start_index <= end_index {
let start_block_table_id = block_metas[start_index].table_id().table_id();
if existing_table_ids.contains(&start_block_table_id) {
break;
}
let old_start_index = start_index;
let block_metas_to_search = &block_metas[start_index..=end_index];
start_index += block_metas_to_search
.partition_point(|block_meta| block_meta.table_id().table_id() == start_block_table_id);
if old_start_index == start_index {
break;
}
}
while start_index <= end_index {
let end_block_table_id = block_metas[end_index].table_id().table_id();
if existing_table_ids.contains(&end_block_table_id) {
break;
}
let old_end_index = end_index;
let block_metas_to_search = &block_metas[start_index..=end_index];
end_index = start_index
+ block_metas_to_search
.partition_point(|block_meta| block_meta.table_id().table_id() < end_block_table_id)
.saturating_sub(1);
if end_index == old_end_index {
break;
}
}
if start_index > end_index {
return vec![];
}
block_metas[start_index..=end_index].to_vec()
}
#[cfg(test)]
mod tests {
use std::cmp::Ordering;
use std::collections::HashSet;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::{next_full_key, prev_full_key, FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::KeyRange;
use crate::hummock::compactor::ConcatSstableIterator;
use crate::hummock::iterator::test_utils::mock_sstable_store;
use crate::hummock::iterator::{HummockIterator, MergeIterator};
use crate::hummock::test_utils::{
default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
TEST_KEYS_COUNT,
};
use crate::hummock::value::HummockValue;
use crate::hummock::BlockMeta;
#[tokio::test]
async fn test_concat_iterator() {
let sstable_store = mock_sstable_store().await;
let mut table_infos = vec![];
for object_id in 0..3 {
let start_index = object_id * TEST_KEYS_COUNT;
let end_index = (object_id + 1) * TEST_KEYS_COUNT;
let table_info = gen_test_sstable_info(
default_builder_opt_for_test(),
object_id as u64,
(start_index..end_index)
.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
sstable_store.clone(),
)
.await;
table_infos.push(table_info);
}
let start_index = 5000;
let end_index = 25000;
let kr = KeyRange::new(
test_key_of(start_index).encode().into(),
test_key_of(end_index).encode().into(),
);
let mut iter = ConcatSstableIterator::for_test(
vec![0],
table_infos.clone(),
kr.clone(),
sstable_store.clone(),
);
iter.seek(FullKey::decode(&kr.left)).await.unwrap();
for idx in start_index..end_index {
let key = iter.key();
let val = iter.value();
assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
assert_eq!(
val.into_user_value().unwrap(),
test_value_of(idx).as_slice()
);
iter.next().await.unwrap();
}
let kr = KeyRange::new(
test_key_of(30000).encode().into(),
test_key_of(40000).encode().into(),
);
let mut iter = ConcatSstableIterator::for_test(
vec![0],
table_infos.clone(),
kr.clone(),
sstable_store.clone(),
);
iter.seek(FullKey::decode(&kr.left)).await.unwrap();
assert!(!iter.is_valid());
let kr = KeyRange::new(
test_key_of(start_index).encode().into(),
test_key_of(40000).encode().into(),
);
let mut iter = ConcatSstableIterator::for_test(
vec![0],
table_infos.clone(),
kr.clone(),
sstable_store.clone(),
);
iter.seek(FullKey::decode(&kr.left)).await.unwrap();
for idx in start_index..30000 {
let key = iter.key();
let val = iter.value();
assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
assert_eq!(
val.into_user_value().unwrap(),
test_value_of(idx).as_slice()
);
iter.next().await.unwrap();
}
assert!(!iter.is_valid());
let kr = KeyRange::new(
test_key_of(0).encode().into(),
test_key_of(40000).encode().into(),
);
let mut iter = ConcatSstableIterator::for_test(
vec![0],
table_infos.clone(),
kr.clone(),
sstable_store.clone(),
);
iter.seek(test_key_of(10000).to_ref()).await.unwrap();
assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10000).to_ref());
iter.seek(test_key_of(10001).to_ref()).await.unwrap();
assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10001).to_ref());
iter.seek(test_key_of(9999).to_ref()).await.unwrap();
assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(9999).to_ref());
iter.seek(test_key_of(1).to_ref()).await.unwrap();
assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(1).to_ref());
iter.seek(test_key_of(29999).to_ref()).await.unwrap();
assert!(iter.is_valid() && iter.cur_idx == 2 && iter.key() == test_key_of(29999).to_ref());
iter.seek(test_key_of(30000).to_ref()).await.unwrap();
assert!(!iter.is_valid());
let kr = KeyRange::new(
test_key_of(6000).encode().into(),
test_key_of(16000).encode().into(),
);
let mut iter = ConcatSstableIterator::for_test(
vec![0],
table_infos.clone(),
kr.clone(),
sstable_store.clone(),
);
iter.seek(test_key_of(17000).to_ref()).await.unwrap();
assert!(!iter.is_valid());
iter.seek(test_key_of(1).to_ref()).await.unwrap();
assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == FullKey::decode(&kr.left));
}
#[tokio::test]
async fn test_concat_iterator_seek_idx() {
let sstable_store = mock_sstable_store().await;
let mut table_infos = vec![];
for object_id in 0..3 {
let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
let end_index = (object_id + 1) * TEST_KEYS_COUNT;
let table_info = gen_test_sstable_info(
default_builder_opt_for_test(),
object_id as u64,
(start_index..end_index)
.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
sstable_store.clone(),
)
.await;
table_infos.push(table_info);
}
let kr = KeyRange::new(
test_key_of(0).encode().into(),
test_key_of(40000).encode().into(),
);
let mut iter = ConcatSstableIterator::for_test(
vec![0],
table_infos.clone(),
kr.clone(),
sstable_store.clone(),
);
let sst = sstable_store
.sstable(&iter.sstables[0], &mut iter.stats)
.await
.unwrap();
let block_metas = &sst.meta.block_metas;
let block_1_smallest_key = block_metas[1].smallest_key.clone();
let block_2_smallest_key = block_metas[2].smallest_key.clone();
let seek_key = block_1_smallest_key.clone();
iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
.await
.unwrap();
assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
let seek_key = prev_full_key(block_1_smallest_key.as_slice());
iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
.await
.unwrap();
assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
iter.next().await.unwrap();
let block_1_second_key = iter.key().to_vec();
let seek_key = test_key_of(30001);
iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref()))
.await
.unwrap();
assert!(!iter.is_valid());
let kr = KeyRange::new(
next_full_key(&block_1_smallest_key).into(),
prev_full_key(&block_2_smallest_key).into(),
);
let mut iter = ConcatSstableIterator::for_test(
vec![0],
table_infos.clone(),
kr.clone(),
sstable_store.clone(),
);
let seek_key = FullKey::decode(&block_2_smallest_key);
assert!(seek_key.cmp(&FullKey::decode(&kr.right)) == Ordering::Greater);
iter.seek_idx(0, Some(seek_key)).await.unwrap();
assert!(!iter.is_valid());
let seek_key = test_key_of(0).encode();
iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
.await
.unwrap();
assert!(iter.is_valid());
assert_eq!(iter.key(), block_1_second_key.to_ref());
iter.seek_idx(0, None).await.unwrap();
assert!(iter.is_valid());
assert_eq!(iter.key(), block_1_second_key.to_ref());
}
#[tokio::test]
async fn test_filter_block_metas() {
use crate::hummock::compactor::iterator::filter_block_metas;
{
let block_metas = Vec::default();
let ret = filter_block_metas(&block_metas, &HashSet::default(), KeyRange::default());
assert!(ret.is_empty());
}
{
let block_metas = vec![
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
..Default::default()
},
];
let ret = filter_block_metas(
&block_metas,
&HashSet::from_iter(vec![1_u32, 2, 3].into_iter()),
KeyRange::default(),
);
assert_eq!(3, ret.len());
assert_eq!(
1,
FullKey::decode(&ret[0].smallest_key)
.user_key
.table_id
.table_id()
);
assert_eq!(
3,
FullKey::decode(&ret[2].smallest_key)
.user_key
.table_id
.table_id()
);
}
{
let block_metas = vec![
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
..Default::default()
},
];
let ret = filter_block_metas(
&block_metas,
&HashSet::from_iter(vec![2_u32, 3].into_iter()),
KeyRange::default(),
);
assert_eq!(2, ret.len());
assert_eq!(
2,
FullKey::decode(&ret[0].smallest_key)
.user_key
.table_id
.table_id()
);
assert_eq!(
3,
FullKey::decode(&ret[1].smallest_key)
.user_key
.table_id
.table_id()
);
}
{
let block_metas = vec![
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
..Default::default()
},
];
let ret = filter_block_metas(
&block_metas,
&HashSet::from_iter(vec![1_u32, 2_u32].into_iter()),
KeyRange::default(),
);
assert_eq!(2, ret.len());
assert_eq!(
1,
FullKey::decode(&ret[0].smallest_key)
.user_key
.table_id
.table_id()
);
assert_eq!(
2,
FullKey::decode(&ret[1].smallest_key)
.user_key
.table_id
.table_id()
);
}
{
let block_metas = vec![
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
..Default::default()
},
];
let ret = filter_block_metas(
&block_metas,
&HashSet::from_iter(vec![2_u32].into_iter()),
KeyRange::default(),
);
assert_eq!(1, ret.len());
assert_eq!(
2,
FullKey::decode(&ret[0].smallest_key)
.user_key
.table_id
.table_id()
);
}
{
let block_metas = vec![
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
..Default::default()
},
];
let ret = filter_block_metas(
&block_metas,
&HashSet::from_iter(vec![2_u32].into_iter()),
KeyRange::default(),
);
assert_eq!(1, ret.len());
assert_eq!(
2,
FullKey::decode(&ret[0].smallest_key)
.user_key
.table_id
.table_id()
);
}
{
let block_metas = vec![
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
..Default::default()
},
BlockMeta {
smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
..Default::default()
},
];
let ret = filter_block_metas(
&block_metas,
&HashSet::from_iter(vec![2_u32].into_iter()),
KeyRange::default(),
);
assert_eq!(1, ret.len());
assert_eq!(
2,
FullKey::decode(&ret[0].smallest_key)
.user_key
.table_id
.table_id()
);
}
}
#[tokio::test]
async fn test_iterator_same_obj() {
let sstable_store = mock_sstable_store().await;
let table_info = gen_test_sstable_info(
default_builder_opt_for_test(),
1_u64,
(1..10000).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
sstable_store.clone(),
)
.await;
let split_key = test_key_of(5000).encode();
let mut sst_1 = table_info.clone();
sst_1.key_range.right = split_key.clone().into();
sst_1.key_range.right_exclusive = true;
let total_key_count = sst_1.total_key_count;
let mut sst_2 = table_info.clone();
sst_2.sst_id = sst_1.sst_id + 1;
sst_2.key_range.left = split_key.clone().into();
{
let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
let mut iter = ConcatSstableIterator::for_test(
vec![0],
vec![sst_1.clone(), sst_2.clone()],
KeyRange::default(),
sstable_store.clone(),
);
iter.rewind().await.unwrap();
let mut key_count = 0;
while iter.is_valid() {
let is_new_user_key = full_key_tracker.observe(iter.key());
assert!(is_new_user_key);
key_count += 1;
iter.next().await.unwrap();
}
assert_eq!(total_key_count, key_count);
}
{
let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
let concat_1 = ConcatSstableIterator::for_test(
vec![0],
vec![sst_1.clone()],
KeyRange::default(),
sstable_store.clone(),
);
let concat_2 = ConcatSstableIterator::for_test(
vec![0],
vec![sst_2.clone()],
KeyRange::default(),
sstable_store.clone(),
);
let mut key_count = 0;
let mut iter = MergeIterator::for_compactor(vec![concat_1, concat_2]);
iter.rewind().await.unwrap();
while iter.is_valid() {
full_key_tracker.observe(iter.key());
key_count += 1;
iter.next().await.unwrap();
}
assert_eq!(total_key_count, key_count);
}
}
}