risingwave_storage/hummock/compactor/
iterator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::sync::atomic::AtomicU64;
18use std::sync::{Arc, atomic};
19use std::time::Instant;
20
21use await_tree::{InstrumentAwait, SpanExt};
22use fail::fail_point;
23use risingwave_hummock_sdk::KeyComparator;
24use risingwave_hummock_sdk::compaction_group::StateTableId;
25use risingwave_hummock_sdk::key::FullKey;
26use risingwave_hummock_sdk::key_range::KeyRange;
27use risingwave_hummock_sdk::sstable_info::SstableInfo;
28
29use crate::hummock::block_stream::BlockDataStream;
30use crate::hummock::compactor::task_progress::TaskProgress;
31use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
32use crate::hummock::sstable_store::SstableStoreRef;
33use crate::hummock::value::HummockValue;
34use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult};
35use crate::monitor::StoreLocalStatistic;
36
/// Number of `next()` steps after which `MonitoredCompactorIterator` reports a batch of
/// processed keys to its `TaskProgress`.
const PROGRESS_KEY_INTERVAL: usize = 100;
38
/// Iterates over the KV-pairs of an SST while downloading it.
///
/// `SstableStreamIterator` builds a block stream from `sstable_store` for the blocks listed in
/// `block_metas` and iterates over their KV-pairs. Note that `block_metas` does not necessarily
/// cover the entire SST, but only a subset of its blocks; `block_idx` tracks the position within
/// that subset. `next()` stops at the SST's right bound, and `key()` asserts that the current key
/// lies within the SST's key range.
pub struct SstableStreamIterator {
    sstable_store: SstableStoreRef,
    /// The (filtered) subset of the SST's block metas that this iterator reads.
    block_metas: Vec<BlockMeta>,
    /// The downloading stream. `None` until the first block is loaded, and reset to `None` when
    /// the stream has to be recreated after an object-store error.
    block_stream: Option<BlockDataStream>,

    /// Iterates over the KV-pairs of the current block.
    block_iter: Option<BlockIterator>,

    /// Index into `block_metas` of the next block to load (i.e. the number of blocks loaded so
    /// far); the current block is `block_idx - 1`.
    block_idx: usize,

    /// Accumulates the time (in milliseconds) spent loading blocks; shared with the
    /// `remote_io_time` statistic passed to `new`.
    stats_ptr: Arc<AtomicU64>,

    /// For key sanity check of divided SST and debugging
    sstable_info: SstableInfo,

    /// Table ids of this SST; blocks belonging to other tables are skipped.
    sstable_table_ids: HashSet<StateTableId>,
    /// Task progress; its pending-read-I/O counter is decremented when this iterator is dropped.
    task_progress: Arc<TaskProgress>,
    /// Number of times the block stream has been recreated after an object-store error.
    io_retry_times: usize,
    /// Upper bound for `io_retry_times` over the lifetime of this iterator.
    max_io_retry_times: usize,

