risingwave_storage/hummock/compactor/iterator.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::ops::Range;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use fail::fail_point;
24use risingwave_common::catalog::TableId;
25use risingwave_hummock_sdk::KeyComparator;
26use risingwave_hummock_sdk::compaction_group::StateTableId;
27use risingwave_hummock_sdk::key::FullKey;
28use risingwave_hummock_sdk::key_range::KeyRange;
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30
31use crate::hummock::block_stream::BlockDataStream;
32use crate::hummock::compactor::task_progress::TaskProgress;
33use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
34use crate::hummock::sstable_store::SstableStoreRef;
35use crate::hummock::value::HummockValue;
36use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult, TableHolder};
37use crate::monitor::StoreLocalStatistic;
38
/// Number of keys `MonitoredCompactorIterator::next` advances between two progress reports;
/// each report adds this many keys via `TaskProgress::inc_progress_key`.
const PROGRESS_KEY_INTERVAL: usize = 100;
40
/// Iterates over the KV-pairs of an SST while downloading it.
///
/// `SstableStreamIterator` reads a contiguous sub-range of the SST's blocks through a block
/// stream. The sub-range is described by `block_metas_range` (absolute indices into
/// `sstable.meta.block_metas`). It does not necessarily cover the whole SST: the caller may pass
/// a subset, and construction trims it further to `sstable_info.key_range`.
pub struct SstableStreamIterator {
    sstable_store: SstableStoreRef,
    sstable: TableHolder,
    /// The range of block metas to iterate over (absolute indices into
    /// `sstable.meta.block_metas`).
    block_metas_range: Range<usize>,
    /// The downloading stream. It is `None` until the first block is requested. It is reset to
    /// `None` before the stream is recreated after an object-store error.
    block_stream: Option<BlockDataStream>,

    /// Iterates over the KV-pairs of the current block.
    block_iter: Option<BlockIterator>,

    /// Index of the next block to load, relative to `block_metas_range.start`. Once a block has
    /// been loaded, the current block is `block_metas_range.start + block_idx - 1`.
    block_idx: usize,

    /// Accumulates the time spent loading blocks, in milliseconds (rounded up). This is a clone
    /// of the `remote_io_time` counter of the `StoreLocalStatistic` passed to `new`.
    stats_ptr: Arc<AtomicU64>,

    /// For key sanity check of divided SST and debugging
    sstable_info: SstableInfo,

    /// Table ids taken from `sstable_info.table_ids`. Blocks belonging to other tables are
    /// skipped.
    sstable_table_ids: HashSet<StateTableId>,
    task_progress: Arc<TaskProgress>,
    /// How many times the block stream has been recreated after an object-store error.
    io_retry_times: usize,
    /// Maximum number of stream recreations allowed.
    max_io_retry_times: usize,

