risingwave_storage/hummock/compactor/
iterator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::sync::atomic::AtomicU64;
18use std::sync::{Arc, atomic};
19use std::time::Instant;
20
21use await_tree::{InstrumentAwait, SpanExt};
22use fail::fail_point;
23use risingwave_common::catalog::TableId;
24use risingwave_hummock_sdk::KeyComparator;
25use risingwave_hummock_sdk::compaction_group::StateTableId;
26use risingwave_hummock_sdk::key::FullKey;
27use risingwave_hummock_sdk::key_range::KeyRange;
28use risingwave_hummock_sdk::sstable_info::SstableInfo;
29
30use crate::hummock::block_stream::BlockDataStream;
31use crate::hummock::compactor::task_progress::TaskProgress;
32use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
33use crate::hummock::sstable_store::SstableStoreRef;
34use crate::hummock::value::HummockValue;
35use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult};
36use crate::monitor::StoreLocalStatistic;
37
/// Number of keys a [`MonitoredCompactorIterator`] advances over between two progress reports.
const PROGRESS_KEY_INTERVAL: usize = 100;
39
40/// Iterates over the KV-pairs of an SST while downloading it.
41/// `SstableStreamIterator` encapsulates operations on `sstables`, constructing block streams and accessing the corresponding data via `block_metas`.
42///  Note that a `block_meta` does not necessarily correspond to the entire sstable, but rather to a subset, which is documented via the `block_idx`.
43pub struct SstableStreamIterator {
44    sstable_store: SstableStoreRef,
45    /// The block metas subset of the SST.
46    block_metas: Vec<BlockMeta>,
47    /// The downloading stream.
48    block_stream: Option<BlockDataStream>,
49
50    /// Iterates over the KV-pairs of the current block.
51    block_iter: Option<BlockIterator>,
52
53    /// Index of the current block.
54    block_idx: usize,
55
56    /// Counts the time used for IO.
57    stats_ptr: Arc<AtomicU64>,
58
59    /// For key sanity check of divided SST and debugging
60    sstable_info: SstableInfo,
61
62    /// To Filter out the blocks
63    sstable_table_ids: HashSet<StateTableId>,
64    task_progress: Arc<TaskProgress>,
65    io_retry_times: usize,
66    max_io_retry_times: usize,
67
68    // key range cache
69    key_range_left: FullKey<Vec<u8>>,
70    key_range_right: FullKey<Vec<u8>>,
71    key_range_right_exclusive: bool,
72}
73
74impl SstableStreamIterator {
75    // We have to handle two internal iterators.
76    //   `block_stream`: iterates over the blocks of the table.
77    //     `block_iter`: iterates over the KV-pairs of the current block.
78    // These iterators work in different ways.
79
80    // BlockIterator works as follows: After new(), we call seek(). That brings us
81    // to the first element. Calling next() then brings us to the second element and does not
82    // return anything.
83
84    // BlockStream follows a different approach. After new(), we do not seek, instead next()
85    // returns the first value.
86
87    /// Initialises a new [`SstableStreamIterator`] which iterates over the given [`BlockDataStream`].
88    /// The iterator reads at most `max_block_count` from the stream.
89    pub fn new(
90        block_metas: Vec<BlockMeta>,
91        sstable_info: SstableInfo,
92        stats: &StoreLocalStatistic,
93        task_progress: Arc<TaskProgress>,
94        sstable_store: SstableStoreRef,
95        max_io_retry_times: usize,
96    ) -> Self {
97        let sstable_table_ids = HashSet::from_iter(sstable_info.table_ids.iter().cloned());
98
99        // filter the block meta with key range
100        let block_metas = filter_block_metas(
101            &block_metas,
102            &sstable_table_ids,
103            sstable_info.key_range.clone(),
104        );
105
106        let key_range_left = FullKey::decode(&sstable_info.key_range.left).to_vec();
107        let key_range_right = FullKey::decode(&sstable_info.key_range.right).to_vec();
108        let key_range_right_exclusive = sstable_info.key_range.right_exclusive;
109
110        Self {
111            block_stream: None,
112            block_iter: None,
113            block_metas,
114            block_idx: 0,
115            stats_ptr: stats.remote_io_time.clone(),
116            sstable_table_ids,
117            sstable_info,
118            sstable_store,
119            task_progress,
120            io_retry_times: 0,
121            max_io_retry_times,
122            key_range_left,
123            key_range_right,
124            key_range_right_exclusive,
125        }
126    }
127
128    async fn create_stream(&mut self) -> HummockResult<()> {
129        let block_stream = self
130            .sstable_store
131            .get_stream_for_blocks(
132                self.sstable_info.object_id,
133                &self.block_metas[self.block_idx..],
134            )
135            .instrument_await("stream_iter_get_stream".verbose())
136            .await?;
137        self.block_stream = Some(block_stream);
138        Ok(())
139    }
140
141    async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
142        while let Some(block_iter) = self.block_iter.as_mut() {
143            if self.sstable_table_ids.contains(&block_iter.table_id()) {
144                return Ok(());
145            } else {
146                self.next_block().await?;
147            }
148        }
149        Ok(())
150    }
151
152    /// Initialises the iterator by moving it to the first KV-pair in the stream's first block where
153    /// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to
154    /// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to
155    /// the very first KV-pair of the stream's first block.
156    pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
157        // Load first block.
158        self.next_block().await?;
159
160        // We assume that a block always contains at least one KV pair. Subsequently, if
161        // `next_block()` loads a new block (i.e., `block_iter` is not `None`), then `block_iter` is
162        // also valid and pointing on the block's first KV-pair.
163
164        let seek_key = if let Some(seek_key) = seek_key {
165            if seek_key.cmp(&self.key_range_left.to_ref()).is_lt() {
166                Some(self.key_range_left.to_ref())
167            } else {
168                Some(seek_key)
169            }
170        } else {
171            Some(self.key_range_left.to_ref())
172        };
173
174        if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
175            block_iter.seek(seek_key);
176
177            if !block_iter.is_valid() {
178                // `seek_key` is larger than everything in the first block.
179                self.next_block().await?;
180            }
181        }
182
183        self.prune_from_valid_block_iter().await?;
184        Ok(())
185    }
186
187    /// Loads a new block, creates a new iterator for it, and stores that iterator in
188    /// `self.block_iter`. The created iterator points to the block's first KV-pair. If the end of
189    /// the stream is reached or `self.remaining_blocks` is zero, then the function sets
190    /// `self.block_iter` to `None`.
191    async fn next_block(&mut self) -> HummockResult<()> {
192        // Check if we want and if we can load the next block.
193        let now = Instant::now();
194        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
195            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
196            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
197        });
198        if self.block_idx < self.block_metas.len() {
199            loop {
200                let ret = match &mut self.block_stream {
201                    Some(block_stream) => block_stream.next_block().await,
202                    None => {
203                        self.create_stream().await?;
204                        continue;
205                    }
206                };
207                match ret {
208                    Ok(Some(block)) => {
209                        let mut block_iter =
210                            BlockIterator::new(BlockHolder::from_owned_block(block));
211                        block_iter.seek_to_first();
212                        self.block_idx += 1;
213                        self.block_iter = Some(block_iter);
214                        return Ok(());
215                    }
216                    Ok(None) => break,
217                    Err(e) => {
218                        if !e.is_object_error() || !self.need_recreate_io_stream() {
219                            return Err(e);
220                        }
221                        self.block_stream.take();
222                        self.io_retry_times += 1;
223                        fail_point!("create_stream_err");
224
225                        tracing::warn!(
226                            "retry create stream for sstable {} times, sstinfo={}",
227                            self.io_retry_times,
228                            self.sst_debug_info()
229                        );
230                    }
231                }
232            }
233        }
234        self.block_idx = self.block_metas.len();
235        self.block_iter = None;
236
237        Ok(())
238    }
239
240    /// Moves to the next KV-pair in the table. Assumes that the current position is valid. Even if
241    /// the next position is invalid, the function returns `Ok(())`.
242    ///
243    /// Do not use `next()` to initialise the iterator (i.e. do not use it to find the first
244    /// KV-pair). Instead, use `seek()`. Afterwards, use `next()` to reach the second KV-pair and
245    /// onwards.
246    pub async fn next(&mut self) -> HummockResult<()> {
247        if !self.is_valid() {
248            return Ok(());
249        }
250
251        let block_iter = self.block_iter.as_mut().expect("no block iter");
252        block_iter.next();
253        if !block_iter.is_valid() {
254            self.next_block().await?;
255            self.prune_from_valid_block_iter().await?;
256        }
257
258        if !self.is_valid() {
259            return Ok(());
260        }
261
262        // Check if we need to skip the block.
263        let key = self
264            .block_iter
265            .as_ref()
266            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
267            .key();
268
269        if self.exceed_key_range_right(key) {
270            self.block_iter = None;
271        }
272
273        Ok(())
274    }
275
276    pub fn key(&self) -> FullKey<&[u8]> {
277        let key = self
278            .block_iter
279            .as_ref()
280            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
281            .key();
282
283        assert!(
284            !self.exceed_key_range_left(key),
285            "key {:?} key_range_left {:?}",
286            key,
287            self.key_range_left.to_ref()
288        );
289
290        assert!(
291            !self.exceed_key_range_right(key),
292            "key {:?} key_range_right {:?} key_range_right_exclusive {}",
293            key,
294            self.key_range_right.to_ref(),
295            self.key_range_right_exclusive
296        );
297
298        key
299    }
300
301    pub fn value(&self) -> HummockValue<&[u8]> {
302        let raw_value = self
303            .block_iter
304            .as_ref()
305            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
306            .value();
307        HummockValue::from_slice(raw_value)
308            .unwrap_or_else(|_| panic!("decode error sstinfo={}", self.sst_debug_info()))
309    }
310
311    pub fn is_valid(&self) -> bool {
312        // True iff block_iter exists and is valid.
313        self.block_iter.as_ref().is_some_and(|i| i.is_valid())
314    }
315
316    fn sst_debug_info(&self) -> String {
317        format!(
318            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
319            self.sstable_info.object_id,
320            self.sstable_info.sst_id,
321            self.sstable_info.meta_offset,
322            self.sstable_info.table_ids
323        )
324    }
325
326    fn need_recreate_io_stream(&self) -> bool {
327        self.io_retry_times < self.max_io_retry_times
328    }
329
330    fn exceed_key_range_left(&self, key: FullKey<&[u8]>) -> bool {
331        key.cmp(&self.key_range_left.to_ref()).is_lt()
332    }
333
334    fn exceed_key_range_right(&self, key: FullKey<&[u8]>) -> bool {
335        if self.key_range_right_exclusive {
336            key.cmp(&self.key_range_right.to_ref()).is_ge()
337        } else {
338            key.cmp(&self.key_range_right.to_ref()).is_gt()
339        }
340    }
341}
342
343impl Drop for SstableStreamIterator {
344    fn drop(&mut self) {
345        self.task_progress.dec_num_pending_read_io()
346    }
347}
348
349/// Iterates over the KV-pairs of a given list of SSTs. The key-ranges of these SSTs are assumed to
350/// be consecutive and non-overlapping.
351pub struct ConcatSstableIterator {
352    /// **CAUTION:** `key_range` is used for optimization. It doesn't guarantee value returned by
353    /// the iterator is in this range.
354    key_range: KeyRange,
355
356    /// The iterator of the current table.
357    sstable_iter: Option<SstableStreamIterator>,
358
359    /// Current table index.
360    cur_idx: usize,
361
362    /// All non-overlapping tables.
363    sstables: Vec<SstableInfo>,
364
365    existing_table_ids: HashSet<StateTableId>,
366
367    sstable_store: SstableStoreRef,
368
369    stats: StoreLocalStatistic,
370    task_progress: Arc<TaskProgress>,
371    max_io_retry_times: usize,
372}
373
374impl ConcatSstableIterator {
375    /// Caller should make sure that `tables` are non-overlapping,
376    /// arranged in ascending order when it serves as a forward iterator,
377    /// and arranged in descending order when it serves as a backward iterator.
378    pub fn new(
379        existing_table_ids: Vec<StateTableId>,
380        sst_infos: Vec<SstableInfo>,
381        key_range: KeyRange,
382        sstable_store: SstableStoreRef,
383        task_progress: Arc<TaskProgress>,
384        max_io_retry_times: usize,
385    ) -> Self {
386        Self {
387            key_range,
388            sstable_iter: None,
389            cur_idx: 0,
390            sstables: sst_infos,
391            existing_table_ids: HashSet::from_iter(existing_table_ids),
392            sstable_store,
393            task_progress,
394            stats: StoreLocalStatistic::default(),
395            max_io_retry_times,
396        }
397    }
398
399    #[cfg(test)]
400    pub fn for_test(
401        existing_table_ids: Vec<impl Into<StateTableId>>,
402        sst_infos: Vec<SstableInfo>,
403        key_range: KeyRange,
404        sstable_store: SstableStoreRef,
405    ) -> Self {
406        Self::new(
407            existing_table_ids.into_iter().map(Into::into).collect(),
408            sst_infos,
409            key_range,
410            sstable_store,
411            Arc::new(TaskProgress::default()),
412            0,
413        )
414    }
415
416    /// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
417    async fn seek_idx(
418        &mut self,
419        idx: usize,
420        seek_key: Option<FullKey<&[u8]>>,
421    ) -> HummockResult<()> {
422        self.sstable_iter.take();
423        let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
424        {
425            (Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
426                Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
427                Ordering::Greater => Some(seek_key),
428            },
429            (Some(seek_key), true) => Some(seek_key),
430            (None, true) => None,
431            (None, false) => Some(FullKey::decode(&self.key_range.left)),
432        };
433
434        self.cur_idx = idx;
435        while self.cur_idx < self.sstables.len() {
436            let table_info = &self.sstables[self.cur_idx];
437            let mut found = table_info
438                .table_ids
439                .iter()
440                .any(|table_id| self.existing_table_ids.contains(table_id));
441            if !found {
442                self.cur_idx += 1;
443                seek_key = None;
444                continue;
445            }
446            let sstable = self
447                .sstable_store
448                .sstable(table_info, &mut self.stats)
449                .instrument_await("stream_iter_sstable".verbose())
450                .await?;
451
452            let filter_key_range = match seek_key {
453                Some(seek_key) => {
454                    KeyRange::new(seek_key.encode().into(), self.key_range.right.clone())
455                }
456                None => self.key_range.clone(),
457            };
458
459            let block_metas = filter_block_metas(
460                &sstable.meta.block_metas,
461                &self.existing_table_ids,
462                filter_key_range,
463            );
464
465            if block_metas.is_empty() {
466                found = false;
467            } else {
468                self.task_progress.inc_num_pending_read_io();
469                let mut sstable_iter = SstableStreamIterator::new(
470                    block_metas,
471                    table_info.clone(),
472                    &self.stats,
473                    self.task_progress.clone(),
474                    self.sstable_store.clone(),
475                    self.max_io_retry_times,
476                );
477                sstable_iter.seek(seek_key).await?;
478
479                if sstable_iter.is_valid() {
480                    self.sstable_iter = Some(sstable_iter);
481                } else {
482                    found = false;
483                }
484            }
485            if found {
486                return Ok(());
487            } else {
488                self.cur_idx += 1;
489                seek_key = None;
490            }
491        }
492        Ok(())
493    }
494}
495
496impl HummockIterator for ConcatSstableIterator {
497    type Direction = Forward;
498
499    async fn next(&mut self) -> HummockResult<()> {
500        let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
501
502        // Does just calling `next()` suffice?
503        sstable_iter.next().await?;
504        if sstable_iter.is_valid() {
505            Ok(())
506        } else {
507            // No, seek to next table.
508            self.seek_idx(self.cur_idx + 1, None).await?;
509            Ok(())
510        }
511    }
512
513    fn key(&self) -> FullKey<&[u8]> {
514        self.sstable_iter.as_ref().expect("no table iter").key()
515    }
516
517    fn value(&self) -> HummockValue<&[u8]> {
518        self.sstable_iter.as_ref().expect("no table iter").value()
519    }
520
521    fn is_valid(&self) -> bool {
522        self.sstable_iter.as_ref().is_some_and(|i| i.is_valid())
523    }
524
525    async fn rewind(&mut self) -> HummockResult<()> {
526        self.seek_idx(0, None).await
527    }
528
529    /// Resets the iterator and seeks to the first position where the stored key >= `key`.
530    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
531        let seek_key = if self.key_range.left.is_empty() {
532            key
533        } else {
534            match key.cmp(&FullKey::decode(&self.key_range.left)) {
535                Ordering::Less | Ordering::Equal => FullKey::decode(&self.key_range.left),
536                Ordering::Greater => key,
537            }
538        };
539        let table_idx = self.sstables.partition_point(|table| {
540            // We use the maximum key of an SST for the search. That way, we guarantee that the
541            // resulting SST contains either that key or the next-larger KV-pair. Subsequently,
542            // we avoid calling `seek_idx()` twice if the determined SST does not contain `key`.
543
544            // Note that we need to use `<` instead of `<=` to ensure that all keys in an SST
545            // (including its max. key) produce the same search result.
546            let max_sst_key = &table.key_range.right;
547            FullKey::decode(max_sst_key).cmp(&seek_key) == Ordering::Less
548        });
549
550        self.seek_idx(table_idx, Some(key)).await
551    }
552
553    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
554        stats.add(&self.stats)
555    }
556
557    fn value_meta(&self) -> ValueMeta {
558        let iter = self.sstable_iter.as_ref().expect("no table iter");
559        // sstable_iter's block_idx must have advanced at least one.
560        // See SstableStreamIterator::next_block.
561        assert!(iter.block_idx >= 1);
562        ValueMeta {
563            object_id: Some(iter.sstable_info.object_id),
564            block_id: Some(iter.block_idx as u64 - 1),
565        }
566    }
567}
568
569pub struct MonitoredCompactorIterator<I> {
570    inner: I,
571    task_progress: Arc<TaskProgress>,
572
573    processed_key_num: usize,
574}
575
576impl<I: HummockIterator<Direction = Forward>> MonitoredCompactorIterator<I> {
577    pub fn new(inner: I, task_progress: Arc<TaskProgress>) -> Self {
578        Self {
579            inner,
580            task_progress,
581            processed_key_num: 0,
582        }
583    }
584}
585
586impl<I: HummockIterator<Direction = Forward>> HummockIterator for MonitoredCompactorIterator<I> {
587    type Direction = Forward;
588
589    async fn next(&mut self) -> HummockResult<()> {
590        self.inner.next().await?;
591        self.processed_key_num += 1;
592
593        if self.processed_key_num.is_multiple_of(PROGRESS_KEY_INTERVAL) {
594            self.task_progress
595                .inc_progress_key(PROGRESS_KEY_INTERVAL as _);
596        }
597
598        Ok(())
599    }
600
601    fn key(&self) -> FullKey<&[u8]> {
602        self.inner.key()
603    }
604
605    fn value(&self) -> HummockValue<&[u8]> {
606        self.inner.value()
607    }
608
609    fn is_valid(&self) -> bool {
610        self.inner.is_valid()
611    }
612
613    async fn rewind(&mut self) -> HummockResult<()> {
614        self.processed_key_num = 0;
615        self.inner.rewind().await?;
616        Ok(())
617    }
618
619    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
620        self.processed_key_num = 0;
621        self.inner.seek(key).await?;
622        Ok(())
623    }
624
625    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
626        self.inner.collect_local_statistic(stats)
627    }
628
629    fn value_meta(&self) -> ValueMeta {
630        self.inner.value_meta()
631    }
632}
633
634pub(crate) fn filter_block_metas(
635    block_metas: &Vec<BlockMeta>,
636    existing_table_ids: &HashSet<TableId>,
637    key_range: KeyRange,
638) -> Vec<BlockMeta> {
639    if block_metas.is_empty() {
640        return vec![];
641    }
642
643    let mut start_index = if key_range.left.is_empty() {
644        0
645    } else {
646        // start_index points to the greatest block whose smallest_key <= seek_key.
647        block_metas
648            .partition_point(|block| {
649                KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key)
650                    != Ordering::Less
651            })
652            .saturating_sub(1)
653    };
654
655    let mut end_index = if key_range.right.is_empty() {
656        block_metas.len()
657    } else {
658        let ret = block_metas.partition_point(|block| {
659            KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right)
660                != Ordering::Greater
661        });
662
663        if ret == 0 {
664            // not found
665            return vec![];
666        }
667
668        ret
669    }
670    .saturating_sub(1);
671
672    // skip blocks that are not in existing_table_ids
673    while start_index <= end_index {
674        let start_block_table_id = block_metas[start_index].table_id();
675        if existing_table_ids.contains(&start_block_table_id) {
676            break;
677        }
678
679        // skip this table_id
680        let old_start_index = start_index;
681        let block_metas_to_search = &block_metas[start_index..=end_index];
682
683        start_index += block_metas_to_search
684            .partition_point(|block_meta| block_meta.table_id() == start_block_table_id);
685
686        if old_start_index == start_index {
687            // no more blocks with the same table_id
688            break;
689        }
690    }
691
692    while start_index <= end_index {
693        let end_block_table_id = block_metas[end_index].table_id();
694        if existing_table_ids.contains(&end_block_table_id) {
695            break;
696        }
697
698        let old_end_index = end_index;
699        let block_metas_to_search = &block_metas[start_index..=end_index];
700
701        end_index = start_index
702            + block_metas_to_search
703                .partition_point(|block_meta| block_meta.table_id() < end_block_table_id)
704                .saturating_sub(1);
705
706        if end_index == old_end_index {
707            // no more blocks with the same table_id
708            break;
709        }
710    }
711
712    if start_index > end_index {
713        return vec![];
714    }
715
716    block_metas[start_index..=end_index].to_vec()
717}
718
719#[cfg(test)]
720mod tests {
721    use std::cmp::Ordering;
722    use std::collections::HashSet;
723
724    use risingwave_common::catalog::TableId;
725    use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, next_full_key, prev_full_key};
726    use risingwave_hummock_sdk::key_range::KeyRange;
727    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
728
729    use crate::hummock::BlockMeta;
730    use crate::hummock::compactor::ConcatSstableIterator;
731    use crate::hummock::iterator::test_utils::mock_sstable_store;
732    use crate::hummock::iterator::{HummockIterator, MergeIterator};
733    use crate::hummock::test_utils::{
734        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_info, test_key_of,
735        test_value_of,
736    };
737    use crate::hummock::value::HummockValue;
738
739    #[tokio::test]
740    async fn test_concat_iterator() {
741        let sstable_store = mock_sstable_store().await;
742        let mut table_infos = vec![];
743        for object_id in 0..3 {
744            let start_index = object_id * TEST_KEYS_COUNT;
745            let end_index = (object_id + 1) * TEST_KEYS_COUNT;
746            let table_info = gen_test_sstable_info(
747                default_builder_opt_for_test(),
748                object_id as u64,
749                (start_index..end_index)
750                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
751                sstable_store.clone(),
752            )
753            .await;
754            table_infos.push(table_info);
755        }
756        let start_index = 5000;
757        let end_index = 25000;
758
759        let kr = KeyRange::new(
760            test_key_of(start_index).encode().into(),
761            test_key_of(end_index).encode().into(),
762        );
763        let mut iter = ConcatSstableIterator::for_test(
764            vec![0],
765            table_infos.clone(),
766            kr.clone(),
767            sstable_store.clone(),
768        );
769        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
770
771        for idx in start_index..end_index {
772            let key = iter.key();
773            let val = iter.value();
774            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
775            assert_eq!(
776                val.into_user_value().unwrap(),
777                test_value_of(idx).as_slice()
778            );
779            iter.next().await.unwrap();
780        }
781
782        // seek non-overlap range
783        let kr = KeyRange::new(
784            test_key_of(30000).encode().into(),
785            test_key_of(40000).encode().into(),
786        );
787        let mut iter = ConcatSstableIterator::for_test(
788            vec![0],
789            table_infos.clone(),
790            kr.clone(),
791            sstable_store.clone(),
792        );
793        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
794        assert!(!iter.is_valid());
795        let kr = KeyRange::new(
796            test_key_of(start_index).encode().into(),
797            test_key_of(40000).encode().into(),
798        );
799        let mut iter = ConcatSstableIterator::for_test(
800            vec![0],
801            table_infos.clone(),
802            kr.clone(),
803            sstable_store.clone(),
804        );
805        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
806        for idx in start_index..30000 {
807            let key = iter.key();
808            let val = iter.value();
809            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
810            assert_eq!(
811                val.into_user_value().unwrap(),
812                test_value_of(idx).as_slice()
813            );
814            iter.next().await.unwrap();
815        }
816        assert!(!iter.is_valid());
817
818        // Test seek. Result is dominated by given seek key rather than key range.
819        let kr = KeyRange::new(
820            test_key_of(0).encode().into(),
821            test_key_of(40000).encode().into(),
822        );
823        let mut iter = ConcatSstableIterator::for_test(
824            vec![0],
825            table_infos.clone(),
826            kr.clone(),
827            sstable_store.clone(),
828        );
829        iter.seek(test_key_of(10000).to_ref()).await.unwrap();
830        assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10000).to_ref());
831        iter.seek(test_key_of(10001).to_ref()).await.unwrap();
832        assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10001).to_ref());
833        iter.seek(test_key_of(9999).to_ref()).await.unwrap();
834        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(9999).to_ref());
835        iter.seek(test_key_of(1).to_ref()).await.unwrap();
836        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(1).to_ref());
837        iter.seek(test_key_of(29999).to_ref()).await.unwrap();
838        assert!(iter.is_valid() && iter.cur_idx == 2 && iter.key() == test_key_of(29999).to_ref());
839        iter.seek(test_key_of(30000).to_ref()).await.unwrap();
840        assert!(!iter.is_valid());
841
842        // Test seek. Result is dominated by key range rather than given seek key.
843        let kr = KeyRange::new(
844            test_key_of(6000).encode().into(),
845            test_key_of(16000).encode().into(),
846        );
847        let mut iter = ConcatSstableIterator::for_test(
848            vec![0],
849            table_infos.clone(),
850            kr.clone(),
851            sstable_store.clone(),
852        );
853        iter.seek(test_key_of(17000).to_ref()).await.unwrap();
854        assert!(!iter.is_valid());
855        iter.seek(test_key_of(1).to_ref()).await.unwrap();
856        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == FullKey::decode(&kr.left));
857    }
858
859    #[tokio::test]
860    async fn test_concat_iterator_seek_idx() {
861        let sstable_store = mock_sstable_store().await;
862        let mut table_infos = vec![];
863        for object_id in 0..3 {
864            let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
865            let end_index = (object_id + 1) * TEST_KEYS_COUNT;
866            let table_info = gen_test_sstable_info(
867                default_builder_opt_for_test(),
868                object_id as u64,
869                (start_index..end_index)
870                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
871                sstable_store.clone(),
872            )
873            .await;
874            table_infos.push(table_info);
875        }
876
877        // Test seek_idx. Result is dominated by given seek key rather than key range.
878        let kr = KeyRange::new(
879            test_key_of(0).encode().into(),
880            test_key_of(40000).encode().into(),
881        );
882        let mut iter = ConcatSstableIterator::for_test(
883            vec![0],
884            table_infos.clone(),
885            kr.clone(),
886            sstable_store.clone(),
887        );
888        let sst = sstable_store
889            .sstable(&iter.sstables[0], &mut iter.stats)
890            .await
891            .unwrap();
892        let block_metas = &sst.meta.block_metas;
893        let block_1_smallest_key = block_metas[1].smallest_key.clone();
894        let block_2_smallest_key = block_metas[2].smallest_key.clone();
895        // Use block_1_smallest_key as seek key and result in the first KV of block 1.
896        let seek_key = block_1_smallest_key.clone();
897        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
898            .await
899            .unwrap();
900        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
901        // Use prev_full_key(block_1_smallest_key) as seek key and result in the first KV of block
902        // 1.
903        let seek_key = prev_full_key(block_1_smallest_key.as_slice());
904        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
905            .await
906            .unwrap();
907        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
908        iter.next().await.unwrap();
909        let block_1_second_key = iter.key().to_vec();
910        // Use a big enough seek key and result in invalid iterator.
911        let seek_key = test_key_of(30001);
912        iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref()))
913            .await
914            .unwrap();
915        assert!(!iter.is_valid());
916
917        // Test seek_idx. Result is dominated by key range rather than given seek key.
918        let kr = KeyRange::new(
919            next_full_key(&block_1_smallest_key).into(),
920            prev_full_key(&block_2_smallest_key).into(),
921        );
922        let mut iter = ConcatSstableIterator::for_test(
923            vec![0],
924            table_infos.clone(),
925            kr.clone(),
926            sstable_store.clone(),
927        );
928        // Use block_2_smallest_key as seek key and result in invalid iterator.
929        let seek_key = FullKey::decode(&block_2_smallest_key);
930        assert!(seek_key.cmp(&FullKey::decode(&kr.right)) == Ordering::Greater);
931        iter.seek_idx(0, Some(seek_key)).await.unwrap();
932        assert!(!iter.is_valid());
933        // Use a small enough seek key and result in the second KV of block 1.
934        let seek_key = test_key_of(0).encode();
935        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
936            .await
937            .unwrap();
938        assert!(iter.is_valid());
939        assert_eq!(iter.key(), block_1_second_key.to_ref());
940
941        // Use None seek key and result in the second KV of block 1.
942        iter.seek_idx(0, None).await.unwrap();
943        assert!(iter.is_valid());
944        assert_eq!(iter.key(), block_1_second_key.to_ref());
945    }
946
947    #[tokio::test]
948    async fn test_filter_block_metas() {
949        use crate::hummock::compactor::iterator::filter_block_metas;
950
951        {
952            let block_metas = Vec::default();
953
954            let ret = filter_block_metas(&block_metas, &HashSet::default(), KeyRange::default());
955
956            assert!(ret.is_empty());
957        }
958
959        {
960            let block_metas = vec![
961                BlockMeta {
962                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
963                    ..Default::default()
964                },
965                BlockMeta {
966                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
967                    ..Default::default()
968                },
969                BlockMeta {
970                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
971                    ..Default::default()
972                },
973            ];
974
975            let ret = filter_block_metas(
976                &block_metas,
977                &HashSet::from_iter(vec![1_u32.into(), 2.into(), 3.into()].into_iter()),
978                KeyRange::default(),
979            );
980
981            assert_eq!(3, ret.len());
982            assert_eq!(
983                1,
984                FullKey::decode(&ret[0].smallest_key)
985                    .user_key
986                    .table_id
987                    .as_raw_id()
988            );
989            assert_eq!(
990                3,
991                FullKey::decode(&ret[2].smallest_key)
992                    .user_key
993                    .table_id
994                    .as_raw_id()
995            );
996        }
997
998        {
999            let block_metas = vec![
1000                BlockMeta {
1001                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1002                    ..Default::default()
1003                },
1004                BlockMeta {
1005                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1006                    ..Default::default()
1007                },
1008                BlockMeta {
1009                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1010                    ..Default::default()
1011                },
1012            ];
1013
1014            let ret = filter_block_metas(
1015                &block_metas,
1016                &HashSet::from_iter(vec![2_u32.into(), 3.into()].into_iter()),
1017                KeyRange::default(),
1018            );
1019
1020            assert_eq!(2, ret.len());
1021            assert_eq!(
1022                2,
1023                FullKey::decode(&ret[0].smallest_key)
1024                    .user_key
1025                    .table_id
1026                    .as_raw_id()
1027            );
1028            assert_eq!(
1029                3,
1030                FullKey::decode(&ret[1].smallest_key)
1031                    .user_key
1032                    .table_id
1033                    .as_raw_id()
1034            );
1035        }
1036
1037        {
1038            let block_metas = vec![
1039                BlockMeta {
1040                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1041                    ..Default::default()
1042                },
1043                BlockMeta {
1044                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1045                    ..Default::default()
1046                },
1047                BlockMeta {
1048                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1049                    ..Default::default()
1050                },
1051            ];
1052
1053            let ret = filter_block_metas(
1054                &block_metas,
1055                &HashSet::from_iter(vec![1_u32.into(), 2_u32.into()].into_iter()),
1056                KeyRange::default(),
1057            );
1058
1059            assert_eq!(2, ret.len());
1060            assert_eq!(
1061                1,
1062                FullKey::decode(&ret[0].smallest_key)
1063                    .user_key
1064                    .table_id
1065                    .as_raw_id()
1066            );
1067            assert_eq!(
1068                2,
1069                FullKey::decode(&ret[1].smallest_key)
1070                    .user_key
1071                    .table_id
1072                    .as_raw_id()
1073            );
1074        }
1075
1076        {
1077            let block_metas = vec![
1078                BlockMeta {
1079                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1080                    ..Default::default()
1081                },
1082                BlockMeta {
1083                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1084                    ..Default::default()
1085                },
1086                BlockMeta {
1087                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1088                    ..Default::default()
1089                },
1090            ];
1091            let ret = filter_block_metas(
1092                &block_metas,
1093                &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1094                KeyRange::default(),
1095            );
1096
1097            assert_eq!(1, ret.len());
1098            assert_eq!(
1099                2,
1100                FullKey::decode(&ret[0].smallest_key)
1101                    .user_key
1102                    .table_id
1103                    .as_raw_id()
1104            );
1105        }
1106
1107        {
1108            let block_metas = vec![
1109                BlockMeta {
1110                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1111                    ..Default::default()
1112                },
1113                BlockMeta {
1114                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1115                    ..Default::default()
1116                },
1117                BlockMeta {
1118                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1119                    ..Default::default()
1120                },
1121                BlockMeta {
1122                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1123                    ..Default::default()
1124                },
1125                BlockMeta {
1126                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1127                    ..Default::default()
1128                },
1129            ];
1130            let ret = filter_block_metas(
1131                &block_metas,
1132                &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1133                KeyRange::default(),
1134            );
1135
1136            assert_eq!(1, ret.len());
1137            assert_eq!(
1138                2,
1139                FullKey::decode(&ret[0].smallest_key)
1140                    .user_key
1141                    .table_id
1142                    .as_raw_id()
1143            );
1144        }
1145
1146        {
1147            let block_metas = vec![
1148                BlockMeta {
1149                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1150                    ..Default::default()
1151                },
1152                BlockMeta {
1153                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1154                    ..Default::default()
1155                },
1156                BlockMeta {
1157                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1158                    ..Default::default()
1159                },
1160                BlockMeta {
1161                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1162                    ..Default::default()
1163                },
1164                BlockMeta {
1165                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1166                    ..Default::default()
1167                },
1168            ];
1169
1170            let ret = filter_block_metas(
1171                &block_metas,
1172                &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1173                KeyRange::default(),
1174            );
1175
1176            assert_eq!(1, ret.len());
1177            assert_eq!(
1178                2,
1179                FullKey::decode(&ret[0].smallest_key)
1180                    .user_key
1181                    .table_id
1182                    .as_raw_id()
1183            );
1184        }
1185    }
1186
1187    #[tokio::test]
1188    async fn test_iterator_same_obj() {
1189        let sstable_store = mock_sstable_store().await;
1190
1191        let table_info = gen_test_sstable_info(
1192            default_builder_opt_for_test(),
1193            1_u64,
1194            (1..10000).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
1195            sstable_store.clone(),
1196        )
1197        .await;
1198
1199        let split_key = test_key_of(5000).encode();
1200        let sst_1: SstableInfo = SstableInfoInner {
1201            key_range: KeyRange {
1202                left: table_info.key_range.left.clone(),
1203                right: split_key.clone().into(),
1204                right_exclusive: true,
1205            },
1206            ..table_info.get_inner()
1207        }
1208        .into();
1209
1210        let total_key_count = sst_1.total_key_count;
1211        let sst_2: SstableInfo = SstableInfoInner {
1212            sst_id: sst_1.sst_id + 1,
1213            key_range: KeyRange {
1214                left: split_key.clone().into(),
1215                right: table_info.key_range.right.clone(),
1216                right_exclusive: table_info.key_range.right_exclusive,
1217            },
1218            ..table_info.get_inner()
1219        }
1220        .into();
1221
1222        {
1223            // test concate
1224            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1225
1226            let mut iter = ConcatSstableIterator::for_test(
1227                vec![0],
1228                vec![sst_1.clone(), sst_2.clone()],
1229                KeyRange::default(),
1230                sstable_store.clone(),
1231            );
1232
1233            iter.rewind().await.unwrap();
1234
1235            let mut key_count = 0;
1236            while iter.is_valid() {
1237                let is_new_user_key = full_key_tracker.observe(iter.key());
1238                assert!(is_new_user_key);
1239                key_count += 1;
1240                iter.next().await.unwrap();
1241            }
1242
1243            assert_eq!(total_key_count, key_count);
1244        }
1245
1246        {
1247            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1248            let concat_1 = ConcatSstableIterator::for_test(
1249                vec![0],
1250                vec![sst_1.clone()],
1251                KeyRange::default(),
1252                sstable_store.clone(),
1253            );
1254
1255            let concat_2 = ConcatSstableIterator::for_test(
1256                vec![0],
1257                vec![sst_2.clone()],
1258                KeyRange::default(),
1259                sstable_store.clone(),
1260            );
1261
1262            let mut key_count = 0;
1263            let mut iter = MergeIterator::for_compactor(vec![concat_1, concat_2]);
1264            iter.rewind().await.unwrap();
1265            while iter.is_valid() {
1266                full_key_tracker.observe(iter.key());
1267                key_count += 1;
1268                iter.next().await.unwrap();
1269            }
1270            assert_eq!(total_key_count, key_count);
1271        }
1272    }
1273}