    // key range cache: decoded copy of `sstable_info.key_range`, used for per-key bound checks.
    key_range_left: FullKey<Vec<u8>>,
    key_range_right: FullKey<Vec<u8>>,
    key_range_right_exclusive: bool,
}
72
73impl SstableStreamIterator {
74    // We have to handle two internal iterators.
75    //   `block_stream`: iterates over the blocks of the table.
76    //     `block_iter`: iterates over the KV-pairs of the current block.
77    // These iterators work in different ways.
78
79    // BlockIterator works as follows: After new(), we call seek(). That brings us
80    // to the first element. Calling next() then brings us to the second element and does not
81    // return anything.
82
83    // BlockStream follows a different approach. After new(), we do not seek, instead next()
84    // returns the first value.
85
86    /// Initialises a new [`SstableStreamIterator`] which iterates over the given [`BlockDataStream`].
87    /// The iterator reads at most `max_block_count` from the stream.
88    pub fn new(
89        block_metas: Vec<BlockMeta>,
90        sstable_info: SstableInfo,
91        stats: &StoreLocalStatistic,
92        task_progress: Arc<TaskProgress>,
93        sstable_store: SstableStoreRef,
94        max_io_retry_times: usize,
95    ) -> Self {
96        let sstable_table_ids = HashSet::from_iter(sstable_info.table_ids.iter().cloned());
97
98        // filter the block meta with key range
99        let block_metas = filter_block_metas(
100            &block_metas,
101            &sstable_table_ids,
102            sstable_info.key_range.clone(),
103        );
104
105        let key_range_left = FullKey::decode(&sstable_info.key_range.left).to_vec();
106        let key_range_right = FullKey::decode(&sstable_info.key_range.right).to_vec();
107        let key_range_right_exclusive = sstable_info.key_range.right_exclusive;
108
109        Self {
110            block_stream: None,
111            block_iter: None,
112            block_metas,
113            block_idx: 0,
114            stats_ptr: stats.remote_io_time.clone(),
115            sstable_table_ids,
116            sstable_info,
117            sstable_store,
118            task_progress,
119            io_retry_times: 0,
120            max_io_retry_times,
121            key_range_left,
122            key_range_right,
123            key_range_right_exclusive,
124        }
125    }
126
127    async fn create_stream(&mut self) -> HummockResult<()> {
128        let block_stream = self
129            .sstable_store
130            .get_stream_for_blocks(
131                self.sstable_info.object_id,
132                &self.block_metas[self.block_idx..],
133            )
134            .instrument_await("stream_iter_get_stream".verbose())
135            .await?;
136        self.block_stream = Some(block_stream);
137        Ok(())
138    }
139
140    async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
141        while let Some(block_iter) = self.block_iter.as_mut() {
142            if self
143                .sstable_table_ids
144                .contains(&block_iter.table_id().table_id)
145            {
146                return Ok(());
147            } else {
148                self.next_block().await?;
149            }
150        }
151        Ok(())
152    }
153
154    /// Initialises the iterator by moving it to the first KV-pair in the stream's first block where
155    /// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to
156    /// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to
157    /// the very first KV-pair of the stream's first block.
158    pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
159        // Load first block.
160        self.next_block().await?;
161
162        // We assume that a block always contains at least one KV pair. Subsequently, if
163        // `next_block()` loads a new block (i.e., `block_iter` is not `None`), then `block_iter` is
164        // also valid and pointing on the block's first KV-pair.
165
166        let seek_key = if let Some(seek_key) = seek_key {
167            if seek_key.cmp(&self.key_range_left.to_ref()).is_lt() {
168                Some(self.key_range_left.to_ref())
169            } else {
170                Some(seek_key)
171            }
172        } else {
173            Some(self.key_range_left.to_ref())
174        };
175
176        if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
177            block_iter.seek(seek_key);
178
179            if !block_iter.is_valid() {
180                // `seek_key` is larger than everything in the first block.
181                self.next_block().await?;
182            }
183        }
184
185        self.prune_from_valid_block_iter().await?;
186        Ok(())
187    }
188
189    /// Loads a new block, creates a new iterator for it, and stores that iterator in
190    /// `self.block_iter`. The created iterator points to the block's first KV-pair. If the end of
191    /// the stream is reached or `self.remaining_blocks` is zero, then the function sets
192    /// `self.block_iter` to `None`.
193    async fn next_block(&mut self) -> HummockResult<()> {
194        // Check if we want and if we can load the next block.
195        let now = Instant::now();
196        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
197            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
198            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
199        });
200        if self.block_idx < self.block_metas.len() {
201            loop {
202                let ret = match &mut self.block_stream {
203                    Some(block_stream) => block_stream.next_block().await,
204                    None => {
205                        self.create_stream().await?;
206                        continue;
207                    }
208                };
209                match ret {
210                    Ok(Some(block)) => {
211                        let mut block_iter =
212                            BlockIterator::new(BlockHolder::from_owned_block(block));
213                        block_iter.seek_to_first();
214                        self.block_idx += 1;
215                        self.block_iter = Some(block_iter);
216                        return Ok(());
217                    }
218                    Ok(None) => break,
219                    Err(e) => {
220                        if !e.is_object_error() || !self.need_recreate_io_stream() {
221                            return Err(e);
222                        }
223                        self.block_stream.take();
224                        self.io_retry_times += 1;
225                        fail_point!("create_stream_err");
226
227                        tracing::warn!(
228                            "retry create stream for sstable {} times, sstinfo={}",
229                            self.io_retry_times,
230                            self.sst_debug_info()
231                        );
232                    }
233                }
234            }
235        }
236        self.block_idx = self.block_metas.len();
237        self.block_iter = None;
238
239        Ok(())
240    }
241
242    /// Moves to the next KV-pair in the table. Assumes that the current position is valid. Even if
243    /// the next position is invalid, the function returns `Ok(())`.
244    ///
245    /// Do not use `next()` to initialise the iterator (i.e. do not use it to find the first
246    /// KV-pair). Instead, use `seek()`. Afterwards, use `next()` to reach the second KV-pair and
247    /// onwards.
248    pub async fn next(&mut self) -> HummockResult<()> {
249        if !self.is_valid() {
250            return Ok(());
251        }
252
253        let block_iter = self.block_iter.as_mut().expect("no block iter");
254        block_iter.next();
255        if !block_iter.is_valid() {
256            self.next_block().await?;
257            self.prune_from_valid_block_iter().await?;
258        }
259
260        if !self.is_valid() {
261            return Ok(());
262        }
263
264        // Check if we need to skip the block.
265        let key = self
266            .block_iter
267            .as_ref()
268            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
269            .key();
270
271        if self.exceed_key_range_right(key) {
272            self.block_iter = None;
273        }
274
275        Ok(())
276    }
277
278    pub fn key(&self) -> FullKey<&[u8]> {
279        let key = self
280            .block_iter
281            .as_ref()
282            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
283            .key();
284
285        assert!(
286            !self.exceed_key_range_left(key),
287            "key {:?} key_range_left {:?}",
288            key,
289            self.key_range_left.to_ref()
290        );
291
292        assert!(
293            !self.exceed_key_range_right(key),
294            "key {:?} key_range_right {:?} key_range_right_exclusive {}",
295            key,
296            self.key_range_right.to_ref(),
297            self.key_range_right_exclusive
298        );
299
300        key
301    }
302
303    pub fn value(&self) -> HummockValue<&[u8]> {
304        let raw_value = self
305            .block_iter
306            .as_ref()
307            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
308            .value();
309        HummockValue::from_slice(raw_value)
310            .unwrap_or_else(|_| panic!("decode error sstinfo={}", self.sst_debug_info()))
311    }
312
313    pub fn is_valid(&self) -> bool {
314        // True iff block_iter exists and is valid.
315        self.block_iter.as_ref().is_some_and(|i| i.is_valid())
316    }
317
318    fn sst_debug_info(&self) -> String {
319        format!(
320            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
321            self.sstable_info.object_id,
322            self.sstable_info.sst_id,
323            self.sstable_info.meta_offset,
324            self.sstable_info.table_ids
325        )
326    }
327
328    fn need_recreate_io_stream(&self) -> bool {
329        self.io_retry_times < self.max_io_retry_times
330    }
331
332    fn exceed_key_range_left(&self, key: FullKey<&[u8]>) -> bool {
333        key.cmp(&self.key_range_left.to_ref()).is_lt()
334    }
335
336    fn exceed_key_range_right(&self, key: FullKey<&[u8]>) -> bool {
337        if self.key_range_right_exclusive {
338            key.cmp(&self.key_range_right.to_ref()).is_ge()
339        } else {
340            key.cmp(&self.key_range_right.to_ref()).is_gt()
341        }
342    }
343}
344
345impl Drop for SstableStreamIterator {
346    fn drop(&mut self) {
347        self.task_progress.dec_num_pending_read_io()
348    }
349}
350
/// Iterates over the KV-pairs of a given list of SSTs. The key-ranges of these SSTs are assumed to
/// be consecutive and non-overlapping.
///
/// Only one SST is streamed at a time; SSTs that contain none of `existing_table_ids` are skipped
/// without loading their blocks.
pub struct ConcatSstableIterator {
    /// **CAUTION:** `key_range` is used for optimization. It doesn't guarantee value returned by
    /// the iterator is in this range. Its left bound clamps seeks; both bounds narrow the blocks
    /// that are streamed.
    key_range: KeyRange,