    // key range cache: decoded copies of `sstable_info.key_range`, used to bounds-check keys
    key_range_left: FullKey<Vec<u8>>,
    key_range_right: FullKey<Vec<u8>>,
    key_range_right_exclusive: bool,
}
75
76impl SstableStreamIterator {
77    // We have to handle two internal iterators.
78    //   `block_stream`: iterates over the blocks of the table.
79    //     `block_iter`: iterates over the KV-pairs of the current block.
80    // These iterators work in different ways.
81
82    // BlockIterator works as follows: After new(), we call seek(). That brings us
83    // to the first element. Calling next() then brings us to the second element and does not
84    // return anything.
85
86    // BlockStream follows a different approach. After new(), we do not seek, instead next()
87    // returns the first value.
88
89    /// Initialises a new [`SstableStreamIterator`] which iterates over the given [`BlockDataStream`].
90    /// The iterator reads at most `max_block_count` from the stream.
91    pub fn new(
92        sstable: TableHolder,
93        block_metas_range: Range<usize>,
94        sstable_info: SstableInfo,
95        stats: &StoreLocalStatistic,
96        task_progress: Arc<TaskProgress>,
97        sstable_store: SstableStoreRef,
98        max_io_retry_times: usize,
99    ) -> Self {
100        let sstable_table_ids = HashSet::from_iter(sstable_info.table_ids.iter().cloned());
101
102        // Further filter the block_metas_range with sstable_info.key_range
103        // This is necessary when the SST is split into multiple SstableInfo with different key ranges
104        let block_metas_range = {
105            let block_metas = &sstable.meta.block_metas[block_metas_range.clone()];
106            let inner_range = filter_block_metas(
107                block_metas,
108                &sstable_table_ids,
109                sstable_info.key_range.clone(),
110            );
111            // Adjust the range to be relative to the original block_metas
112            (block_metas_range.start + inner_range.start)
113                ..(block_metas_range.start + inner_range.end)
114        };
115
116        let key_range_left = FullKey::decode(&sstable_info.key_range.left).to_vec();
117        let key_range_right = FullKey::decode(&sstable_info.key_range.right).to_vec();
118        let key_range_right_exclusive = sstable_info.key_range.right_exclusive;
119
120        Self {
121            block_stream: None,
122            block_iter: None,
123            sstable,
124            block_metas_range,
125            block_idx: 0,
126            stats_ptr: stats.remote_io_time.clone(),
127            sstable_table_ids,
128            sstable_info,
129            sstable_store,
130            task_progress,
131            io_retry_times: 0,
132            max_io_retry_times,
133            key_range_left,
134            key_range_right,
135            key_range_right_exclusive,
136        }
137    }
138
139    /// Returns the block metas slice for this iterator.
140    #[inline]
141    fn block_metas(&self) -> &[BlockMeta] {
142        &self.sstable.meta.block_metas[self.block_metas_range.clone()]
143    }
144
145    /// Returns the number of blocks in this iterator.
146    #[inline]
147    fn block_count(&self) -> usize {
148        self.block_metas_range.len()
149    }
150
151    async fn create_stream(&mut self) -> HummockResult<()> {
152        let block_stream = self
153            .sstable_store
154            .get_stream_for_blocks(
155                self.sstable_info.object_id,
156                &self.block_metas()[self.block_idx..],
157            )
158            .instrument_await("stream_iter_get_stream".verbose())
159            .await?;
160        self.block_stream = Some(block_stream);
161        Ok(())
162    }
163
164    async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
165        while let Some(block_iter) = self.block_iter.as_mut() {
166            if self.sstable_table_ids.contains(&block_iter.table_id()) {
167                return Ok(());
168            } else {
169                self.next_block().await?;
170            }
171        }
172        Ok(())
173    }
174
175    /// Initialises the iterator by moving it to the first KV-pair in the stream's first block where
176    /// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to
177    /// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to
178    /// the very first KV-pair of the stream's first block.
179    pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
180        // Load first block.
181        self.next_block().await?;
182
183        // We assume that a block always contains at least one KV pair. Subsequently, if
184        // `next_block()` loads a new block (i.e., `block_iter` is not `None`), then `block_iter` is
185        // also valid and pointing on the block's first KV-pair.
186
187        let seek_key = if let Some(seek_key) = seek_key {
188            if seek_key.cmp(&self.key_range_left.to_ref()).is_lt() {
189                Some(self.key_range_left.to_ref())
190            } else {
191                Some(seek_key)
192            }
193        } else {
194            Some(self.key_range_left.to_ref())
195        };
196
197        if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
198            block_iter.seek(seek_key);
199
200            if !block_iter.is_valid() {
201                // `seek_key` is larger than everything in the first block.
202                self.next_block().await?;
203            }
204        }
205
206        self.prune_from_valid_block_iter().await?;
207        Ok(())
208    }
209
210    /// Loads a new block, creates a new iterator for it, and stores that iterator in
211    /// `self.block_iter`. The created iterator points to the block's first KV-pair. If the end of
212    /// the stream is reached or `self.remaining_blocks` is zero, then the function sets
213    /// `self.block_iter` to `None`.
214    async fn next_block(&mut self) -> HummockResult<()> {
215        // Check if we want and if we can load the next block.
216        let now = Instant::now();
217        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
218            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
219            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
220        });
221        if self.block_idx < self.block_count() {
222            loop {
223                let ret = match &mut self.block_stream {
224                    Some(block_stream) => block_stream.next_block().await,
225                    None => {
226                        self.create_stream().await?;
227                        continue;
228                    }
229                };
230                match ret {
231                    Ok(Some(block)) => {
232                        let mut block_iter =
233                            BlockIterator::new(BlockHolder::from_owned_block(block));
234                        block_iter.seek_to_first();
235                        self.block_idx += 1;
236                        self.block_iter = Some(block_iter);
237                        return Ok(());
238                    }
239                    Ok(None) => break,
240                    Err(e) => {
241                        if !e.is_object_error() || !self.need_recreate_io_stream() {
242                            return Err(e);
243                        }
244                        self.block_stream.take();
245                        self.io_retry_times += 1;
246                        fail_point!("create_stream_err");
247
248                        tracing::warn!(
249                            "retry create stream for sstable {} times, sstinfo={}",
250                            self.io_retry_times,
251                            self.sst_debug_info()
252                        );
253                    }
254                }
255            }
256        }
257        self.block_idx = self.block_count();
258        self.block_iter = None;
259
260        Ok(())
261    }
262
263    /// Moves to the next KV-pair in the table. Assumes that the current position is valid. Even if
264    /// the next position is invalid, the function returns `Ok(())`.
265    ///
266    /// Do not use `next()` to initialise the iterator (i.e. do not use it to find the first
267    /// KV-pair). Instead, use `seek()`. Afterwards, use `next()` to reach the second KV-pair and
268    /// onwards.
269    pub async fn next(&mut self) -> HummockResult<()> {
270        if !self.is_valid() {
271            return Ok(());
272        }
273
274        let block_iter = self.block_iter.as_mut().expect("no block iter");
275        block_iter.next();
276        if !block_iter.is_valid() {
277            self.next_block().await?;
278            self.prune_from_valid_block_iter().await?;
279        }
280
281        if !self.is_valid() {
282            return Ok(());
283        }
284
285        // Check if we need to skip the block.
286        let key = self
287            .block_iter
288            .as_ref()
289            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
290            .key();
291
292        if self.exceed_key_range_right(key) {
293            self.block_iter = None;
294        }
295
296        Ok(())
297    }
298
299    pub fn key(&self) -> FullKey<&[u8]> {
300        let key = self
301            .block_iter
302            .as_ref()
303            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
304            .key();
305
306        assert!(
307            !self.exceed_key_range_left(key),
308            "key {:?} key_range_left {:?}",
309            key,
310            self.key_range_left.to_ref()
311        );
312
313        assert!(
314            !self.exceed_key_range_right(key),
315            "key {:?} key_range_right {:?} key_range_right_exclusive {}",
316            key,
317            self.key_range_right.to_ref(),
318            self.key_range_right_exclusive
319        );
320
321        key
322    }
323
324    pub fn value(&self) -> HummockValue<&[u8]> {
325        let raw_value = self
326            .block_iter
327            .as_ref()
328            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
329            .value();
330        HummockValue::from_slice(raw_value)
331            .unwrap_or_else(|_| panic!("decode error sstinfo={}", self.sst_debug_info()))
332    }
333
334    pub fn is_valid(&self) -> bool {
335        // True iff block_iter exists and is valid.
336        self.block_iter.as_ref().is_some_and(|i| i.is_valid())
337    }
338
339    fn sst_debug_info(&self) -> String {
340        format!(
341            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
342            self.sstable_info.object_id,
343            self.sstable_info.sst_id,
344            self.sstable_info.meta_offset,
345            self.sstable_info.table_ids
346        )
347    }
348
349    fn need_recreate_io_stream(&self) -> bool {
350        self.io_retry_times < self.max_io_retry_times
351    }
352
353    fn exceed_key_range_left(&self, key: FullKey<&[u8]>) -> bool {
354        key.cmp(&self.key_range_left.to_ref()).is_lt()
355    }
356
357    fn exceed_key_range_right(&self, key: FullKey<&[u8]>) -> bool {
358        if self.key_range_right_exclusive {
359            key.cmp(&self.key_range_right.to_ref()).is_ge()
360        } else {
361            key.cmp(&self.key_range_right.to_ref()).is_gt()
362        }
363    }
364}
365
366impl Drop for SstableStreamIterator {
367    fn drop(&mut self) {
368        self.task_progress.dec_num_pending_read_io()
369    }
370}
371
/// Iterates over the KV-pairs of a given list of SSTs. The key-ranges of these SSTs are assumed to
/// be consecutive and non-overlapping.
pub struct ConcatSstableIterator {
    /// **CAUTION:** `key_range` is used for optimization. It doesn't guarantee value returned by
    /// the iterator is in this range.
    key_range: KeyRange,

