// risingwave_storage/hummock/compactor/iterator.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::ops::Range;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use fail::fail_point;
24use risingwave_common::catalog::TableId;
25use risingwave_hummock_sdk::KeyComparator;
26use risingwave_hummock_sdk::key::FullKey;
27use risingwave_hummock_sdk::key_range::KeyRange;
28use risingwave_hummock_sdk::sstable_info::SstableInfo;
29
30use crate::hummock::block_stream::BlockDataStream;
31use crate::hummock::compactor::task_progress::TaskProgress;
32use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
33use crate::hummock::sstable_store::SstableStoreRef;
34use crate::hummock::value::HummockValue;
35use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult, TableHolder};
36use crate::monitor::StoreLocalStatistic;
37
/// Number of keys `MonitoredCompactorIterator` advances over between two progress reports to
/// its `TaskProgress`.
const PROGRESS_KEY_INTERVAL: usize = 100;
39
40/// Iterates over the KV-pairs of an SST while downloading it.
41/// `SstableStreamIterator` encapsulates operations on `sstables`, constructing block streams and accessing the corresponding data via `block_metas`.
42///  Note that a `block_meta` does not necessarily correspond to the entire sstable, but rather to a subset, which is documented via the `block_idx`.
43pub struct SstableStreamIterator {
44    sstable_store: SstableStoreRef,
45    sstable: TableHolder,
46    /// The range of block metas to iterate over
47    block_metas_range: Range<usize>,
48    /// The downloading stream.
49    block_stream: Option<BlockDataStream>,
50
51    /// Iterates over the KV-pairs of the current block.
52    block_iter: Option<BlockIterator>,
53
54    /// Index of the current block within the range.
55    block_idx: usize,
56
57    /// Counts the time used for IO.
58    stats_ptr: Arc<AtomicU64>,
59
60    /// For key sanity check of divided SST and debugging
61    sstable_info: SstableInfo,
62
63    /// Table ids used to decide which blocks should be read from this SST.
64    read_table_ids: HashSet<TableId>,
65    task_progress: Arc<TaskProgress>,
66    io_retry_times: usize,
67    max_io_retry_times: usize,
68
69    // key range cache
70    key_range_left: FullKey<Vec<u8>>,
71    key_range_right: FullKey<Vec<u8>>,
72    key_range_right_exclusive: bool,
73}
74
75impl SstableStreamIterator {
76    // We have to handle two internal iterators.
77    //   `block_stream`: iterates over the blocks of the table.
78    //     `block_iter`: iterates over the KV-pairs of the current block.
79    // These iterators work in different ways.
80
81    // BlockIterator works as follows: After new(), we call seek(). That brings us
82    // to the first element. Calling next() then brings us to the second element and does not
83    // return anything.
84
85    // BlockStream follows a different approach. After new(), we do not seek, instead next()
86    // returns the first value.
87
88    /// Initialises a new [`SstableStreamIterator`] which iterates over the given [`BlockDataStream`].
89    /// The iterator reads at most `max_block_count` from the stream.
90    pub fn new(
91        sstable: TableHolder,
92        block_metas_range: Range<usize>,
93        sstable_info: SstableInfo,
94        stats: &StoreLocalStatistic,
95        task_progress: Arc<TaskProgress>,
96        sstable_store: SstableStoreRef,
97        max_io_retry_times: usize,
98    ) -> Self {
99        let read_table_ids = HashSet::from_iter(sstable_info.table_ids.iter().copied());
100        // Further filter the block_metas_range with sstable_info.key_range
101        // This is necessary when the SST is split into multiple SstableInfo with different key ranges
102        let block_metas_range = {
103            let block_metas = &sstable.meta.block_metas[block_metas_range.clone()];
104            let inner_range =
105                filter_block_metas(block_metas, &read_table_ids, sstable_info.key_range.clone());
106            // Adjust the range to be relative to the original block_metas
107            (block_metas_range.start + inner_range.start)
108                ..(block_metas_range.start + inner_range.end)
109        };
110
111        let key_range_left = FullKey::decode(&sstable_info.key_range.left).to_vec();
112        let key_range_right = FullKey::decode(&sstable_info.key_range.right).to_vec();
113        let key_range_right_exclusive = sstable_info.key_range.right_exclusive;
114
115        Self {
116            block_stream: None,
117            block_iter: None,
118            sstable,
119            block_metas_range,
120            block_idx: 0,
121            stats_ptr: stats.remote_io_time.clone(),
122            read_table_ids,
123            sstable_info,
124            sstable_store,
125            task_progress,
126            io_retry_times: 0,
127            max_io_retry_times,
128            key_range_left,
129            key_range_right,
130            key_range_right_exclusive,
131        }
132    }
133
134    /// Returns the block metas slice for this iterator.
135    #[inline]
136    fn block_metas(&self) -> &[BlockMeta] {
137        &self.sstable.meta.block_metas[self.block_metas_range.clone()]
138    }
139
140    /// Returns the number of blocks in this iterator.
141    #[inline]
142    fn block_count(&self) -> usize {
143        self.block_metas_range.len()
144    }
145
146    async fn create_stream(&mut self) -> HummockResult<()> {
147        let block_stream = self
148            .sstable_store
149            .get_stream_for_blocks(
150                self.sstable_info.object_id,
151                &self.block_metas()[self.block_idx..],
152            )
153            .instrument_await("stream_iter_get_stream".verbose())
154            .await?;
155        self.block_stream = Some(block_stream);
156        Ok(())
157    }
158
159    async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
160        while let Some(block_iter) = self.block_iter.as_mut() {
161            if self.read_table_ids.contains(&block_iter.table_id()) {
162                return Ok(());
163            } else {
164                self.next_block().await?;
165            }
166        }
167        Ok(())
168    }
169
170    /// Initialises the iterator by moving it to the first KV-pair in the stream's first block where
171    /// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to
172    /// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to
173    /// the very first KV-pair of the stream's first block.
174    pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
175        // Load first block.
176        self.next_block().await?;
177
178        // We assume that a block always contains at least one KV pair. Subsequently, if
179        // `next_block()` loads a new block (i.e., `block_iter` is not `None`), then `block_iter` is
180        // also valid and pointing on the block's first KV-pair.
181
182        let seek_key = if let Some(seek_key) = seek_key {
183            if seek_key.cmp(&self.key_range_left.to_ref()).is_lt() {
184                Some(self.key_range_left.to_ref())
185            } else {
186                Some(seek_key)
187            }
188        } else {
189            Some(self.key_range_left.to_ref())
190        };
191
192        if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
193            block_iter.seek(seek_key);
194
195            if !block_iter.is_valid() {
196                // `seek_key` is larger than everything in the first block.
197                self.next_block().await?;
198            }
199        }
200
201        self.prune_from_valid_block_iter().await?;
202        Ok(())
203    }
204
205    /// Loads a new block, creates a new iterator for it, and stores that iterator in
206    /// `self.block_iter`. The created iterator points to the block's first KV-pair. If the end of
207    /// the stream is reached or `self.remaining_blocks` is zero, then the function sets
208    /// `self.block_iter` to `None`.
209    async fn next_block(&mut self) -> HummockResult<()> {
210        // Check if we want and if we can load the next block.
211        let now = Instant::now();
212        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
213            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
214            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
215        });
216        if self.block_idx < self.block_count() {
217            loop {
218                let ret = match &mut self.block_stream {
219                    Some(block_stream) => block_stream.next_block().await,
220                    None => {
221                        self.create_stream().await?;
222                        continue;
223                    }
224                };
225                match ret {
226                    Ok(Some(block)) => {
227                        let mut block_iter =
228                            BlockIterator::new(BlockHolder::from_owned_block(block));
229                        block_iter.seek_to_first();
230                        self.block_idx += 1;
231                        self.block_iter = Some(block_iter);
232                        return Ok(());
233                    }
234                    Ok(None) => break,
235                    Err(e) => {
236                        if !e.is_object_error() || !self.need_recreate_io_stream() {
237                            return Err(e);
238                        }
239                        self.block_stream.take();
240                        self.io_retry_times += 1;
241                        fail_point!("create_stream_err");
242
243                        tracing::warn!(
244                            "retry create stream for sstable {} times, sstinfo={}",
245                            self.io_retry_times,
246                            self.sst_debug_info()
247                        );
248                    }
249                }
250            }
251        }
252        self.block_idx = self.block_count();
253        self.block_iter = None;
254
255        Ok(())
256    }
257
258    /// Moves to the next KV-pair in the table. Assumes that the current position is valid. Even if
259    /// the next position is invalid, the function returns `Ok(())`.
260    ///
261    /// Do not use `next()` to initialise the iterator (i.e. do not use it to find the first
262    /// KV-pair). Instead, use `seek()`. Afterwards, use `next()` to reach the second KV-pair and
263    /// onwards.
264    pub async fn next(&mut self) -> HummockResult<()> {
265        if !self.is_valid() {
266            return Ok(());
267        }
268
269        let block_iter = self.block_iter.as_mut().expect("no block iter");
270        block_iter.next();
271        if !block_iter.is_valid() {
272            self.next_block().await?;
273            self.prune_from_valid_block_iter().await?;
274        }
275
276        if !self.is_valid() {
277            return Ok(());
278        }
279
280        // Check if we need to skip the block.
281        let key = self
282            .block_iter
283            .as_ref()
284            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
285            .key();
286
287        if self.exceed_key_range_right(key) {
288            self.block_iter = None;
289        }
290
291        Ok(())
292    }
293
294    pub fn key(&self) -> FullKey<&[u8]> {
295        let key = self
296            .block_iter
297            .as_ref()
298            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
299            .key();
300
301        assert!(
302            !self.exceed_key_range_left(key),
303            "key {:?} key_range_left {:?}",
304            key,
305            self.key_range_left.to_ref()
306        );
307
308        assert!(
309            !self.exceed_key_range_right(key),
310            "key {:?} key_range_right {:?} key_range_right_exclusive {}",
311            key,
312            self.key_range_right.to_ref(),
313            self.key_range_right_exclusive
314        );
315
316        key
317    }
318
319    pub fn value(&self) -> HummockValue<&[u8]> {
320        let raw_value = self
321            .block_iter
322            .as_ref()
323            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
324            .value();
325        HummockValue::from_slice(raw_value)
326            .unwrap_or_else(|_| panic!("decode error sstinfo={}", self.sst_debug_info()))
327    }
328
329    pub fn is_valid(&self) -> bool {
330        // True iff block_iter exists and is valid.
331        self.block_iter.as_ref().is_some_and(|i| i.is_valid())
332    }
333
334    fn sst_debug_info(&self) -> String {
335        format!(
336            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
337            self.sstable_info.object_id,
338            self.sstable_info.sst_id,
339            self.sstable_info.meta_offset,
340            self.sstable_info.table_ids
341        )
342    }
343
344    fn need_recreate_io_stream(&self) -> bool {
345        self.io_retry_times < self.max_io_retry_times
346    }
347
348    fn exceed_key_range_left(&self, key: FullKey<&[u8]>) -> bool {
349        key.cmp(&self.key_range_left.to_ref()).is_lt()
350    }
351
352    fn exceed_key_range_right(&self, key: FullKey<&[u8]>) -> bool {
353        if self.key_range_right_exclusive {
354            key.cmp(&self.key_range_right.to_ref()).is_ge()
355        } else {
356            key.cmp(&self.key_range_right.to_ref()).is_gt()
357        }
358    }
359}
360
361impl Drop for SstableStreamIterator {
362    fn drop(&mut self) {
363        self.task_progress.dec_num_pending_read_io()
364    }
365}
366
367/// Iterates over the KV-pairs of a given list of SSTs. The key-ranges of these SSTs are assumed to
368/// be consecutive and non-overlapping.
369pub struct ConcatSstableIterator {
370    /// **CAUTION:** `key_range` is used for optimization. It doesn't guarantee value returned by
371    /// the iterator is in this range.
372    key_range: KeyRange,
373
374    /// The iterator of the current table.
375    sstable_iter: Option<SstableStreamIterator>,
376
377    /// Current table index.
378    cur_idx: usize,
379
380    /// All non-overlapping tables.
381    sstables: Vec<SstableInfo>,
382
383    sstable_store: SstableStoreRef,
384
385    stats: StoreLocalStatistic,
386    task_progress: Arc<TaskProgress>,
387    max_io_retry_times: usize,
388}
389
390impl ConcatSstableIterator {
391    /// Caller should make sure that `tables` are non-overlapping,
392    /// arranged in ascending order when it serves as a forward iterator,
393    /// and arranged in descending order when it serves as a backward iterator.
394    pub fn new(
395        sst_infos: Vec<SstableInfo>,
396        key_range: KeyRange,
397        sstable_store: SstableStoreRef,
398        task_progress: Arc<TaskProgress>,
399        max_io_retry_times: usize,
400    ) -> Self {
401        Self {
402            key_range,
403            sstable_iter: None,
404            cur_idx: 0,
405            sstables: sst_infos,
406            sstable_store,
407            task_progress,
408            stats: StoreLocalStatistic::default(),
409            max_io_retry_times,
410        }
411    }
412
413    #[cfg(test)]
414    pub fn for_test(
415        sst_infos: Vec<SstableInfo>,
416        key_range: KeyRange,
417        sstable_store: SstableStoreRef,
418    ) -> Self {
419        Self::new(
420            sst_infos,
421            key_range,
422            sstable_store,
423            Arc::new(TaskProgress::default()),
424            0,
425        )
426    }
427
428    /// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
429    async fn seek_idx(
430        &mut self,
431        idx: usize,
432        seek_key: Option<FullKey<&[u8]>>,
433    ) -> HummockResult<()> {
434        self.sstable_iter.take();
435        let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
436        {
437            (Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
438                Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
439                Ordering::Greater => Some(seek_key),
440            },
441            (Some(seek_key), true) => Some(seek_key),
442            (None, true) => None,
443            (None, false) => Some(FullKey::decode(&self.key_range.left)),
444        };
445
446        self.cur_idx = idx;
447        while self.cur_idx < self.sstables.len() {
448            let table_info = &self.sstables[self.cur_idx];
449            let read_table_ids = HashSet::from_iter(table_info.table_ids.iter().copied());
450            if read_table_ids.is_empty() {
451                self.cur_idx += 1;
452                seek_key = None;
453                continue;
454            }
455            let sstable = self
456                .sstable_store
457                .sstable(table_info, &mut self.stats)
458                .instrument_await("stream_iter_sstable".verbose())
459                .await?;
460
461            let filter_key_range = match seek_key {
462                Some(seek_key) => {
463                    KeyRange::new(seek_key.encode().into(), self.key_range.right.clone())
464                }
465                None => self.key_range.clone(),
466            };
467
468            let block_metas_range =
469                filter_block_metas(&sstable.meta.block_metas, &read_table_ids, filter_key_range);
470
471            let mut found = true;
472            if block_metas_range.is_empty() {
473                found = false;
474            } else {
475                self.task_progress.inc_num_pending_read_io();
476                let mut sstable_iter = SstableStreamIterator::new(
477                    sstable,
478                    block_metas_range,
479                    table_info.clone(),
480                    &self.stats,
481                    self.task_progress.clone(),
482                    self.sstable_store.clone(),
483                    self.max_io_retry_times,
484                );
485                sstable_iter.seek(seek_key).await?;
486
487                if sstable_iter.is_valid() {
488                    self.sstable_iter = Some(sstable_iter);
489                } else {
490                    found = false;
491                }
492            }
493
494            if found {
495                return Ok(());
496            } else {
497                self.cur_idx += 1;
498                seek_key = None;
499            }
500        }
501        Ok(())
502    }
503}
504
505impl HummockIterator for ConcatSstableIterator {
506    type Direction = Forward;
507
508    async fn next(&mut self) -> HummockResult<()> {
509        let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
510
511        // Does just calling `next()` suffice?
512        sstable_iter.next().await?;
513        if sstable_iter.is_valid() {
514            Ok(())
515        } else {
516            // No, seek to next table.
517            self.seek_idx(self.cur_idx + 1, None).await?;
518            Ok(())
519        }
520    }
521
522    fn key(&self) -> FullKey<&[u8]> {
523        self.sstable_iter.as_ref().expect("no table iter").key()
524    }
525
526    fn value(&self) -> HummockValue<&[u8]> {
527        self.sstable_iter.as_ref().expect("no table iter").value()
528    }
529
530    fn is_valid(&self) -> bool {
531        self.sstable_iter.as_ref().is_some_and(|i| i.is_valid())
532    }
533
534    async fn rewind(&mut self) -> HummockResult<()> {
535        self.seek_idx(0, None).await
536    }
537
538    /// Resets the iterator and seeks to the first position where the stored key >= `key`.
539    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
540        let seek_key = if self.key_range.left.is_empty() {
541            key
542        } else {
543            match key.cmp(&FullKey::decode(&self.key_range.left)) {
544                Ordering::Less | Ordering::Equal => FullKey::decode(&self.key_range.left),
545                Ordering::Greater => key,
546            }
547        };
548        let table_idx = self.sstables.partition_point(|table| {
549            // We use the maximum key of an SST for the search. That way, we guarantee that the
550            // resulting SST contains either that key or the next-larger KV-pair. Subsequently,
551            // we avoid calling `seek_idx()` twice if the determined SST does not contain `key`.
552
553            // Note that we need to use `<` instead of `<=` to ensure that all keys in an SST
554            // (including its max. key) produce the same search result.
555            let max_sst_key = &table.key_range.right;
556            FullKey::decode(max_sst_key).cmp(&seek_key) == Ordering::Less
557        });
558
559        self.seek_idx(table_idx, Some(key)).await
560    }
561
562    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
563        stats.add(&self.stats)
564    }
565
566    fn value_meta(&self) -> ValueMeta {
567        let iter = self.sstable_iter.as_ref().expect("no table iter");
568        // sstable_iter's block_idx must have advanced at least one.
569        // See SstableStreamIterator::next_block.
570        assert!(iter.block_idx >= 1);
571        // block_idx is relative to block_metas_range.start, so we need to add it back
572        let absolute_block_idx = iter.block_metas_range.start + iter.block_idx - 1;
573        ValueMeta {
574            object_id: Some(iter.sstable_info.object_id),
575            block_id: Some(absolute_block_idx as u64),
576        }
577    }
578}
579
580pub struct MonitoredCompactorIterator<I> {
581    inner: I,
582    task_progress: Arc<TaskProgress>,
583
584    processed_key_num: usize,
585}
586
587impl<I: HummockIterator<Direction = Forward>> MonitoredCompactorIterator<I> {
588    pub fn new(inner: I, task_progress: Arc<TaskProgress>) -> Self {
589        Self {
590            inner,
591            task_progress,
592            processed_key_num: 0,
593        }
594    }
595}
596
597impl<I: HummockIterator<Direction = Forward>> HummockIterator for MonitoredCompactorIterator<I> {
598    type Direction = Forward;
599
600    async fn next(&mut self) -> HummockResult<()> {
601        self.inner.next().await?;
602        self.processed_key_num += 1;
603
604        if self.processed_key_num.is_multiple_of(PROGRESS_KEY_INTERVAL) {
605            self.task_progress
606                .inc_progress_key(PROGRESS_KEY_INTERVAL as _);
607        }
608
609        Ok(())
610    }
611
612    fn key(&self) -> FullKey<&[u8]> {
613        self.inner.key()
614    }
615
616    fn value(&self) -> HummockValue<&[u8]> {
617        self.inner.value()
618    }
619
620    fn is_valid(&self) -> bool {
621        self.inner.is_valid()
622    }
623
624    async fn rewind(&mut self) -> HummockResult<()> {
625        self.processed_key_num = 0;
626        self.inner.rewind().await?;
627        Ok(())
628    }
629
630    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
631        self.processed_key_num = 0;
632        self.inner.seek(key).await?;
633        Ok(())
634    }
635
636    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
637        self.inner.collect_local_statistic(stats)
638    }
639
640    fn value_meta(&self) -> ValueMeta {
641        self.inner.value_meta()
642    }
643}
644
645pub(crate) fn filter_block_metas(
646    block_metas: &[BlockMeta],
647    read_table_ids: &HashSet<TableId>,
648    key_range: KeyRange,
649) -> Range<usize> {
650    if block_metas.is_empty() {
651        return 0..0;
652    }
653
654    let mut start_index = if key_range.left.is_empty() {
655        0
656    } else {
657        // start_index points to the greatest block whose smallest_key <= seek_key.
658        block_metas
659            .partition_point(|block| {
660                KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key)
661                    != Ordering::Less
662            })
663            .saturating_sub(1)
664    };
665
666    let mut end_index = if key_range.right.is_empty() {
667        block_metas.len()
668    } else {
669        let ret = block_metas.partition_point(|block| {
670            KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right)
671                != Ordering::Greater
672        });
673
674        if ret == 0 {
675            // not found
676            return 0..0;
677        }
678
679        ret
680    }
681    .saturating_sub(1);
682
683    // Skip blocks that are not in the SST read table ids.
684    while start_index <= end_index {
685        let start_block_table_id = block_metas[start_index].table_id();
686        if read_table_ids.contains(&start_block_table_id) {
687            break;
688        }
689
690        // skip this table_id
691        let old_start_index = start_index;
692        let block_metas_to_search = &block_metas[start_index..=end_index];
693
694        start_index += block_metas_to_search
695            .partition_point(|block_meta| block_meta.table_id() == start_block_table_id);
696
697        if old_start_index == start_index {
698            // no more blocks with the same table_id
699            break;
700        }
701    }
702
703    while start_index <= end_index {
704        let end_block_table_id = block_metas[end_index].table_id();
705        if read_table_ids.contains(&end_block_table_id) {
706            break;
707        }
708
709        let old_end_index = end_index;
710        let block_metas_to_search = &block_metas[start_index..=end_index];
711
712        end_index = start_index
713            + block_metas_to_search
714                .partition_point(|block_meta| block_meta.table_id() < end_block_table_id)
715                .saturating_sub(1);
716
717        if end_index == old_end_index {
718            // no more blocks with the same table_id
719            break;
720        }
721    }
722
723    if start_index > end_index {
724        return 0..0;
725    }
726
727    start_index..(end_index + 1)
728}
729
730#[cfg(test)]
731mod tests {
732    use std::cmp::Ordering;
733    use std::collections::HashSet;
734
735    use risingwave_common::catalog::TableId;
736    use risingwave_common::util::epoch::test_epoch;
737    use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, next_full_key, prev_full_key};
738    use risingwave_hummock_sdk::key_range::KeyRange;
739    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
740
741    use crate::hummock::BlockMeta;
742    use crate::hummock::compactor::ConcatSstableIterator;
743    use crate::hummock::iterator::test_utils::mock_sstable_store;
744    use crate::hummock::iterator::{HummockIterator, MergeIterator};
745    use crate::hummock::test_utils::{
746        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_info,
747        gen_test_sstable_with_table_ids, test_key_of, test_value_of,
748    };
749    use crate::hummock::value::HummockValue;
750
751    #[tokio::test]
752    async fn test_concat_iterator() {
753        let sstable_store = mock_sstable_store().await;
754        let mut table_infos = vec![];
755        for object_id in 0..3 {
756            let start_index = object_id * TEST_KEYS_COUNT;
757            let end_index = (object_id + 1) * TEST_KEYS_COUNT;
758            let table_info = gen_test_sstable_info(
759                default_builder_opt_for_test(),
760                object_id as u64,
761                (start_index..end_index)
762                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
763                sstable_store.clone(),
764            )
765            .await;
766            table_infos.push(table_info);
767        }
768        let start_index = 5000;
769        let end_index = 25000;
770
771        let kr = KeyRange::new(
772            test_key_of(start_index).encode().into(),
773            test_key_of(end_index).encode().into(),
774        );
775        let mut iter =
776            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
777        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
778
779        for idx in start_index..end_index {
780            let key = iter.key();
781            let val = iter.value();
782            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
783            assert_eq!(
784                val.into_user_value().unwrap(),
785                test_value_of(idx).as_slice()
786            );
787            iter.next().await.unwrap();
788        }
789
790        // seek non-overlap range
791        let kr = KeyRange::new(
792            test_key_of(30000).encode().into(),
793            test_key_of(40000).encode().into(),
794        );
795        let mut iter =
796            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
797        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
798        assert!(!iter.is_valid());
799        let kr = KeyRange::new(
800            test_key_of(start_index).encode().into(),
801            test_key_of(40000).encode().into(),
802        );
803        let mut iter =
804            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
805        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
806        for idx in start_index..30000 {
807            let key = iter.key();
808            let val = iter.value();
809            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
810            assert_eq!(
811                val.into_user_value().unwrap(),
812                test_value_of(idx).as_slice()
813            );
814            iter.next().await.unwrap();
815        }
816        assert!(!iter.is_valid());
817
818        // Test seek. Result is dominated by given seek key rather than key range.
819        let kr = KeyRange::new(
820            test_key_of(0).encode().into(),
821            test_key_of(40000).encode().into(),
822        );
823        let mut iter =
824            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
825        iter.seek(test_key_of(10000).to_ref()).await.unwrap();
826        assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10000).to_ref());
827        iter.seek(test_key_of(10001).to_ref()).await.unwrap();
828        assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10001).to_ref());
829        iter.seek(test_key_of(9999).to_ref()).await.unwrap();
830        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(9999).to_ref());
831        iter.seek(test_key_of(1).to_ref()).await.unwrap();
832        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(1).to_ref());
833        iter.seek(test_key_of(29999).to_ref()).await.unwrap();
834        assert!(iter.is_valid() && iter.cur_idx == 2 && iter.key() == test_key_of(29999).to_ref());
835        iter.seek(test_key_of(30000).to_ref()).await.unwrap();
836        assert!(!iter.is_valid());
837
838        // Test seek. Result is dominated by key range rather than given seek key.
839        let kr = KeyRange::new(
840            test_key_of(6000).encode().into(),
841            test_key_of(16000).encode().into(),
842        );
843        let mut iter =
844            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
845        iter.seek(test_key_of(17000).to_ref()).await.unwrap();
846        assert!(!iter.is_valid());
847        iter.seek(test_key_of(1).to_ref()).await.unwrap();
848        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == FullKey::decode(&kr.left));
849    }
850
851    #[tokio::test]
852    async fn test_concat_iterator_seek_idx() {
853        let sstable_store = mock_sstable_store().await;
854        let mut table_infos = vec![];
855        for object_id in 0..3 {
856            let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
857            let end_index = (object_id + 1) * TEST_KEYS_COUNT;
858            let table_info = gen_test_sstable_info(
859                default_builder_opt_for_test(),
860                object_id as u64,
861                (start_index..end_index)
862                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
863                sstable_store.clone(),
864            )
865            .await;
866            table_infos.push(table_info);
867        }
868
869        // Test seek_idx. Result is dominated by given seek key rather than key range.
870        let kr = KeyRange::new(
871            test_key_of(0).encode().into(),
872            test_key_of(40000).encode().into(),
873        );
874        let mut iter =
875            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
876        let sst = sstable_store
877            .sstable(&iter.sstables[0], &mut iter.stats)
878            .await
879            .unwrap();
880        let block_metas = &sst.meta.block_metas;
881        let block_1_smallest_key = block_metas[1].smallest_key.clone();
882        let block_2_smallest_key = block_metas[2].smallest_key.clone();
883        // Use block_1_smallest_key as seek key and result in the first KV of block 1.
884        let seek_key = block_1_smallest_key.clone();
885        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
886            .await
887            .unwrap();
888        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
889        // Use prev_full_key(block_1_smallest_key) as seek key and result in the first KV of block
890        // 1.
891        let seek_key = prev_full_key(block_1_smallest_key.as_slice());
892        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
893            .await
894            .unwrap();
895        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
896        iter.next().await.unwrap();
897        let block_1_second_key = iter.key().to_vec();
898        // Use a big enough seek key and result in invalid iterator.
899        let seek_key = test_key_of(30001);
900        iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref()))
901            .await
902            .unwrap();
903        assert!(!iter.is_valid());
904
905        // Test seek_idx. Result is dominated by key range rather than given seek key.
906        let kr = KeyRange::new(
907            next_full_key(&block_1_smallest_key).into(),
908            prev_full_key(&block_2_smallest_key).into(),
909        );
910        let mut iter =
911            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
912        // Use block_2_smallest_key as seek key and result in invalid iterator.
913        let seek_key = FullKey::decode(&block_2_smallest_key);
914        assert!(seek_key.cmp(&FullKey::decode(&kr.right)) == Ordering::Greater);
915        iter.seek_idx(0, Some(seek_key)).await.unwrap();
916        assert!(!iter.is_valid());
917        // Use a small enough seek key and result in the second KV of block 1.
918        let seek_key = test_key_of(0).encode();
919        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
920            .await
921            .unwrap();
922        assert!(iter.is_valid());
923        assert_eq!(iter.key(), block_1_second_key.to_ref());
924
925        // Use None seek key and result in the second KV of block 1.
926        iter.seek_idx(0, None).await.unwrap();
927        assert!(iter.is_valid());
928        assert_eq!(iter.key(), block_1_second_key.to_ref());
929    }
930
931    #[tokio::test]
932    async fn test_filter_block_metas() {
933        use crate::hummock::compactor::iterator::filter_block_metas;
934
935        {
936            let block_metas = Vec::default();
937
938            let ret = filter_block_metas(&block_metas, &HashSet::default(), KeyRange::default());
939
940            assert!(ret.is_empty());
941        }
942
943        {
944            let block_metas = vec![
945                BlockMeta {
946                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
947                    ..Default::default()
948                },
949                BlockMeta {
950                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
951                    ..Default::default()
952                },
953                BlockMeta {
954                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
955                    ..Default::default()
956                },
957            ];
958
959            let ret = filter_block_metas(
960                &block_metas,
961                &HashSet::from_iter(vec![1_u32.into(), 2.into(), 3.into()].into_iter()),
962                KeyRange::default(),
963            );
964            let ret = &block_metas[ret];
965
966            assert_eq!(3, ret.len());
967            assert_eq!(
968                1,
969                FullKey::decode(&ret[0].smallest_key)
970                    .user_key
971                    .table_id
972                    .as_raw_id()
973            );
974            assert_eq!(
975                3,
976                FullKey::decode(&ret[2].smallest_key)
977                    .user_key
978                    .table_id
979                    .as_raw_id()
980            );
981        }
982
983        {
984            let block_metas = vec![
985                BlockMeta {
986                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
987                    ..Default::default()
988                },
989                BlockMeta {
990                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
991                    ..Default::default()
992                },
993                BlockMeta {
994                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
995                    ..Default::default()
996                },
997            ];
998
999            let ret = filter_block_metas(
1000                &block_metas,
1001                &HashSet::from_iter(vec![2_u32.into(), 3.into()].into_iter()),
1002                KeyRange::default(),
1003            );
1004            let ret = &block_metas[ret];
1005
1006            assert_eq!(2, ret.len());
1007            assert_eq!(
1008                2,
1009                FullKey::decode(&ret[0].smallest_key)
1010                    .user_key
1011                    .table_id
1012                    .as_raw_id()
1013            );
1014            assert_eq!(
1015                3,
1016                FullKey::decode(&ret[1].smallest_key)
1017                    .user_key
1018                    .table_id
1019                    .as_raw_id()
1020            );
1021        }
1022
1023        {
1024            let block_metas = vec![
1025                BlockMeta {
1026                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1027                    ..Default::default()
1028                },
1029                BlockMeta {
1030                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1031                    ..Default::default()
1032                },
1033                BlockMeta {
1034                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1035                    ..Default::default()
1036                },
1037            ];
1038
1039            let ret = filter_block_metas(
1040                &block_metas,
1041                &HashSet::from_iter(vec![1_u32.into(), 2_u32.into()].into_iter()),
1042                KeyRange::default(),
1043            );
1044            let ret = &block_metas[ret];
1045
1046            assert_eq!(2, ret.len());
1047            assert_eq!(
1048                1,
1049                FullKey::decode(&ret[0].smallest_key)
1050                    .user_key
1051                    .table_id
1052                    .as_raw_id()
1053            );
1054            assert_eq!(
1055                2,
1056                FullKey::decode(&ret[1].smallest_key)
1057                    .user_key
1058                    .table_id
1059                    .as_raw_id()
1060            );
1061        }
1062
1063        {
1064            let block_metas = vec![
1065                BlockMeta {
1066                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1067                    ..Default::default()
1068                },
1069                BlockMeta {
1070                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1071                    ..Default::default()
1072                },
1073                BlockMeta {
1074                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1075                    ..Default::default()
1076                },
1077            ];
1078            let ret = filter_block_metas(
1079                &block_metas,
1080                &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1081                KeyRange::default(),
1082            );
1083            let ret = &block_metas[ret];
1084
1085            assert_eq!(1, ret.len());
1086            assert_eq!(
1087                2,
1088                FullKey::decode(&ret[0].smallest_key)
1089                    .user_key
1090                    .table_id
1091                    .as_raw_id()
1092            );
1093        }
1094
1095        {
1096            let block_metas = vec![
1097                BlockMeta {
1098                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1099                    ..Default::default()
1100                },
1101                BlockMeta {
1102                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1103                    ..Default::default()
1104                },
1105                BlockMeta {
1106                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1107                    ..Default::default()
1108                },
1109                BlockMeta {
1110                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1111                    ..Default::default()
1112                },
1113                BlockMeta {
1114                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1115                    ..Default::default()
1116                },
1117            ];
1118            let ret = filter_block_metas(
1119                &block_metas,
1120                &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1121                KeyRange::default(),
1122            );
1123            let ret = &block_metas[ret];
1124
1125            assert_eq!(1, ret.len());
1126            assert_eq!(
1127                2,
1128                FullKey::decode(&ret[0].smallest_key)
1129                    .user_key
1130                    .table_id
1131                    .as_raw_id()
1132            );
1133        }
1134
1135        {
1136            let block_metas = vec![
1137                BlockMeta {
1138                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1139                    ..Default::default()
1140                },
1141                BlockMeta {
1142                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1143                    ..Default::default()
1144                },
1145                BlockMeta {
1146                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1147                    ..Default::default()
1148                },
1149                BlockMeta {
1150                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1151                    ..Default::default()
1152                },
1153                BlockMeta {
1154                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1155                    ..Default::default()
1156                },
1157            ];
1158
1159            let ret = filter_block_metas(
1160                &block_metas,
1161                &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1162                KeyRange::default(),
1163            );
1164            let ret = &block_metas[ret];
1165
1166            assert_eq!(1, ret.len());
1167            assert_eq!(
1168                2,
1169                FullKey::decode(&ret[0].smallest_key)
1170                    .user_key
1171                    .table_id
1172                    .as_raw_id()
1173            );
1174        }
1175
1176        {
1177            let block_metas = vec![
1178                BlockMeta {
1179                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1180                    ..Default::default()
1181                },
1182                BlockMeta {
1183                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1184                    ..Default::default()
1185                },
1186                BlockMeta {
1187                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1188                    ..Default::default()
1189                },
1190            ];
1191
1192            let ret = filter_block_metas(
1193                &block_metas,
1194                &HashSet::from_iter(vec![1_u32.into(), 3_u32.into()].into_iter()),
1195                KeyRange::default(),
1196            );
1197            let ret = &block_metas[ret];
1198
1199            assert_eq!(3, ret.len());
1200            assert_eq!(
1201                1,
1202                FullKey::decode(&ret[0].smallest_key)
1203                    .user_key
1204                    .table_id
1205                    .as_raw_id()
1206            );
1207            assert_eq!(
1208                2,
1209                FullKey::decode(&ret[1].smallest_key)
1210                    .user_key
1211                    .table_id
1212                    .as_raw_id()
1213            );
1214            assert_eq!(
1215                3,
1216                FullKey::decode(&ret[2].smallest_key)
1217                    .user_key
1218                    .table_id
1219                    .as_raw_id()
1220            );
1221        }
1222    }
1223
1224    #[tokio::test]
1225    async fn test_iterator_same_obj() {
1226        let sstable_store = mock_sstable_store().await;
1227
1228        let table_info = gen_test_sstable_info(
1229            default_builder_opt_for_test(),
1230            1_u64,
1231            (1..10000).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
1232            sstable_store.clone(),
1233        )
1234        .await;
1235
1236        let split_key = test_key_of(5000).encode();
1237        let sst_1: SstableInfo = SstableInfoInner {
1238            key_range: KeyRange {
1239                left: table_info.key_range.left.clone(),
1240                right: split_key.clone().into(),
1241                right_exclusive: true,
1242            },
1243            ..table_info.get_inner()
1244        }
1245        .into();
1246
1247        let total_key_count = sst_1.total_key_count;
1248        let sst_2: SstableInfo = SstableInfoInner {
1249            sst_id: sst_1.sst_id + 1,
1250            key_range: KeyRange {
1251                left: split_key.clone().into(),
1252                right: table_info.key_range.right.clone(),
1253                right_exclusive: table_info.key_range.right_exclusive,
1254            },
1255            ..table_info.get_inner()
1256        }
1257        .into();
1258
1259        {
1260            // test concate
1261            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1262
1263            let mut iter = ConcatSstableIterator::for_test(
1264                vec![sst_1.clone(), sst_2.clone()],
1265                KeyRange::default(),
1266                sstable_store.clone(),
1267            );
1268
1269            iter.rewind().await.unwrap();
1270
1271            let mut key_count = 0;
1272            while iter.is_valid() {
1273                let is_new_user_key = full_key_tracker.observe(iter.key());
1274                assert!(is_new_user_key);
1275                key_count += 1;
1276                iter.next().await.unwrap();
1277            }
1278
1279            assert_eq!(total_key_count, key_count);
1280        }
1281
1282        {
1283            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1284            let concat_1 = ConcatSstableIterator::for_test(
1285                vec![sst_1.clone()],
1286                KeyRange::default(),
1287                sstable_store.clone(),
1288            );
1289
1290            let concat_2 = ConcatSstableIterator::for_test(
1291                vec![sst_2.clone()],
1292                KeyRange::default(),
1293                sstable_store.clone(),
1294            );
1295
1296            let mut key_count = 0;
1297            let mut iter = MergeIterator::for_compactor(vec![concat_1, concat_2]);
1298            iter.rewind().await.unwrap();
1299            while iter.is_valid() {
1300                full_key_tracker.observe(iter.key());
1301                key_count += 1;
1302                iter.next().await.unwrap();
1303            }
1304            assert_eq!(total_key_count, key_count);
1305        }
1306    }
1307
1308    #[tokio::test]
1309    async fn test_concat_iterator_skips_hole_table_blocks() {
1310        let sstable_store = mock_sstable_store().await;
1311
1312        let key_1 = FullKey::for_test(TableId::new(1), b"a".to_vec(), test_epoch(1));
1313        let key_2 = FullKey::for_test(TableId::new(2), b"b".to_vec(), test_epoch(1));
1314        let key_3 = FullKey::for_test(TableId::new(3), b"c".to_vec(), test_epoch(1));
1315        let kv_pairs = vec![
1316            (key_1.clone(), HummockValue::put(b"value-1".to_vec())),
1317            (key_2.clone(), HummockValue::put(b"value-2".to_vec())),
1318            (key_3.clone(), HummockValue::put(b"value-3".to_vec())),
1319        ];
1320
1321        let (_sstable, table_info) = gen_test_sstable_with_table_ids(
1322            default_builder_opt_for_test(),
1323            10,
1324            kv_pairs.into_iter(),
1325            sstable_store.clone(),
1326            vec![1, 2, 3],
1327        )
1328        .await;
1329
1330        let table_info: SstableInfo = SstableInfoInner {
1331            table_ids: vec![1.into(), 3.into()],
1332            ..table_info.get_inner()
1333        }
1334        .into();
1335
1336        let mut iter = ConcatSstableIterator::for_test(
1337            vec![table_info.clone()],
1338            KeyRange::default(),
1339            sstable_store.clone(),
1340        );
1341        iter.rewind().await.unwrap();
1342        assert!(iter.is_valid());
1343        assert_eq!(iter.key(), key_1.to_ref());
1344
1345        iter.next().await.unwrap();
1346        assert!(iter.is_valid());
1347        assert_eq!(iter.key(), key_3.to_ref());
1348
1349        iter.next().await.unwrap();
1350        assert!(!iter.is_valid());
1351
1352        let mut iter =
1353            ConcatSstableIterator::for_test(vec![table_info], KeyRange::default(), sstable_store);
1354        iter.seek(key_2.to_ref()).await.unwrap();
1355        assert!(iter.is_valid());
1356        assert_eq!(iter.key(), key_3.to_ref());
1357    }
1358}