    /// The iterator of the current table.
    sstable_iter: Option<SstableStreamIterator>,

    /// Current table index.
    cur_idx: usize,

    /// All non-overlapping tables.
    sstables: Vec<SstableInfo>,

    /// Table ids whose data should be read; SSTs and blocks of other tables are skipped.
    existing_table_ids: HashSet<StateTableId>,

    /// Store used to load SST metas and to stream blocks.
    sstable_store: SstableStoreRef,

    /// Local statistics (SST meta loads and block I/O time), reported via
    /// `collect_local_statistic`.
    stats: StoreLocalStatistic,
    /// Task progress shared with the per-SST iterators (pending read I/O accounting).
    task_progress: Arc<TaskProgress>,
    /// Retry budget handed to each `SstableStreamIterator` for object-store errors.
    max_io_retry_times: usize,
}
375
376impl ConcatSstableIterator {
377    /// Caller should make sure that `tables` are non-overlapping,
378    /// arranged in ascending order when it serves as a forward iterator,
379    /// and arranged in descending order when it serves as a backward iterator.
380    pub fn new(
381        existing_table_ids: Vec<StateTableId>,
382        sst_infos: Vec<SstableInfo>,
383        key_range: KeyRange,
384        sstable_store: SstableStoreRef,
385        task_progress: Arc<TaskProgress>,
386        max_io_retry_times: usize,
387    ) -> Self {
388        Self {
389            key_range,
390            sstable_iter: None,
391            cur_idx: 0,
392            sstables: sst_infos,
393            existing_table_ids: HashSet::from_iter(existing_table_ids),
394            sstable_store,
395            task_progress,
396            stats: StoreLocalStatistic::default(),
397            max_io_retry_times,
398        }
399    }
400
401    #[cfg(test)]
402    pub fn for_test(
403        existing_table_ids: Vec<StateTableId>,
404        sst_infos: Vec<SstableInfo>,
405        key_range: KeyRange,
406        sstable_store: SstableStoreRef,
407    ) -> Self {
408        Self::new(
409            existing_table_ids,
410            sst_infos,
411            key_range,
412            sstable_store,
413            Arc::new(TaskProgress::default()),
414            0,
415        )
416    }
417
418    /// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
419    async fn seek_idx(
420        &mut self,
421        idx: usize,
422        seek_key: Option<FullKey<&[u8]>>,
423    ) -> HummockResult<()> {
424        self.sstable_iter.take();
425        let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
426        {
427            (Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
428                Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
429                Ordering::Greater => Some(seek_key),
430            },
431            (Some(seek_key), true) => Some(seek_key),
432            (None, true) => None,
433            (None, false) => Some(FullKey::decode(&self.key_range.left)),
434        };
435
436        self.cur_idx = idx;
437        while self.cur_idx < self.sstables.len() {
438            let table_info = &self.sstables[self.cur_idx];
439            let mut found = table_info
440                .table_ids
441                .iter()
442                .any(|table_id| self.existing_table_ids.contains(table_id));
443            if !found {
444                self.cur_idx += 1;
445                seek_key = None;
446                continue;
447            }
448            let sstable = self
449                .sstable_store
450                .sstable(table_info, &mut self.stats)
451                .instrument_await("stream_iter_sstable".verbose())
452                .await?;
453
454            let filter_key_range = match seek_key {
455                Some(seek_key) => {
456                    KeyRange::new(seek_key.encode().into(), self.key_range.right.clone())
457                }
458                None => self.key_range.clone(),
459            };
460
461            let block_metas = filter_block_metas(
462                &sstable.meta.block_metas,
463                &self.existing_table_ids,
464                filter_key_range,
465            );
466
467            if block_metas.is_empty() {
468                found = false;
469            } else {
470                self.task_progress.inc_num_pending_read_io();
471                let mut sstable_iter = SstableStreamIterator::new(
472                    block_metas,
473                    table_info.clone(),
474                    &self.stats,
475                    self.task_progress.clone(),
476                    self.sstable_store.clone(),
477                    self.max_io_retry_times,
478                );
479                sstable_iter.seek(seek_key).await?;
480
481                if sstable_iter.is_valid() {
482                    self.sstable_iter = Some(sstable_iter);
483                } else {
484                    found = false;
485                }
486            }
487            if found {
488                return Ok(());
489            } else {
490                self.cur_idx += 1;
491                seek_key = None;
492            }
493        }
494        Ok(())
495    }
496}
497
498impl HummockIterator for ConcatSstableIterator {
499    type Direction = Forward;
500
501    async fn next(&mut self) -> HummockResult<()> {
502        let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
503
504        // Does just calling `next()` suffice?
505        sstable_iter.next().await?;
506        if sstable_iter.is_valid() {
507            Ok(())
508        } else {
509            // No, seek to next table.
510            self.seek_idx(self.cur_idx + 1, None).await?;
511            Ok(())
512        }
513    }
514
515    fn key(&self) -> FullKey<&[u8]> {
516        self.sstable_iter.as_ref().expect("no table iter").key()
517    }
518
519    fn value(&self) -> HummockValue<&[u8]> {
520        self.sstable_iter.as_ref().expect("no table iter").value()
521    }
522
523    fn is_valid(&self) -> bool {
524        self.sstable_iter.as_ref().is_some_and(|i| i.is_valid())
525    }
526
527    async fn rewind(&mut self) -> HummockResult<()> {
528        self.seek_idx(0, None).await
529    }
530
531    /// Resets the iterator and seeks to the first position where the stored key >= `key`.
532    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
533        let seek_key = if self.key_range.left.is_empty() {
534            key
535        } else {
536            match key.cmp(&FullKey::decode(&self.key_range.left)) {
537                Ordering::Less | Ordering::Equal => FullKey::decode(&self.key_range.left),
538                Ordering::Greater => key,
539            }
540        };
541        let table_idx = self.sstables.partition_point(|table| {
542            // We use the maximum key of an SST for the search. That way, we guarantee that the
543            // resulting SST contains either that key or the next-larger KV-pair. Subsequently,
544            // we avoid calling `seek_idx()` twice if the determined SST does not contain `key`.
545
546            // Note that we need to use `<` instead of `<=` to ensure that all keys in an SST
547            // (including its max. key) produce the same search result.
548            let max_sst_key = &table.key_range.right;
549            FullKey::decode(max_sst_key).cmp(&seek_key) == Ordering::Less
550        });
551
552        self.seek_idx(table_idx, Some(key)).await
553    }
554
555    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
556        stats.add(&self.stats)
557    }
558
559    fn value_meta(&self) -> ValueMeta {
560        let iter = self.sstable_iter.as_ref().expect("no table iter");
561        // sstable_iter's block_idx must have advanced at least one.
562        // See SstableStreamIterator::next_block.
563        assert!(iter.block_idx >= 1);
564        ValueMeta {
565            object_id: Some(iter.sstable_info.object_id),
566            block_id: Some(iter.block_idx as u64 - 1),
567        }
568    }
569}
570
/// Wraps a forward [`HummockIterator`] and reports processed keys to [`TaskProgress`] in batches
/// of `PROGRESS_KEY_INTERVAL`.
///
/// Only complete batches are reported; steps taken after the last full batch (or before a
/// `rewind()`/`seek()` resets the counter) are not.
pub struct MonitoredCompactorIterator<I> {
    /// The wrapped iterator; all reads are delegated to it.
    inner: I,
    /// Receives the `inc_progress_key` reports.
    task_progress: Arc<TaskProgress>,