    /// The iterator of the current table.
    sstable_iter: Option<SstableStreamIterator>,

    /// Current table index into `sstables`. It equals `sstables.len()` once every SST has been
    /// consumed or skipped.
    cur_idx: usize,

    /// All non-overlapping tables.
    sstables: Vec<SstableInfo>,

    /// Table ids to read. SSTs that share none of these ids are skipped entirely. The ids are
    /// also used to trim block ranges in `filter_block_metas`.
    existing_table_ids: HashSet<StateTableId>,

    sstable_store: SstableStoreRef,

    /// Statistics passed to `sstable_store.sstable` and to every `SstableStreamIterator` (whose
    /// block-loading time goes into `remote_io_time`). They are exported by
    /// `collect_local_statistic`.
    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,
    /// Retry budget handed to every `SstableStreamIterator` this iterator creates.
    max_io_retry_times: usize,
}
396
397impl ConcatSstableIterator {
398    /// Caller should make sure that `tables` are non-overlapping,
399    /// arranged in ascending order when it serves as a forward iterator,
400    /// and arranged in descending order when it serves as a backward iterator.
401    pub fn new(
402        existing_table_ids: Vec<StateTableId>,
403        sst_infos: Vec<SstableInfo>,
404        key_range: KeyRange,
405        sstable_store: SstableStoreRef,
406        task_progress: Arc<TaskProgress>,
407        max_io_retry_times: usize,
408    ) -> Self {
409        Self {
410            key_range,
411            sstable_iter: None,
412            cur_idx: 0,
413            sstables: sst_infos,
414            existing_table_ids: HashSet::from_iter(existing_table_ids),
415            sstable_store,
416            task_progress,
417            stats: StoreLocalStatistic::default(),
418            max_io_retry_times,
419        }
420    }
421
422    #[cfg(test)]
423    pub fn for_test(
424        existing_table_ids: Vec<impl Into<StateTableId>>,
425        sst_infos: Vec<SstableInfo>,
426        key_range: KeyRange,
427        sstable_store: SstableStoreRef,
428    ) -> Self {
429        Self::new(
430            existing_table_ids.into_iter().map(Into::into).collect(),
431            sst_infos,
432            key_range,
433            sstable_store,
434            Arc::new(TaskProgress::default()),
435            0,
436        )
437    }
438
439    /// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
440    async fn seek_idx(
441        &mut self,
442        idx: usize,
443        seek_key: Option<FullKey<&[u8]>>,
444    ) -> HummockResult<()> {
445        self.sstable_iter.take();
446        let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
447        {
448            (Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
449                Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
450                Ordering::Greater => Some(seek_key),
451            },
452            (Some(seek_key), true) => Some(seek_key),
453            (None, true) => None,
454            (None, false) => Some(FullKey::decode(&self.key_range.left)),
455        };
456
457        self.cur_idx = idx;
458        while self.cur_idx < self.sstables.len() {
459            let table_info = &self.sstables[self.cur_idx];
460            let mut found = table_info
461                .table_ids
462                .iter()
463                .any(|table_id| self.existing_table_ids.contains(table_id));
464            if !found {
465                self.cur_idx += 1;
466                seek_key = None;
467                continue;
468            }
469            let sstable = self
470                .sstable_store
471                .sstable(table_info, &mut self.stats)
472                .instrument_await("stream_iter_sstable".verbose())
473                .await?;
474
475            let filter_key_range = match seek_key {
476                Some(seek_key) => {
477                    KeyRange::new(seek_key.encode().into(), self.key_range.right.clone())
478                }
479                None => self.key_range.clone(),
480            };
481
482            let block_metas_range = filter_block_metas(
483                &sstable.meta.block_metas,
484                &self.existing_table_ids,
485                filter_key_range,
486            );
487
488            if block_metas_range.is_empty() {
489                found = false;
490            } else {
491                self.task_progress.inc_num_pending_read_io();
492                let mut sstable_iter = SstableStreamIterator::new(
493                    sstable,
494                    block_metas_range,
495                    table_info.clone(),
496                    &self.stats,
497                    self.task_progress.clone(),
498                    self.sstable_store.clone(),
499                    self.max_io_retry_times,
500                );
501                sstable_iter.seek(seek_key).await?;
502
503                if sstable_iter.is_valid() {
504                    self.sstable_iter = Some(sstable_iter);
505                } else {
506                    found = false;
507                }
508            }
509
510            if found {
511                return Ok(());
512            } else {
513                self.cur_idx += 1;
514                seek_key = None;
515            }
516        }
517        Ok(())
518    }
519}
520
521impl HummockIterator for ConcatSstableIterator {
522    type Direction = Forward;
523
524    async fn next(&mut self) -> HummockResult<()> {
525        let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
526
527        // Does just calling `next()` suffice?
528        sstable_iter.next().await?;
529        if sstable_iter.is_valid() {
530            Ok(())
531        } else {
532            // No, seek to next table.
533            self.seek_idx(self.cur_idx + 1, None).await?;
534            Ok(())
535        }
536    }
537
538    fn key(&self) -> FullKey<&[u8]> {
539        self.sstable_iter.as_ref().expect("no table iter").key()
540    }
541
542    fn value(&self) -> HummockValue<&[u8]> {
543        self.sstable_iter.as_ref().expect("no table iter").value()
544    }
545
546    fn is_valid(&self) -> bool {
547        self.sstable_iter.as_ref().is_some_and(|i| i.is_valid())
548    }
549
550    async fn rewind(&mut self) -> HummockResult<()> {
551        self.seek_idx(0, None).await
552    }
553
554    /// Resets the iterator and seeks to the first position where the stored key >= `key`.
555    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
556        let seek_key = if self.key_range.left.is_empty() {
557            key
558        } else {
559            match key.cmp(&FullKey::decode(&self.key_range.left)) {
560                Ordering::Less | Ordering::Equal => FullKey::decode(&self.key_range.left),
561                Ordering::Greater => key,
562            }
563        };
564        let table_idx = self.sstables.partition_point(|table| {
565            // We use the maximum key of an SST for the search. That way, we guarantee that the
566            // resulting SST contains either that key or the next-larger KV-pair. Subsequently,
567            // we avoid calling `seek_idx()` twice if the determined SST does not contain `key`.
568
569            // Note that we need to use `<` instead of `<=` to ensure that all keys in an SST
570            // (including its max. key) produce the same search result.
571            let max_sst_key = &table.key_range.right;
572            FullKey::decode(max_sst_key).cmp(&seek_key) == Ordering::Less
573        });
574
575        self.seek_idx(table_idx, Some(key)).await
576    }
577
578    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
579        stats.add(&self.stats)
580    }
581
582    fn value_meta(&self) -> ValueMeta {
583        let iter = self.sstable_iter.as_ref().expect("no table iter");
584        // sstable_iter's block_idx must have advanced at least one.
585        // See SstableStreamIterator::next_block.
586        assert!(iter.block_idx >= 1);
587        // block_idx is relative to block_metas_range.start, so we need to add it back
588        let absolute_block_idx = iter.block_metas_range.start + iter.block_idx - 1;
589        ValueMeta {
590            object_id: Some(iter.sstable_info.object_id),
591            block_id: Some(absolute_block_idx as u64),
592        }
593    }
594}
595
/// Wraps a forward `HummockIterator`. It reports the number of keys advanced over to
/// `task_progress`, in batches of `PROGRESS_KEY_INTERVAL`.
pub struct MonitoredCompactorIterator<I> {
    inner: I,
    task_progress: Arc<TaskProgress>,