    /// Number of `next()` calls since construction or the last `rewind()`/`seek()`.
    processed_key_num: usize,
}
577
578impl<I: HummockIterator<Direction = Forward>> MonitoredCompactorIterator<I> {
579    pub fn new(inner: I, task_progress: Arc<TaskProgress>) -> Self {
580        Self {
581            inner,
582            task_progress,
583            processed_key_num: 0,
584        }
585    }
586}
587
588impl<I: HummockIterator<Direction = Forward>> HummockIterator for MonitoredCompactorIterator<I> {
589    type Direction = Forward;
590
591    async fn next(&mut self) -> HummockResult<()> {
592        self.inner.next().await?;
593        self.processed_key_num += 1;
594
595        if self.processed_key_num % PROGRESS_KEY_INTERVAL == 0 {
596            self.task_progress
597                .inc_progress_key(PROGRESS_KEY_INTERVAL as _);
598        }
599
600        Ok(())
601    }
602
603    fn key(&self) -> FullKey<&[u8]> {
604        self.inner.key()
605    }
606
607    fn value(&self) -> HummockValue<&[u8]> {
608        self.inner.value()
609    }
610
611    fn is_valid(&self) -> bool {
612        self.inner.is_valid()
613    }
614
615    async fn rewind(&mut self) -> HummockResult<()> {
616        self.processed_key_num = 0;
617        self.inner.rewind().await?;
618        Ok(())
619    }
620
621    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
622        self.processed_key_num = 0;
623        self.inner.seek(key).await?;
624        Ok(())
625    }
626
627    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
628        self.inner.collect_local_statistic(stats)
629    }
630
631    fn value_meta(&self) -> ValueMeta {
632        self.inner.value_meta()
633    }
634}
635
636pub(crate) fn filter_block_metas(
637    block_metas: &Vec<BlockMeta>,
638    existing_table_ids: &HashSet<u32>,
639    key_range: KeyRange,
640) -> Vec<BlockMeta> {
641    if block_metas.is_empty() {
642        return vec![];
643    }
644
645    let mut start_index = if key_range.left.is_empty() {
646        0
647    } else {
648        // start_index points to the greatest block whose smallest_key <= seek_key.
649        block_metas
650            .partition_point(|block| {
651                KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key)
652                    != Ordering::Less
653            })
654            .saturating_sub(1)
655    };
656
657    let mut end_index = if key_range.right.is_empty() {
658        block_metas.len()
659    } else {
660        let ret = block_metas.partition_point(|block| {
661            KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right)
662                != Ordering::Greater
663        });
664
665        if ret == 0 {
666            // not found
667            return vec![];
668        }
669
670        ret
671    }
672    .saturating_sub(1);
673
674    // skip blocks that are not in existing_table_ids
675    while start_index <= end_index {
676        let start_block_table_id = block_metas[start_index].table_id().table_id();
677        if existing_table_ids.contains(&start_block_table_id) {
678            break;
679        }
680
681        // skip this table_id
682        let old_start_index = start_index;
683        let block_metas_to_search = &block_metas[start_index..=end_index];
684
685        start_index += block_metas_to_search
686            .partition_point(|block_meta| block_meta.table_id().table_id() == start_block_table_id);
687
688        if old_start_index == start_index {
689            // no more blocks with the same table_id
690            break;
691        }
692    }
693
694    while start_index <= end_index {
695        let end_block_table_id = block_metas[end_index].table_id().table_id();
696        if existing_table_ids.contains(&end_block_table_id) {
697            break;
698        }
699
700        let old_end_index = end_index;
701        let block_metas_to_search = &block_metas[start_index..=end_index];
702
703        end_index = start_index
704            + block_metas_to_search
705                .partition_point(|block_meta| block_meta.table_id().table_id() < end_block_table_id)
706                .saturating_sub(1);
707
708        if end_index == old_end_index {
709            // no more blocks with the same table_id
710            break;
711        }
712    }
713
714    if start_index > end_index {
715        return vec![];
716    }
717
718    block_metas[start_index..=end_index].to_vec()
719}
720
721#[cfg(test)]
722mod tests {
723    use std::cmp::Ordering;
724    use std::collections::HashSet;
725
726    use risingwave_common::catalog::TableId;
727    use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, next_full_key, prev_full_key};
728    use risingwave_hummock_sdk::key_range::KeyRange;
729    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
730
731    use crate::hummock::BlockMeta;
732    use crate::hummock::compactor::ConcatSstableIterator;
733    use crate::hummock::iterator::test_utils::mock_sstable_store;
734    use crate::hummock::iterator::{HummockIterator, MergeIterator};
735    use crate::hummock::test_utils::{
736        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_info, test_key_of,
737        test_value_of,
738    };
739    use crate::hummock::value::HummockValue;
740
741    #[tokio::test]
742    async fn test_concat_iterator() {
743        let sstable_store = mock_sstable_store().await;
744        let mut table_infos = vec![];
745        for object_id in 0..3 {
746            let start_index = object_id * TEST_KEYS_COUNT;
747            let end_index = (object_id + 1) * TEST_KEYS_COUNT;
748            let table_info = gen_test_sstable_info(
749                default_builder_opt_for_test(),
750                object_id as u64,
751                (start_index..end_index)
752                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
753                sstable_store.clone(),
754            )
755            .await;
756            table_infos.push(table_info);
757        }
758        let start_index = 5000;
759        let end_index = 25000;
760
761        let kr = KeyRange::new(
762            test_key_of(start_index).encode().into(),
763            test_key_of(end_index).encode().into(),
764        );
765        let mut iter = ConcatSstableIterator::for_test(
766            vec![0],
767            table_infos.clone(),
768            kr.clone(),
769            sstable_store.clone(),
770        );
771        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
772
773        for idx in start_index..end_index {
774            let key = iter.key();
775            let val = iter.value();
776            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
777            assert_eq!(
778                val.into_user_value().unwrap(),
779                test_value_of(idx).as_slice()
780            );
781            iter.next().await.unwrap();
782        }
783
784        // seek non-overlap range
785        let kr = KeyRange::new(
786            test_key_of(30000).encode().into(),
787            test_key_of(40000).encode().into(),
788        );
789        let mut iter = ConcatSstableIterator::for_test(
790            vec![0],
791            table_infos.clone(),
792            kr.clone(),
793            sstable_store.clone(),
794        );
795        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
796        assert!(!iter.is_valid());
797        let kr = KeyRange::new(
798            test_key_of(start_index).encode().into(),
799            test_key_of(40000).encode().into(),
800        );
801        let mut iter = ConcatSstableIterator::for_test(
802            vec![0],
803            table_infos.clone(),
804            kr.clone(),
805            sstable_store.clone(),
806        );
807        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
808        for idx in start_index..30000 {
809            let key = iter.key();
810            let val = iter.value();
811            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
812            assert_eq!(
813                val.into_user_value().unwrap(),
814                test_value_of(idx).as_slice()
815            );
816            iter.next().await.unwrap();
817        }
818        assert!(!iter.is_valid());
819
820        // Test seek. Result is dominated by given seek key rather than key range.
821        let kr = KeyRange::new(
822            test_key_of(0).encode().into(),
823            test_key_of(40000).encode().into(),
824        );
825        let mut iter = ConcatSstableIterator::for_test(
826            vec![0],
827            table_infos.clone(),
828            kr.clone(),
829            sstable_store.clone(),
830        );
831        iter.seek(test_key_of(10000).to_ref()).await.unwrap();
832        assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10000).to_ref());
833        iter.seek(test_key_of(10001).to_ref()).await.unwrap();
834        assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10001).to_ref());
835        iter.seek(test_key_of(9999).to_ref()).await.unwrap();
836        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(9999).to_ref());
837        iter.seek(test_key_of(1).to_ref()).await.unwrap();
838        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(1).to_ref());
839        iter.seek(test_key_of(29999).to_ref()).await.unwrap();
840        assert!(iter.is_valid() && iter.cur_idx == 2 && iter.key() == test_key_of(29999).to_ref());
841        iter.seek(test_key_of(30000).to_ref()).await.unwrap();
842        assert!(!iter.is_valid());
843
844        // Test seek. Result is dominated by key range rather than given seek key.
845        let kr = KeyRange::new(
846            test_key_of(6000).encode().into(),
847            test_key_of(16000).encode().into(),
848        );
849        let mut iter = ConcatSstableIterator::for_test(
850            vec![0],
851            table_infos.clone(),
852            kr.clone(),
853            sstable_store.clone(),
854        );
855        iter.seek(test_key_of(17000).to_ref()).await.unwrap();
856        assert!(!iter.is_valid());
857        iter.seek(test_key_of(1).to_ref()).await.unwrap();
858        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == FullKey::decode(&kr.left));
859    }
860
861    #[tokio::test]
862    async fn test_concat_iterator_seek_idx() {
863        let sstable_store = mock_sstable_store().await;
864        let mut table_infos = vec![];
865        for object_id in 0..3 {
866            let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
867            let end_index = (object_id + 1) * TEST_KEYS_COUNT;
868            let table_info = gen_test_sstable_info(
869                default_builder_opt_for_test(),
870                object_id as u64,
871                (start_index..end_index)
872                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
873                sstable_store.clone(),
874            )
875            .await;
876            table_infos.push(table_info);
877        }
878
879        // Test seek_idx. Result is dominated by given seek key rather than key range.
880        let kr = KeyRange::new(
881            test_key_of(0).encode().into(),
882            test_key_of(40000).encode().into(),
883        );
884        let mut iter = ConcatSstableIterator::for_test(
885            vec![0],
886            table_infos.clone(),
887            kr.clone(),
888            sstable_store.clone(),
889        );
890        let sst = sstable_store
891            .sstable(&iter.sstables[0], &mut iter.stats)
892            .await
893            .unwrap();
894        let block_metas = &sst.meta.block_metas;
895        let block_1_smallest_key = block_metas[1].smallest_key.clone();
896        let block_2_smallest_key = block_metas[2].smallest_key.clone();
897        // Use block_1_smallest_key as seek key and result in the first KV of block 1.
898        let seek_key = block_1_smallest_key.clone();
899        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
900            .await
901            .unwrap();
902        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
903        // Use prev_full_key(block_1_smallest_key) as seek key and result in the first KV of block
904        // 1.
905        let seek_key = prev_full_key(block_1_smallest_key.as_slice());
906        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
907            .await
908            .unwrap();
909        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
910        iter.next().await.unwrap();
911        let block_1_second_key = iter.key().to_vec();
912        // Use a big enough seek key and result in invalid iterator.
913        let seek_key = test_key_of(30001);
914        iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref()))
915            .await
916            .unwrap();
917        assert!(!iter.is_valid());
918
919        // Test seek_idx. Result is dominated by key range rather than given seek key.
920        let kr = KeyRange::new(
921            next_full_key(&block_1_smallest_key).into(),
922            prev_full_key(&block_2_smallest_key).into(),
923        );
924        let mut iter = ConcatSstableIterator::for_test(
925            vec![0],
926            table_infos.clone(),
927            kr.clone(),
928            sstable_store.clone(),
929        );
930        // Use block_2_smallest_key as seek key and result in invalid iterator.
931        let seek_key = FullKey::decode(&block_2_smallest_key);
932        assert!(seek_key.cmp(&FullKey::decode(&kr.right)) == Ordering::Greater);
933        iter.seek_idx(0, Some(seek_key)).await.unwrap();
934        assert!(!iter.is_valid());
935        // Use a small enough seek key and result in the second KV of block 1.
936        let seek_key = test_key_of(0).encode();
937        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
938            .await
939            .unwrap();
940        assert!(iter.is_valid());
941        assert_eq!(iter.key(), block_1_second_key.to_ref());
942
943        // Use None seek key and result in the second KV of block 1.
944        iter.seek_idx(0, None).await.unwrap();
945        assert!(iter.is_valid());
946        assert_eq!(iter.key(), block_1_second_key.to_ref());
947    }
948
949    #[tokio::test]
950    async fn test_filter_block_metas() {
951        use crate::hummock::compactor::iterator::filter_block_metas;
952
953        {
954            let block_metas = Vec::default();
955
956            let ret = filter_block_metas(&block_metas, &HashSet::default(), KeyRange::default());
957
958            assert!(ret.is_empty());
959        }
960
961        {
962            let block_metas = vec![
963                BlockMeta {
964                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
965                    ..Default::default()
966                },
967                BlockMeta {
968                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
969                    ..Default::default()
970                },
971                BlockMeta {
972                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
973                    ..Default::default()
974                },
975            ];
976
977            let ret = filter_block_metas(
978                &block_metas,
979                &HashSet::from_iter(vec![1_u32, 2, 3].into_iter()),
980                KeyRange::default(),
981            );
982
983            assert_eq!(3, ret.len());
984            assert_eq!(
985                1,
986                FullKey::decode(&ret[0].smallest_key)
987                    .user_key
988                    .table_id
989                    .table_id()
990            );
991            assert_eq!(
992                3,
993                FullKey::decode(&ret[2].smallest_key)
994                    .user_key
995                    .table_id
996                    .table_id()
997            );
998        }
999
1000        {
1001            let block_metas = vec![
1002                BlockMeta {
1003                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1004                    ..Default::default()
1005                },
1006                BlockMeta {
1007                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1008                    ..Default::default()
1009                },
1010                BlockMeta {
1011                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1012                    ..Default::default()
1013                },
1014            ];
1015
1016            let ret = filter_block_metas(
1017                &block_metas,
1018                &HashSet::from_iter(vec![2_u32, 3].into_iter()),
1019                KeyRange::default(),
1020            );
1021
1022            assert_eq!(2, ret.len());
1023            assert_eq!(
1024                2,
1025                FullKey::decode(&ret[0].smallest_key)
1026                    .user_key
1027                    .table_id
1028                    .table_id()
1029            );
1030            assert_eq!(
1031                3,
1032                FullKey::decode(&ret[1].smallest_key)
1033                    .user_key
1034                    .table_id
1035                    .table_id()
1036            );
1037        }
1038
1039        {
1040            let block_metas = vec![
1041                BlockMeta {
1042                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1043                    ..Default::default()
1044                },
1045                BlockMeta {
1046                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1047                    ..Default::default()
1048                },
1049                BlockMeta {
1050                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1051                    ..Default::default()
1052                },
1053            ];
1054
1055            let ret = filter_block_metas(
1056                &block_metas,
1057                &HashSet::from_iter(vec![1_u32, 2_u32].into_iter()),
1058                KeyRange::default(),
1059            );
1060
1061            assert_eq!(2, ret.len());
1062            assert_eq!(
1063                1,
1064                FullKey::decode(&ret[0].smallest_key)
1065                    .user_key
1066                    .table_id
1067                    .table_id()
1068            );
1069            assert_eq!(
1070                2,
1071                FullKey::decode(&ret[1].smallest_key)
1072                    .user_key
1073                    .table_id
1074                    .table_id()
1075            );
1076        }
1077
1078        {
1079            let block_metas = vec![
1080                BlockMeta {
1081                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1082                    ..Default::default()
1083                },
1084                BlockMeta {
1085                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1086                    ..Default::default()
1087                },
1088                BlockMeta {
1089                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1090                    ..Default::default()
1091                },
1092            ];
1093            let ret = filter_block_metas(
1094                &block_metas,
1095                &HashSet::from_iter(vec![2_u32].into_iter()),
1096                KeyRange::default(),
1097            );
1098
1099            assert_eq!(1, ret.len());
1100            assert_eq!(
1101                2,
1102                FullKey::decode(&ret[0].smallest_key)
1103                    .user_key
1104                    .table_id
1105                    .table_id()
1106            );
1107        }
1108
1109        {
1110            let block_metas = vec![
1111                BlockMeta {
1112                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1113                    ..Default::default()
1114                },
1115                BlockMeta {
1116                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1117                    ..Default::default()
1118                },
1119                BlockMeta {
1120                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1121                    ..Default::default()
1122                },
1123                BlockMeta {
1124                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1125                    ..Default::default()
1126                },
1127                BlockMeta {
1128                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1129                    ..Default::default()
1130                },
1131            ];
1132            let ret = filter_block_metas(
1133                &block_metas,
1134                &HashSet::from_iter(vec![2_u32].into_iter()),
1135                KeyRange::default(),
1136            );
1137
1138            assert_eq!(1, ret.len());
1139            assert_eq!(
1140                2,
1141                FullKey::decode(&ret[0].smallest_key)
1142                    .user_key
1143                    .table_id
1144                    .table_id()
1145            );
1146        }
1147
1148        {
1149            let block_metas = vec![
1150                BlockMeta {
1151                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1152                    ..Default::default()
1153                },
1154                BlockMeta {
1155                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1156                    ..Default::default()
1157                },
1158                BlockMeta {
1159                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1160                    ..Default::default()
1161                },
1162                BlockMeta {
1163                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1164                    ..Default::default()
1165                },
1166                BlockMeta {
1167                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1168                    ..Default::default()
1169                },
1170            ];
1171
1172            let ret = filter_block_metas(
1173                &block_metas,
1174                &HashSet::from_iter(vec![2_u32].into_iter()),
1175                KeyRange::default(),
1176            );
1177
1178            assert_eq!(1, ret.len());
1179            assert_eq!(
1180                2,
1181                FullKey::decode(&ret[0].smallest_key)
1182                    .user_key
1183                    .table_id
1184                    .table_id()
1185            );
1186        }
1187    }
1188
1189    #[tokio::test]
1190    async fn test_iterator_same_obj() {
1191        let sstable_store = mock_sstable_store().await;
1192
1193        let table_info = gen_test_sstable_info(
1194            default_builder_opt_for_test(),
1195            1_u64,
1196            (1..10000).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
1197            sstable_store.clone(),
1198        )
1199        .await;
1200
1201        let split_key = test_key_of(5000).encode();
1202        let sst_1: SstableInfo = SstableInfoInner {
1203            key_range: KeyRange {
1204                left: table_info.key_range.left.clone(),
1205                right: split_key.clone().into(),
1206                right_exclusive: true,
1207            },
1208            ..table_info.get_inner()
1209        }
1210        .into();
1211
1212        let total_key_count = sst_1.total_key_count;
1213        let sst_2: SstableInfo = SstableInfoInner {
1214            sst_id: sst_1.sst_id + 1,
1215            key_range: KeyRange {
1216                left: split_key.clone().into(),
1217                right: table_info.key_range.right.clone(),
1218                right_exclusive: table_info.key_range.right_exclusive,
1219            },
1220            ..table_info.get_inner()
1221        }
1222        .into();
1223
1224        {
1225            // test concate
1226            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1227
1228            let mut iter = ConcatSstableIterator::for_test(
1229                vec![0],
1230                vec![sst_1.clone(), sst_2.clone()],
1231                KeyRange::default(),
1232                sstable_store.clone(),
1233            );
1234
1235            iter.rewind().await.unwrap();
1236
1237            let mut key_count = 0;
1238            while iter.is_valid() {
1239                let is_new_user_key = full_key_tracker.observe(iter.key());
1240                assert!(is_new_user_key);
1241                key_count += 1;
1242                iter.next().await.unwrap();
1243            }
1244
1245            assert_eq!(total_key_count, key_count);
1246        }
1247
1248        {
1249            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1250            let concat_1 = ConcatSstableIterator::for_test(
1251                vec![0],
1252                vec![sst_1.clone()],
1253                KeyRange::default(),
1254                sstable_store.clone(),
1255            );
1256
1257            let concat_2 = ConcatSstableIterator::for_test(
1258                vec![0],
1259                vec![sst_2.clone()],
1260                KeyRange::default(),
1261                sstable_store.clone(),
1262            );
1263
1264            let mut key_count = 0;
1265            let mut iter = MergeIterator::for_compactor(vec![concat_1, concat_2]);
1266            iter.rewind().await.unwrap();
1267            while iter.is_valid() {
1268                full_key_tracker.observe(iter.key());
1269                key_count += 1;
1270                iter.next().await.unwrap();
1271            }
1272            assert_eq!(total_key_count, key_count);
1273        }
1274    }
1275}