    /// Successful `next()` calls since construction or the last `rewind()`/`seek()`.
    processed_key_num: usize,
}
602
603impl<I: HummockIterator<Direction = Forward>> MonitoredCompactorIterator<I> {
604    pub fn new(inner: I, task_progress: Arc<TaskProgress>) -> Self {
605        Self {
606            inner,
607            task_progress,
608            processed_key_num: 0,
609        }
610    }
611}
612
613impl<I: HummockIterator<Direction = Forward>> HummockIterator for MonitoredCompactorIterator<I> {
614    type Direction = Forward;
615
616    async fn next(&mut self) -> HummockResult<()> {
617        self.inner.next().await?;
618        self.processed_key_num += 1;
619
620        if self.processed_key_num.is_multiple_of(PROGRESS_KEY_INTERVAL) {
621            self.task_progress
622                .inc_progress_key(PROGRESS_KEY_INTERVAL as _);
623        }
624
625        Ok(())
626    }
627
628    fn key(&self) -> FullKey<&[u8]> {
629        self.inner.key()
630    }
631
632    fn value(&self) -> HummockValue<&[u8]> {
633        self.inner.value()
634    }
635
636    fn is_valid(&self) -> bool {
637        self.inner.is_valid()
638    }
639
640    async fn rewind(&mut self) -> HummockResult<()> {
641        self.processed_key_num = 0;
642        self.inner.rewind().await?;
643        Ok(())
644    }
645
646    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
647        self.processed_key_num = 0;
648        self.inner.seek(key).await?;
649        Ok(())
650    }
651
652    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
653        self.inner.collect_local_statistic(stats)
654    }
655
656    fn value_meta(&self) -> ValueMeta {
657        self.inner.value_meta()
658    }
659}
660
661pub(crate) fn filter_block_metas(
662    block_metas: &[BlockMeta],
663    existing_table_ids: &HashSet<TableId>,
664    key_range: KeyRange,
665) -> Range<usize> {
666    if block_metas.is_empty() {
667        return 0..0;
668    }
669
670    let mut start_index = if key_range.left.is_empty() {
671        0
672    } else {
673        // start_index points to the greatest block whose smallest_key <= seek_key.
674        block_metas
675            .partition_point(|block| {
676                KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key)
677                    != Ordering::Less
678            })
679            .saturating_sub(1)
680    };
681
682    let mut end_index = if key_range.right.is_empty() {
683        block_metas.len()
684    } else {
685        let ret = block_metas.partition_point(|block| {
686            KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right)
687                != Ordering::Greater
688        });
689
690        if ret == 0 {
691            // not found
692            return 0..0;
693        }
694
695        ret
696    }
697    .saturating_sub(1);
698
699    // skip blocks that are not in existing_table_ids
700    while start_index <= end_index {
701        let start_block_table_id = block_metas[start_index].table_id();
702        if existing_table_ids.contains(&start_block_table_id) {
703            break;
704        }
705
706        // skip this table_id
707        let old_start_index = start_index;
708        let block_metas_to_search = &block_metas[start_index..=end_index];
709
710        start_index += block_metas_to_search
711            .partition_point(|block_meta| block_meta.table_id() == start_block_table_id);
712
713        if old_start_index == start_index {
714            // no more blocks with the same table_id
715            break;
716        }
717    }
718
719    while start_index <= end_index {
720        let end_block_table_id = block_metas[end_index].table_id();
721        if existing_table_ids.contains(&end_block_table_id) {
722            break;
723        }
724
725        let old_end_index = end_index;
726        let block_metas_to_search = &block_metas[start_index..=end_index];
727
728        end_index = start_index
729            + block_metas_to_search
730                .partition_point(|block_meta| block_meta.table_id() < end_block_table_id)
731                .saturating_sub(1);
732
733        if end_index == old_end_index {
734            // no more blocks with the same table_id
735            break;
736        }
737    }
738
739    if start_index > end_index {
740        return 0..0;
741    }
742
743    start_index..(end_index + 1)
744}
745
746#[cfg(test)]
747mod tests {
748    use std::cmp::Ordering;
749    use std::collections::HashSet;
750
751    use risingwave_common::catalog::TableId;
752    use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, next_full_key, prev_full_key};
753    use risingwave_hummock_sdk::key_range::KeyRange;
754    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
755
756    use crate::hummock::BlockMeta;
757    use crate::hummock::compactor::ConcatSstableIterator;
758    use crate::hummock::iterator::test_utils::mock_sstable_store;
759    use crate::hummock::iterator::{HummockIterator, MergeIterator};
760    use crate::hummock::test_utils::{
761        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_info, test_key_of,
762        test_value_of,
763    };
764    use crate::hummock::value::HummockValue;
765
766    #[tokio::test]
767    async fn test_concat_iterator() {
768        let sstable_store = mock_sstable_store().await;
769        let mut table_infos = vec![];
770        for object_id in 0..3 {
771            let start_index = object_id * TEST_KEYS_COUNT;
772            let end_index = (object_id + 1) * TEST_KEYS_COUNT;
773            let table_info = gen_test_sstable_info(
774                default_builder_opt_for_test(),
775                object_id as u64,
776                (start_index..end_index)
777                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
778                sstable_store.clone(),
779            )
780            .await;
781            table_infos.push(table_info);
782        }
783        let start_index = 5000;
784        let end_index = 25000;
785
786        let kr = KeyRange::new(
787            test_key_of(start_index).encode().into(),
788            test_key_of(end_index).encode().into(),
789        );
790        let mut iter = ConcatSstableIterator::for_test(
791            vec![0],
792            table_infos.clone(),
793            kr.clone(),
794            sstable_store.clone(),
795        );
796        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
797
798        for idx in start_index..end_index {
799            let key = iter.key();
800            let val = iter.value();
801            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
802            assert_eq!(
803                val.into_user_value().unwrap(),
804                test_value_of(idx).as_slice()
805            );
806            iter.next().await.unwrap();
807        }
808
809        // seek non-overlap range
810        let kr = KeyRange::new(
811            test_key_of(30000).encode().into(),
812            test_key_of(40000).encode().into(),
813        );
814        let mut iter = ConcatSstableIterator::for_test(
815            vec![0],
816            table_infos.clone(),
817            kr.clone(),
818            sstable_store.clone(),
819        );
820        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
821        assert!(!iter.is_valid());
822        let kr = KeyRange::new(
823            test_key_of(start_index).encode().into(),
824            test_key_of(40000).encode().into(),
825        );
826        let mut iter = ConcatSstableIterator::for_test(
827            vec![0],
828            table_infos.clone(),
829            kr.clone(),
830            sstable_store.clone(),
831        );
832        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
833        for idx in start_index..30000 {
834            let key = iter.key();
835            let val = iter.value();
836            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
837            assert_eq!(
838                val.into_user_value().unwrap(),
839                test_value_of(idx).as_slice()
840            );
841            iter.next().await.unwrap();
842        }
843        assert!(!iter.is_valid());
844
845        // Test seek. Result is dominated by given seek key rather than key range.
846        let kr = KeyRange::new(
847            test_key_of(0).encode().into(),
848            test_key_of(40000).encode().into(),
849        );
850        let mut iter = ConcatSstableIterator::for_test(
851            vec![0],
852            table_infos.clone(),
853            kr.clone(),
854            sstable_store.clone(),
855        );
856        iter.seek(test_key_of(10000).to_ref()).await.unwrap();
857        assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10000).to_ref());
858        iter.seek(test_key_of(10001).to_ref()).await.unwrap();
859        assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10001).to_ref());
860        iter.seek(test_key_of(9999).to_ref()).await.unwrap();
861        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(9999).to_ref());
862        iter.seek(test_key_of(1).to_ref()).await.unwrap();
863        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(1).to_ref());
864        iter.seek(test_key_of(29999).to_ref()).await.unwrap();
865        assert!(iter.is_valid() && iter.cur_idx == 2 && iter.key() == test_key_of(29999).to_ref());
866        iter.seek(test_key_of(30000).to_ref()).await.unwrap();
867        assert!(!iter.is_valid());
868
869        // Test seek. Result is dominated by key range rather than given seek key.
870        let kr = KeyRange::new(
871            test_key_of(6000).encode().into(),
872            test_key_of(16000).encode().into(),
873        );
874        let mut iter = ConcatSstableIterator::for_test(
875            vec![0],
876            table_infos.clone(),
877            kr.clone(),
878            sstable_store.clone(),
879        );
880        iter.seek(test_key_of(17000).to_ref()).await.unwrap();
881        assert!(!iter.is_valid());
882        iter.seek(test_key_of(1).to_ref()).await.unwrap();
883        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == FullKey::decode(&kr.left));
884    }
885
886    #[tokio::test]
887    async fn test_concat_iterator_seek_idx() {
888        let sstable_store = mock_sstable_store().await;
889        let mut table_infos = vec![];
890        for object_id in 0..3 {
891            let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
892            let end_index = (object_id + 1) * TEST_KEYS_COUNT;
893            let table_info = gen_test_sstable_info(
894                default_builder_opt_for_test(),
895                object_id as u64,
896                (start_index..end_index)
897                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
898                sstable_store.clone(),
899            )
900            .await;
901            table_infos.push(table_info);
902        }
903
904        // Test seek_idx. Result is dominated by given seek key rather than key range.
905        let kr = KeyRange::new(
906            test_key_of(0).encode().into(),
907            test_key_of(40000).encode().into(),
908        );
909        let mut iter = ConcatSstableIterator::for_test(
910            vec![0],
911            table_infos.clone(),
912            kr.clone(),
913            sstable_store.clone(),
914        );
915        let sst = sstable_store
916            .sstable(&iter.sstables[0], &mut iter.stats)
917            .await
918            .unwrap();
919        let block_metas = &sst.meta.block_metas;
920        let block_1_smallest_key = block_metas[1].smallest_key.clone();
921        let block_2_smallest_key = block_metas[2].smallest_key.clone();
922        // Use block_1_smallest_key as seek key and result in the first KV of block 1.
923        let seek_key = block_1_smallest_key.clone();
924        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
925            .await
926            .unwrap();
927        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
928        // Use prev_full_key(block_1_smallest_key) as seek key and result in the first KV of block
929        // 1.
930        let seek_key = prev_full_key(block_1_smallest_key.as_slice());
931        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
932            .await
933            .unwrap();
934        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
935        iter.next().await.unwrap();
936        let block_1_second_key = iter.key().to_vec();
937        // Use a big enough seek key and result in invalid iterator.
938        let seek_key = test_key_of(30001);
939        iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref()))
940            .await
941            .unwrap();
942        assert!(!iter.is_valid());
943
944        // Test seek_idx. Result is dominated by key range rather than given seek key.
945        let kr = KeyRange::new(
946            next_full_key(&block_1_smallest_key).into(),
947            prev_full_key(&block_2_smallest_key).into(),
948        );
949        let mut iter = ConcatSstableIterator::for_test(
950            vec![0],
951            table_infos.clone(),
952            kr.clone(),
953            sstable_store.clone(),
954        );
955        // Use block_2_smallest_key as seek key and result in invalid iterator.
956        let seek_key = FullKey::decode(&block_2_smallest_key);
957        assert!(seek_key.cmp(&FullKey::decode(&kr.right)) == Ordering::Greater);
958        iter.seek_idx(0, Some(seek_key)).await.unwrap();
959        assert!(!iter.is_valid());
960        // Use a small enough seek key and result in the second KV of block 1.
961        let seek_key = test_key_of(0).encode();
962        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
963            .await
964            .unwrap();
965        assert!(iter.is_valid());
966        assert_eq!(iter.key(), block_1_second_key.to_ref());
967
968        // Use None seek key and result in the second KV of block 1.
969        iter.seek_idx(0, None).await.unwrap();
970        assert!(iter.is_valid());
971        assert_eq!(iter.key(), block_1_second_key.to_ref());
972    }
973
974    #[tokio::test]
975    async fn test_filter_block_metas() {
976        use crate::hummock::compactor::iterator::filter_block_metas;
977
978        {
979            let block_metas = Vec::default();
980
981            let ret = filter_block_metas(&block_metas, &HashSet::default(), KeyRange::default());
982
983            assert!(ret.is_empty());
984        }
985
986        {
987            let block_metas = vec![
988                BlockMeta {
989                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
990                    ..Default::default()
991                },
992                BlockMeta {
993                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
994                    ..Default::default()
995                },
996                BlockMeta {
997                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
998                    ..Default::default()
999                },
1000            ];
1001
1002            let ret = filter_block_metas(
1003                &block_metas,
1004                &HashSet::from_iter(vec![1_u32.into(), 2.into(), 3.into()].into_iter()),
1005                KeyRange::default(),
1006            );
1007            let ret = &block_metas[ret];
1008
1009            assert_eq!(3, ret.len());
1010            assert_eq!(
1011                1,
1012                FullKey::decode(&ret[0].smallest_key)
1013                    .user_key
1014                    .table_id
1015                    .as_raw_id()
1016            );
1017            assert_eq!(
1018                3,
1019                FullKey::decode(&ret[2].smallest_key)
1020                    .user_key
1021                    .table_id
1022                    .as_raw_id()
1023            );
1024        }
1025
1026        {
1027            let block_metas = vec![
1028                BlockMeta {
1029                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1030                    ..Default::default()
1031                },
1032                BlockMeta {
1033                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1034                    ..Default::default()
1035                },
1036                BlockMeta {
1037                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1038                    ..Default::default()
1039                },
1040            ];
1041
1042            let ret = filter_block_metas(
1043                &block_metas,
1044                &HashSet::from_iter(vec![2_u32.into(), 3.into()].into_iter()),
1045                KeyRange::default(),
1046            );
1047            let ret = &block_metas[ret];
1048
1049            assert_eq!(2, ret.len());
1050            assert_eq!(
1051                2,
1052                FullKey::decode(&ret[0].smallest_key)
1053                    .user_key
1054                    .table_id
1055                    .as_raw_id()
1056            );
1057            assert_eq!(
1058                3,
1059                FullKey::decode(&ret[1].smallest_key)
1060                    .user_key
1061                    .table_id
1062                    .as_raw_id()
1063            );
1064        }
1065
1066        {
1067            let block_metas = vec![
1068                BlockMeta {
1069                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1070                    ..Default::default()
1071                },
1072                BlockMeta {
1073                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1074                    ..Default::default()
1075                },
1076                BlockMeta {
1077                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1078                    ..Default::default()
1079                },
1080            ];
1081
1082            let ret = filter_block_metas(
1083                &block_metas,
1084                &HashSet::from_iter(vec![1_u32.into(), 2_u32.into()].into_iter()),
1085                KeyRange::default(),
1086            );
1087            let ret = &block_metas[ret];
1088
1089            assert_eq!(2, ret.len());
1090            assert_eq!(
1091                1,
1092                FullKey::decode(&ret[0].smallest_key)
1093                    .user_key
1094                    .table_id
1095                    .as_raw_id()
1096            );
1097            assert_eq!(
1098                2,
1099                FullKey::decode(&ret[1].smallest_key)
1100                    .user_key
1101                    .table_id
1102                    .as_raw_id()
1103            );
1104        }
1105
1106        {
1107            let block_metas = vec![
1108                BlockMeta {
1109                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1110                    ..Default::default()
1111                },
1112                BlockMeta {
1113                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1114                    ..Default::default()
1115                },
1116                BlockMeta {
1117                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1118                    ..Default::default()
1119                },
1120            ];
1121            let ret = filter_block_metas(
1122                &block_metas,
1123                &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1124                KeyRange::default(),
1125            );
1126            let ret = &block_metas[ret];
1127
1128            assert_eq!(1, ret.len());
1129            assert_eq!(
1130                2,
1131                FullKey::decode(&ret[0].smallest_key)
1132                    .user_key
1133                    .table_id
1134                    .as_raw_id()
1135            );
1136        }
1137
1138        {
1139            let block_metas = vec![
1140                BlockMeta {
1141                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1142                    ..Default::default()
1143                },
1144                BlockMeta {
1145                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1146                    ..Default::default()
1147                },
1148                BlockMeta {
1149                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1150                    ..Default::default()
1151                },
1152                BlockMeta {
1153                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1154                    ..Default::default()
1155                },
1156                BlockMeta {
1157                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1158                    ..Default::default()
1159                },
1160            ];
1161            let ret = filter_block_metas(
1162                &block_metas,
1163                &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1164                KeyRange::default(),
1165            );
1166            let ret = &block_metas[ret];
1167
1168            assert_eq!(1, ret.len());
1169            assert_eq!(
1170                2,
1171                FullKey::decode(&ret[0].smallest_key)
1172                    .user_key
1173                    .table_id
1174                    .as_raw_id()
1175            );
1176        }
1177
1178        {
1179            let block_metas = vec![
1180                BlockMeta {
1181                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1182                    ..Default::default()
1183                },
1184                BlockMeta {
1185                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1186                    ..Default::default()
1187                },
1188                BlockMeta {
1189                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1190                    ..Default::default()
1191                },
1192                BlockMeta {
1193                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1194                    ..Default::default()
1195                },
1196                BlockMeta {
1197                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1198                    ..Default::default()
1199                },
1200            ];
1201
1202            let ret = filter_block_metas(
1203                &block_metas,
1204                &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1205                KeyRange::default(),
1206            );
1207            let ret = &block_metas[ret];
1208
1209            assert_eq!(1, ret.len());
1210            assert_eq!(
1211                2,
1212                FullKey::decode(&ret[0].smallest_key)
1213                    .user_key
1214                    .table_id
1215                    .as_raw_id()
1216            );
1217        }
1218    }
1219
1220    #[tokio::test]
1221    async fn test_iterator_same_obj() {
1222        let sstable_store = mock_sstable_store().await;
1223
1224        let table_info = gen_test_sstable_info(
1225            default_builder_opt_for_test(),
1226            1_u64,
1227            (1..10000).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
1228            sstable_store.clone(),
1229        )
1230        .await;
1231
1232        let split_key = test_key_of(5000).encode();
1233        let sst_1: SstableInfo = SstableInfoInner {
1234            key_range: KeyRange {
1235                left: table_info.key_range.left.clone(),
1236                right: split_key.clone().into(),
1237                right_exclusive: true,
1238            },
1239            ..table_info.get_inner()
1240        }
1241        .into();
1242
1243        let total_key_count = sst_1.total_key_count;
1244        let sst_2: SstableInfo = SstableInfoInner {
1245            sst_id: sst_1.sst_id + 1,
1246            key_range: KeyRange {
1247                left: split_key.clone().into(),
1248                right: table_info.key_range.right.clone(),
1249                right_exclusive: table_info.key_range.right_exclusive,
1250            },
1251            ..table_info.get_inner()
1252        }
1253        .into();
1254
1255        {
1256            // test concate
1257            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1258
1259            let mut iter = ConcatSstableIterator::for_test(
1260                vec![0],
1261                vec![sst_1.clone(), sst_2.clone()],
1262                KeyRange::default(),
1263                sstable_store.clone(),
1264            );
1265
1266            iter.rewind().await.unwrap();
1267
1268            let mut key_count = 0;
1269            while iter.is_valid() {
1270                let is_new_user_key = full_key_tracker.observe(iter.key());
1271                assert!(is_new_user_key);
1272                key_count += 1;
1273                iter.next().await.unwrap();
1274            }
1275
1276            assert_eq!(total_key_count, key_count);
1277        }
1278
1279        {
1280            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1281            let concat_1 = ConcatSstableIterator::for_test(
1282                vec![0],
1283                vec![sst_1.clone()],
1284                KeyRange::default(),
1285                sstable_store.clone(),
1286            );
1287
1288            let concat_2 = ConcatSstableIterator::for_test(
1289                vec![0],
1290                vec![sst_2.clone()],
1291                KeyRange::default(),
1292                sstable_store.clone(),
1293            );
1294
1295            let mut key_count = 0;
1296            let mut iter = MergeIterator::for_compactor(vec![concat_1, concat_2]);
1297            iter.rewind().await.unwrap();
1298            while iter.is_valid() {
1299                full_key_tracker.observe(iter.key());
1300                key_count += 1;
1301                iter.next().await.unwrap();
1302            }
1303            assert_eq!(total_key_count, key_count);
1304        }
1305    }
1306}