risingwave_storage/hummock/compactor/iterator.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::ops::Range;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use fail::fail_point;
24use risingwave_common::catalog::TableId;
25use risingwave_hummock_sdk::KeyComparator;
26use risingwave_hummock_sdk::compaction_group::StateTableId;
27use risingwave_hummock_sdk::key::FullKey;
28use risingwave_hummock_sdk::key_range::KeyRange;
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30
31use crate::hummock::block_stream::BlockDataStream;
32use crate::hummock::compactor::task_progress::TaskProgress;
33use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
34use crate::hummock::sstable_store::SstableStoreRef;
35use crate::hummock::value::HummockValue;
36use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult, TableHolder};
37use crate::monitor::StoreLocalStatistic;
38
/// How many keys `MonitoredCompactorIterator::next` advances between two progress reports;
/// each report adds exactly this many keys to the task progress.
const PROGRESS_KEY_INTERVAL: usize = 100;
40
/// Iterates over the KV-pairs of an SST while downloading it.
///
/// `SstableStreamIterator` wraps a single `sstable` and streams only the blocks whose indices
/// fall in `block_metas_range`, reading their metadata from `sstable.meta.block_metas`. That
/// range does not necessarily cover the entire SST; it may be a sub-range, and `block_idx`
/// counts progress relative to the start of the range.
pub struct SstableStreamIterator {
    sstable_store: SstableStoreRef,
    sstable: TableHolder,
    /// The range of block metas to iterate over, as indices into `sstable.meta.block_metas`.
    block_metas_range: Range<usize>,
    /// The downloading stream. `None` until the first block is requested, and reset to `None`
    /// when a retryable object-store error forces the stream to be re-created.
    block_stream: Option<BlockDataStream>,

    /// Iterates over the KV-pairs of the current block. `None` once all blocks are consumed or
    /// the iterator has moved past the right bound of `sstable_info.key_range`.
    block_iter: Option<BlockIterator>,

    /// Number of blocks already loaded from the stream, relative to `block_metas_range.start`.
    /// The block held by `block_iter` is at relative index `block_idx - 1`, and a re-created
    /// stream resumes from relative index `block_idx`.
    block_idx: usize,

    /// Shared counter to which the time spent loading blocks is added, in milliseconds.
    stats_ptr: Arc<AtomicU64>,

    /// For key sanity check of divided SST and debugging
    sstable_info: SstableInfo,

    /// Table ids being compacted; blocks belonging to any other table are skipped.
    compact_table_ids: HashSet<StateTableId>,
    /// Shared task progress; its pending-read-IO counter is decremented when this iterator is
    /// dropped.
    task_progress: Arc<TaskProgress>,
    /// Number of times the block stream has been re-created after an object-store error.
    io_retry_times: usize,
    /// Upper bound on `io_retry_times`; once reached, stream errors are returned to the caller.
    max_io_retry_times: usize,

    // Decoded copy of `sstable_info.key_range`, cached so the bounds checks performed for every
    // key do not have to decode the range again.
    key_range_left: FullKey<Vec<u8>>,
    key_range_right: FullKey<Vec<u8>>,
    key_range_right_exclusive: bool,
}
75
76impl SstableStreamIterator {
77    // We have to handle two internal iterators.
78    //   `block_stream`: iterates over the blocks of the table.
79    //     `block_iter`: iterates over the KV-pairs of the current block.
80    // These iterators work in different ways.
81
82    // BlockIterator works as follows: After new(), we call seek(). That brings us
83    // to the first element. Calling next() then brings us to the second element and does not
84    // return anything.
85
86    // BlockStream follows a different approach. After new(), we do not seek, instead next()
87    // returns the first value.
88
89    /// Initialises a new [`SstableStreamIterator`] which iterates over the given [`BlockDataStream`].
90    /// The iterator reads at most `max_block_count` from the stream.
91    pub fn new(
92        sstable: TableHolder,
93        block_metas_range: Range<usize>,
94        sstable_info: SstableInfo,
95        stats: &StoreLocalStatistic,
96        task_progress: Arc<TaskProgress>,
97        sstable_store: SstableStoreRef,
98        max_io_retry_times: usize,
99        compact_table_ids: HashSet<StateTableId>,
100    ) -> Self {
101        // Further filter the block_metas_range with sstable_info.key_range
102        // This is necessary when the SST is split into multiple SstableInfo with different key ranges
103        let block_metas_range = {
104            let block_metas = &sstable.meta.block_metas[block_metas_range.clone()];
105            let inner_range = filter_block_metas(
106                block_metas,
107                &compact_table_ids,
108                sstable_info.key_range.clone(),
109            );
110            // Adjust the range to be relative to the original block_metas
111            (block_metas_range.start + inner_range.start)
112                ..(block_metas_range.start + inner_range.end)
113        };
114
115        let key_range_left = FullKey::decode(&sstable_info.key_range.left).to_vec();
116        let key_range_right = FullKey::decode(&sstable_info.key_range.right).to_vec();
117        let key_range_right_exclusive = sstable_info.key_range.right_exclusive;
118
119        Self {
120            block_stream: None,
121            block_iter: None,
122            sstable,
123            block_metas_range,
124            block_idx: 0,
125            stats_ptr: stats.remote_io_time.clone(),
126            compact_table_ids,
127            sstable_info,
128            sstable_store,
129            task_progress,
130            io_retry_times: 0,
131            max_io_retry_times,
132            key_range_left,
133            key_range_right,
134            key_range_right_exclusive,
135        }
136    }
137
138    /// Returns the block metas slice for this iterator.
139    #[inline]
140    fn block_metas(&self) -> &[BlockMeta] {
141        &self.sstable.meta.block_metas[self.block_metas_range.clone()]
142    }
143
144    /// Returns the number of blocks in this iterator.
145    #[inline]
146    fn block_count(&self) -> usize {
147        self.block_metas_range.len()
148    }
149
150    async fn create_stream(&mut self) -> HummockResult<()> {
151        let block_stream = self
152            .sstable_store
153            .get_stream_for_blocks(
154                self.sstable_info.object_id,
155                &self.block_metas()[self.block_idx..],
156            )
157            .instrument_await("stream_iter_get_stream".verbose())
158            .await?;
159        self.block_stream = Some(block_stream);
160        Ok(())
161    }
162
163    async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
164        while let Some(block_iter) = self.block_iter.as_mut() {
165            if self.compact_table_ids.contains(&block_iter.table_id()) {
166                return Ok(());
167            } else {
168                self.next_block().await?;
169            }
170        }
171        Ok(())
172    }
173
174    /// Initialises the iterator by moving it to the first KV-pair in the stream's first block where
175    /// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to
176    /// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to
177    /// the very first KV-pair of the stream's first block.
178    pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
179        // Load first block.
180        self.next_block().await?;
181
182        // We assume that a block always contains at least one KV pair. Subsequently, if
183        // `next_block()` loads a new block (i.e., `block_iter` is not `None`), then `block_iter` is
184        // also valid and pointing on the block's first KV-pair.
185
186        let seek_key = if let Some(seek_key) = seek_key {
187            if seek_key.cmp(&self.key_range_left.to_ref()).is_lt() {
188                Some(self.key_range_left.to_ref())
189            } else {
190                Some(seek_key)
191            }
192        } else {
193            Some(self.key_range_left.to_ref())
194        };
195
196        if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
197            block_iter.seek(seek_key);
198
199            if !block_iter.is_valid() {
200                // `seek_key` is larger than everything in the first block.
201                self.next_block().await?;
202            }
203        }
204
205        self.prune_from_valid_block_iter().await?;
206        Ok(())
207    }
208
209    /// Loads a new block, creates a new iterator for it, and stores that iterator in
210    /// `self.block_iter`. The created iterator points to the block's first KV-pair. If the end of
211    /// the stream is reached or `self.remaining_blocks` is zero, then the function sets
212    /// `self.block_iter` to `None`.
213    async fn next_block(&mut self) -> HummockResult<()> {
214        // Check if we want and if we can load the next block.
215        let now = Instant::now();
216        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
217            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
218            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
219        });
220        if self.block_idx < self.block_count() {
221            loop {
222                let ret = match &mut self.block_stream {
223                    Some(block_stream) => block_stream.next_block().await,
224                    None => {
225                        self.create_stream().await?;
226                        continue;
227                    }
228                };
229                match ret {
230                    Ok(Some(block)) => {
231                        let mut block_iter =
232                            BlockIterator::new(BlockHolder::from_owned_block(block));
233                        block_iter.seek_to_first();
234                        self.block_idx += 1;
235                        self.block_iter = Some(block_iter);
236                        return Ok(());
237                    }
238                    Ok(None) => break,
239                    Err(e) => {
240                        if !e.is_object_error() || !self.need_recreate_io_stream() {
241                            return Err(e);
242                        }
243                        self.block_stream.take();
244                        self.io_retry_times += 1;
245                        fail_point!("create_stream_err");
246
247                        tracing::warn!(
248                            "retry create stream for sstable {} times, sstinfo={}",
249                            self.io_retry_times,
250                            self.sst_debug_info()
251                        );
252                    }
253                }
254            }
255        }
256        self.block_idx = self.block_count();
257        self.block_iter = None;
258
259        Ok(())
260    }
261
262    /// Moves to the next KV-pair in the table. Assumes that the current position is valid. Even if
263    /// the next position is invalid, the function returns `Ok(())`.
264    ///
265    /// Do not use `next()` to initialise the iterator (i.e. do not use it to find the first
266    /// KV-pair). Instead, use `seek()`. Afterwards, use `next()` to reach the second KV-pair and
267    /// onwards.
268    pub async fn next(&mut self) -> HummockResult<()> {
269        if !self.is_valid() {
270            return Ok(());
271        }
272
273        let block_iter = self.block_iter.as_mut().expect("no block iter");
274        block_iter.next();
275        if !block_iter.is_valid() {
276            self.next_block().await?;
277            self.prune_from_valid_block_iter().await?;
278        }
279
280        if !self.is_valid() {
281            return Ok(());
282        }
283
284        // Check if we need to skip the block.
285        let key = self
286            .block_iter
287            .as_ref()
288            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
289            .key();
290
291        if self.exceed_key_range_right(key) {
292            self.block_iter = None;
293        }
294
295        Ok(())
296    }
297
298    pub fn key(&self) -> FullKey<&[u8]> {
299        let key = self
300            .block_iter
301            .as_ref()
302            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
303            .key();
304
305        assert!(
306            !self.exceed_key_range_left(key),
307            "key {:?} key_range_left {:?}",
308            key,
309            self.key_range_left.to_ref()
310        );
311
312        assert!(
313            !self.exceed_key_range_right(key),
314            "key {:?} key_range_right {:?} key_range_right_exclusive {}",
315            key,
316            self.key_range_right.to_ref(),
317            self.key_range_right_exclusive
318        );
319
320        key
321    }
322
323    pub fn value(&self) -> HummockValue<&[u8]> {
324        let raw_value = self
325            .block_iter
326            .as_ref()
327            .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
328            .value();
329        HummockValue::from_slice(raw_value)
330            .unwrap_or_else(|_| panic!("decode error sstinfo={}", self.sst_debug_info()))
331    }
332
333    pub fn is_valid(&self) -> bool {
334        // True iff block_iter exists and is valid.
335        self.block_iter.as_ref().is_some_and(|i| i.is_valid())
336    }
337
338    fn sst_debug_info(&self) -> String {
339        format!(
340            "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
341            self.sstable_info.object_id,
342            self.sstable_info.sst_id,
343            self.sstable_info.meta_offset,
344            self.sstable_info.table_ids
345        )
346    }
347
348    fn need_recreate_io_stream(&self) -> bool {
349        self.io_retry_times < self.max_io_retry_times
350    }
351
352    fn exceed_key_range_left(&self, key: FullKey<&[u8]>) -> bool {
353        key.cmp(&self.key_range_left.to_ref()).is_lt()
354    }
355
356    fn exceed_key_range_right(&self, key: FullKey<&[u8]>) -> bool {
357        if self.key_range_right_exclusive {
358            key.cmp(&self.key_range_right.to_ref()).is_ge()
359        } else {
360            key.cmp(&self.key_range_right.to_ref()).is_gt()
361        }
362    }
363}
364
365impl Drop for SstableStreamIterator {
366    fn drop(&mut self) {
367        self.task_progress.dec_num_pending_read_io()
368    }
369}
370
/// Iterates over the KV-pairs of a given list of SSTs. The key-ranges of these SSTs are assumed to
/// be consecutive and non-overlapping.
///
/// Only one SST is open at a time: an [`SstableStreamIterator`] is created for the current SST
/// and replaced when that SST is exhausted. SSTs that share no table id with
/// `compact_table_ids` are skipped without fetching their metadata.
pub struct ConcatSstableIterator {
    /// **CAUTION:** `key_range` is used for optimization. It doesn't guarantee value returned by
    /// the iterator is in this range.
    key_range: KeyRange,

    /// The iterator of the current table.
    sstable_iter: Option<SstableStreamIterator>,

    /// Current table index into `sstables`.
    cur_idx: usize,

    /// All non-overlapping tables.
    sstables: Vec<SstableInfo>,

    /// Table ids being compacted.
    compact_table_ids: HashSet<StateTableId>,

    sstable_store: SstableStoreRef,

    /// Local statistics: filled while fetching SST metadata, and shared (via `remote_io_time`)
    /// with each `SstableStreamIterator` for block-loading time. Exposed through
    /// `collect_local_statistic`.
    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,
    /// Forwarded to each `SstableStreamIterator` as its limit on stream re-creations.
    max_io_retry_times: usize,
}
395
396impl ConcatSstableIterator {
397    /// Caller should make sure that `tables` are non-overlapping,
398    /// arranged in ascending order when it serves as a forward iterator,
399    /// and arranged in descending order when it serves as a backward iterator.
400    pub fn new(
401        compact_table_ids: Vec<StateTableId>,
402        sst_infos: Vec<SstableInfo>,
403        key_range: KeyRange,
404        sstable_store: SstableStoreRef,
405        task_progress: Arc<TaskProgress>,
406        max_io_retry_times: usize,
407    ) -> Self {
408        Self {
409            key_range,
410            sstable_iter: None,
411            cur_idx: 0,
412            sstables: sst_infos,
413            compact_table_ids: HashSet::from_iter(compact_table_ids),
414            sstable_store,
415            task_progress,
416            stats: StoreLocalStatistic::default(),
417            max_io_retry_times,
418        }
419    }
420
421    #[cfg(test)]
422    pub fn for_test(
423        existing_table_ids: Vec<impl Into<StateTableId>>,
424        sst_infos: Vec<SstableInfo>,
425        key_range: KeyRange,
426        sstable_store: SstableStoreRef,
427    ) -> Self {
428        Self::new(
429            existing_table_ids.into_iter().map(Into::into).collect(),
430            sst_infos,
431            key_range,
432            sstable_store,
433            Arc::new(TaskProgress::default()),
434            0,
435        )
436    }
437
438    /// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
439    async fn seek_idx(
440        &mut self,
441        idx: usize,
442        seek_key: Option<FullKey<&[u8]>>,
443    ) -> HummockResult<()> {
444        self.sstable_iter.take();
445        let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
446        {
447            (Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
448                Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
449                Ordering::Greater => Some(seek_key),
450            },
451            (Some(seek_key), true) => Some(seek_key),
452            (None, true) => None,
453            (None, false) => Some(FullKey::decode(&self.key_range.left)),
454        };
455
456        self.cur_idx = idx;
457        while self.cur_idx < self.sstables.len() {
458            let table_info = &self.sstables[self.cur_idx];
459            let mut found = table_info
460                .table_ids
461                .iter()
462                .any(|table_id| self.compact_table_ids.contains(table_id));
463            if !found {
464                self.cur_idx += 1;
465                seek_key = None;
466                continue;
467            }
468            let sstable = self
469                .sstable_store
470                .sstable(table_info, &mut self.stats)
471                .instrument_await("stream_iter_sstable".verbose())
472                .await?;
473
474            let filter_key_range = match seek_key {
475                Some(seek_key) => {
476                    KeyRange::new(seek_key.encode().into(), self.key_range.right.clone())
477                }
478                None => self.key_range.clone(),
479            };
480
481            let compact_sstable_table_ids = {
482                // unite table_ids between sstable and compact_table_ids
483                let sstable_table_ids = HashSet::from_iter(table_info.table_ids.iter().cloned());
484                sstable_table_ids
485                    .intersection(&self.compact_table_ids)
486                    .cloned()
487                    .collect()
488            };
489
490            let block_metas_range = filter_block_metas(
491                &sstable.meta.block_metas,
492                &compact_sstable_table_ids,
493                filter_key_range,
494            );
495
496            if block_metas_range.is_empty() {
497                found = false;
498            } else {
499                self.task_progress.inc_num_pending_read_io();
500                let mut sstable_iter = SstableStreamIterator::new(
501                    sstable,
502                    block_metas_range,
503                    table_info.clone(),
504                    &self.stats,
505                    self.task_progress.clone(),
506                    self.sstable_store.clone(),
507                    self.max_io_retry_times,
508                    compact_sstable_table_ids,
509                );
510                sstable_iter.seek(seek_key).await?;
511
512                if sstable_iter.is_valid() {
513                    self.sstable_iter = Some(sstable_iter);
514                } else {
515                    found = false;
516                }
517            }
518
519            if found {
520                return Ok(());
521            } else {
522                self.cur_idx += 1;
523                seek_key = None;
524            }
525        }
526        Ok(())
527    }
528}
529
530impl HummockIterator for ConcatSstableIterator {
531    type Direction = Forward;
532
533    async fn next(&mut self) -> HummockResult<()> {
534        let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
535
536        // Does just calling `next()` suffice?
537        sstable_iter.next().await?;
538        if sstable_iter.is_valid() {
539            Ok(())
540        } else {
541            // No, seek to next table.
542            self.seek_idx(self.cur_idx + 1, None).await?;
543            Ok(())
544        }
545    }
546
547    fn key(&self) -> FullKey<&[u8]> {
548        self.sstable_iter.as_ref().expect("no table iter").key()
549    }
550
551    fn value(&self) -> HummockValue<&[u8]> {
552        self.sstable_iter.as_ref().expect("no table iter").value()
553    }
554
555    fn is_valid(&self) -> bool {
556        self.sstable_iter.as_ref().is_some_and(|i| i.is_valid())
557    }
558
559    async fn rewind(&mut self) -> HummockResult<()> {
560        self.seek_idx(0, None).await
561    }
562
563    /// Resets the iterator and seeks to the first position where the stored key >= `key`.
564    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
565        let seek_key = if self.key_range.left.is_empty() {
566            key
567        } else {
568            match key.cmp(&FullKey::decode(&self.key_range.left)) {
569                Ordering::Less | Ordering::Equal => FullKey::decode(&self.key_range.left),
570                Ordering::Greater => key,
571            }
572        };
573        let table_idx = self.sstables.partition_point(|table| {
574            // We use the maximum key of an SST for the search. That way, we guarantee that the
575            // resulting SST contains either that key or the next-larger KV-pair. Subsequently,
576            // we avoid calling `seek_idx()` twice if the determined SST does not contain `key`.
577
578            // Note that we need to use `<` instead of `<=` to ensure that all keys in an SST
579            // (including its max. key) produce the same search result.
580            let max_sst_key = &table.key_range.right;
581            FullKey::decode(max_sst_key).cmp(&seek_key) == Ordering::Less
582        });
583
584        self.seek_idx(table_idx, Some(key)).await
585    }
586
587    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
588        stats.add(&self.stats)
589    }
590
591    fn value_meta(&self) -> ValueMeta {
592        let iter = self.sstable_iter.as_ref().expect("no table iter");
593        // sstable_iter's block_idx must have advanced at least one.
594        // See SstableStreamIterator::next_block.
595        assert!(iter.block_idx >= 1);
596        // block_idx is relative to block_metas_range.start, so we need to add it back
597        let absolute_block_idx = iter.block_metas_range.start + iter.block_idx - 1;
598        ValueMeta {
599            object_id: Some(iter.sstable_info.object_id),
600            block_id: Some(absolute_block_idx as u64),
601        }
602    }
603}
604
/// Wraps a [`HummockIterator`] and reports compaction progress to the task progress every
/// [`PROGRESS_KEY_INTERVAL`] successful `next()` calls.
pub struct MonitoredCompactorIterator<I> {
    inner: I,
    task_progress: Arc<TaskProgress>,

    /// Successful `next()` calls since construction or the last `rewind()`/`seek()`. Counts not
    /// yet reported when the counter is reset are discarded.
    processed_key_num: usize,
}
611
612impl<I: HummockIterator<Direction = Forward>> MonitoredCompactorIterator<I> {
613    pub fn new(inner: I, task_progress: Arc<TaskProgress>) -> Self {
614        Self {
615            inner,
616            task_progress,
617            processed_key_num: 0,
618        }
619    }
620}
621
622impl<I: HummockIterator<Direction = Forward>> HummockIterator for MonitoredCompactorIterator<I> {
623    type Direction = Forward;
624
625    async fn next(&mut self) -> HummockResult<()> {
626        self.inner.next().await?;
627        self.processed_key_num += 1;
628
629        if self.processed_key_num.is_multiple_of(PROGRESS_KEY_INTERVAL) {
630            self.task_progress
631                .inc_progress_key(PROGRESS_KEY_INTERVAL as _);
632        }
633
634        Ok(())
635    }
636
637    fn key(&self) -> FullKey<&[u8]> {
638        self.inner.key()
639    }
640
641    fn value(&self) -> HummockValue<&[u8]> {
642        self.inner.value()
643    }
644
645    fn is_valid(&self) -> bool {
646        self.inner.is_valid()
647    }
648
649    async fn rewind(&mut self) -> HummockResult<()> {
650        self.processed_key_num = 0;
651        self.inner.rewind().await?;
652        Ok(())
653    }
654
655    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
656        self.processed_key_num = 0;
657        self.inner.seek(key).await?;
658        Ok(())
659    }
660
661    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
662        self.inner.collect_local_statistic(stats)
663    }
664
665    fn value_meta(&self) -> ValueMeta {
666        self.inner.value_meta()
667    }
668}
669
670pub(crate) fn filter_block_metas(
671    block_metas: &[BlockMeta],
672    compact_table_ids: &HashSet<TableId>,
673    key_range: KeyRange,
674) -> Range<usize> {
675    if block_metas.is_empty() {
676        return 0..0;
677    }
678
679    let mut start_index = if key_range.left.is_empty() {
680        0
681    } else {
682        // start_index points to the greatest block whose smallest_key <= seek_key.
683        block_metas
684            .partition_point(|block| {
685                KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key)
686                    != Ordering::Less
687            })
688            .saturating_sub(1)
689    };
690
691    let mut end_index = if key_range.right.is_empty() {
692        block_metas.len()
693    } else {
694        let ret = block_metas.partition_point(|block| {
695            KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right)
696                != Ordering::Greater
697        });
698
699        if ret == 0 {
700            // not found
701            return 0..0;
702        }
703
704        ret
705    }
706    .saturating_sub(1);
707
708    // skip blocks that are not in existing_table_ids
709    while start_index <= end_index {
710        let start_block_table_id = block_metas[start_index].table_id();
711        if compact_table_ids.contains(&start_block_table_id) {
712            break;
713        }
714
715        // skip this table_id
716        let old_start_index = start_index;
717        let block_metas_to_search = &block_metas[start_index..=end_index];
718
719        start_index += block_metas_to_search
720            .partition_point(|block_meta| block_meta.table_id() == start_block_table_id);
721
722        if old_start_index == start_index {
723            // no more blocks with the same table_id
724            break;
725        }
726    }
727
728    while start_index <= end_index {
729        let end_block_table_id = block_metas[end_index].table_id();
730        if compact_table_ids.contains(&end_block_table_id) {
731            break;
732        }
733
734        let old_end_index = end_index;
735        let block_metas_to_search = &block_metas[start_index..=end_index];
736
737        end_index = start_index
738            + block_metas_to_search
739                .partition_point(|block_meta| block_meta.table_id() < end_block_table_id)
740                .saturating_sub(1);
741
742        if end_index == old_end_index {
743            // no more blocks with the same table_id
744            break;
745        }
746    }
747
748    if start_index > end_index {
749        return 0..0;
750    }
751
752    start_index..(end_index + 1)
753}
754
755#[cfg(test)]
756mod tests {
757    use std::cmp::Ordering;
758    use std::collections::HashSet;
759
760    use risingwave_common::catalog::TableId;
761    use risingwave_common::util::epoch::test_epoch;
762    use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, next_full_key, prev_full_key};
763    use risingwave_hummock_sdk::key_range::KeyRange;
764    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
765
766    use crate::hummock::BlockMeta;
767    use crate::hummock::compactor::ConcatSstableIterator;
768    use crate::hummock::iterator::test_utils::mock_sstable_store;
769    use crate::hummock::iterator::{HummockIterator, MergeIterator};
770    use crate::hummock::test_utils::{
771        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_info,
772        gen_test_sstable_with_table_ids, test_key_of, test_value_of,
773    };
774    use crate::hummock::value::HummockValue;
775
    /// Exercises `ConcatSstableIterator` over three consecutive SSTs. It checks iteration within
    /// a key range that crosses SST boundaries, a key range that does not overlap the SSTs, and
    /// how `seek` combines the given seek key with the iterator's key range.
    #[tokio::test]
    async fn test_concat_iterator() {
        let sstable_store = mock_sstable_store().await;
        // Build three SSTs; SST `object_id` holds the keys
        // `[object_id * TEST_KEYS_COUNT, (object_id + 1) * TEST_KEYS_COUNT)`.
        // NOTE(review): the `cur_idx` assertions below rely on `TEST_KEYS_COUNT` being 10000.
        let mut table_infos = vec![];
        for object_id in 0..3 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = (object_id + 1) * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let start_index = 5000;
        let end_index = 25000;

        // Iterate a key range that crosses SST boundaries and check every key/value in it.
        // `vec![0]` compacts table id 0, presumably the table the test keys belong to.
        let kr = KeyRange::new(
            test_key_of(start_index).encode().into(),
            test_key_of(end_index).encode().into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        iter.seek(FullKey::decode(&kr.left)).await.unwrap();

        for idx in start_index..end_index {
            let key = iter.key();
            let val = iter.value();
            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
            assert_eq!(
                val.into_user_value().unwrap(),
                test_value_of(idx).as_slice()
            );
            iter.next().await.unwrap();
        }

        // seek non-overlap range
        let kr = KeyRange::new(
            test_key_of(30000).encode().into(),
            test_key_of(40000).encode().into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
        assert!(!iter.is_valid());
        // A range whose right bound lies beyond the last SST: iteration ends at the last key.
        let kr = KeyRange::new(
            test_key_of(start_index).encode().into(),
            test_key_of(40000).encode().into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
        for idx in start_index..30000 {
            let key = iter.key();
            let val = iter.value();
            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
            assert_eq!(
                val.into_user_value().unwrap(),
                test_value_of(idx).as_slice()
            );
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Test seek. Result is dominated by given seek key rather than key range.
        let kr = KeyRange::new(
            test_key_of(0).encode().into(),
            test_key_of(40000).encode().into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        // Seeks around SST boundaries must land in the SST that holds the key.
        iter.seek(test_key_of(10000).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10000).to_ref());
        iter.seek(test_key_of(10001).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10001).to_ref());
        iter.seek(test_key_of(9999).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(9999).to_ref());
        iter.seek(test_key_of(1).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(1).to_ref());
        iter.seek(test_key_of(29999).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 2 && iter.key() == test_key_of(29999).to_ref());
        // Past the last key of the last SST.
        iter.seek(test_key_of(30000).to_ref()).await.unwrap();
        assert!(!iter.is_valid());

        // Test seek. Result is dominated by key range rather than given seek key.
        let kr = KeyRange::new(
            test_key_of(6000).encode().into(),
            test_key_of(16000).encode().into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        // A seek key beyond the range's right bound yields nothing.
        iter.seek(test_key_of(17000).to_ref()).await.unwrap();
        assert!(!iter.is_valid());
        // A seek key below the range's left bound is clamped to the left bound.
        iter.seek(test_key_of(1).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == FullKey::decode(&kr.left));
    }
895
896    #[tokio::test]
897    async fn test_concat_iterator_seek_idx() {
898        let sstable_store = mock_sstable_store().await;
899        let mut table_infos = vec![];
900        for object_id in 0..3 {
901            let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
902            let end_index = (object_id + 1) * TEST_KEYS_COUNT;
903            let table_info = gen_test_sstable_info(
904                default_builder_opt_for_test(),
905                object_id as u64,
906                (start_index..end_index)
907                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
908                sstable_store.clone(),
909            )
910            .await;
911            table_infos.push(table_info);
912        }
913
914        // Test seek_idx. Result is dominated by given seek key rather than key range.
915        let kr = KeyRange::new(
916            test_key_of(0).encode().into(),
917            test_key_of(40000).encode().into(),
918        );
919        let mut iter = ConcatSstableIterator::for_test(
920            vec![0],
921            table_infos.clone(),
922            kr.clone(),
923            sstable_store.clone(),
924        );
925        let sst = sstable_store
926            .sstable(&iter.sstables[0], &mut iter.stats)
927            .await
928            .unwrap();
929        let block_metas = &sst.meta.block_metas;
930        let block_1_smallest_key = block_metas[1].smallest_key.clone();
931        let block_2_smallest_key = block_metas[2].smallest_key.clone();
932        // Use block_1_smallest_key as seek key and result in the first KV of block 1.
933        let seek_key = block_1_smallest_key.clone();
934        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
935            .await
936            .unwrap();
937        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
938        // Use prev_full_key(block_1_smallest_key) as seek key and result in the first KV of block
939        // 1.
940        let seek_key = prev_full_key(block_1_smallest_key.as_slice());
941        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
942            .await
943            .unwrap();
944        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
945        iter.next().await.unwrap();
946        let block_1_second_key = iter.key().to_vec();
947        // Use a big enough seek key and result in invalid iterator.
948        let seek_key = test_key_of(30001);
949        iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref()))
950            .await
951            .unwrap();
952        assert!(!iter.is_valid());
953
954        // Test seek_idx. Result is dominated by key range rather than given seek key.
955        let kr = KeyRange::new(
956            next_full_key(&block_1_smallest_key).into(),
957            prev_full_key(&block_2_smallest_key).into(),
958        );
959        let mut iter = ConcatSstableIterator::for_test(
960            vec![0],
961            table_infos.clone(),
962            kr.clone(),
963            sstable_store.clone(),
964        );
965        // Use block_2_smallest_key as seek key and result in invalid iterator.
966        let seek_key = FullKey::decode(&block_2_smallest_key);
967        assert!(seek_key.cmp(&FullKey::decode(&kr.right)) == Ordering::Greater);
968        iter.seek_idx(0, Some(seek_key)).await.unwrap();
969        assert!(!iter.is_valid());
970        // Use a small enough seek key and result in the second KV of block 1.
971        let seek_key = test_key_of(0).encode();
972        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
973            .await
974            .unwrap();
975        assert!(iter.is_valid());
976        assert_eq!(iter.key(), block_1_second_key.to_ref());
977
978        // Use None seek key and result in the second KV of block 1.
979        iter.seek_idx(0, None).await.unwrap();
980        assert!(iter.is_valid());
981        assert_eq!(iter.key(), block_1_second_key.to_ref());
982    }
983
984    #[tokio::test]
985    async fn test_filter_block_metas() {
986        use crate::hummock::compactor::iterator::filter_block_metas;
987
988        {
989            let block_metas = Vec::default();
990
991            let ret = filter_block_metas(&block_metas, &HashSet::default(), KeyRange::default());
992
993            assert!(ret.is_empty());
994        }
995
996        {
997            let block_metas = vec![
998                BlockMeta {
999                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1000                    ..Default::default()
1001                },
1002                BlockMeta {
1003                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1004                    ..Default::default()
1005                },
1006                BlockMeta {
1007                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1008                    ..Default::default()
1009                },
1010            ];
1011
1012            let ret = filter_block_metas(
1013                &block_metas,
1014                &HashSet::from_iter(vec![1_u32.into(), 2.into(), 3.into()].into_iter()),
1015                KeyRange::default(),
1016            );
1017            let ret = &block_metas[ret];
1018
1019            assert_eq!(3, ret.len());
1020            assert_eq!(
1021                1,
1022                FullKey::decode(&ret[0].smallest_key)
1023                    .user_key
1024                    .table_id
1025                    .as_raw_id()
1026            );
1027            assert_eq!(
1028                3,
1029                FullKey::decode(&ret[2].smallest_key)
1030                    .user_key
1031                    .table_id
1032                    .as_raw_id()
1033            );
1034        }
1035
1036        {
1037            let block_metas = vec![
1038                BlockMeta {
1039                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1040                    ..Default::default()
1041                },
1042                BlockMeta {
1043                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1044                    ..Default::default()
1045                },
1046                BlockMeta {
1047                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1048                    ..Default::default()
1049                },
1050            ];
1051
1052            let ret = filter_block_metas(
1053                &block_metas,
1054                &HashSet::from_iter(vec![2_u32.into(), 3.into()].into_iter()),
1055                KeyRange::default(),
1056            );
1057            let ret = &block_metas[ret];
1058
1059            assert_eq!(2, ret.len());
1060            assert_eq!(
1061                2,
1062                FullKey::decode(&ret[0].smallest_key)
1063                    .user_key
1064                    .table_id
1065                    .as_raw_id()
1066            );
1067            assert_eq!(
1068                3,
1069                FullKey::decode(&ret[1].smallest_key)
1070                    .user_key
1071                    .table_id
1072                    .as_raw_id()
1073            );
1074        }
1075
1076        {
1077            let block_metas = vec![
1078                BlockMeta {
1079                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1080                    ..Default::default()
1081                },
1082                BlockMeta {
1083                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1084                    ..Default::default()
1085                },
1086                BlockMeta {
1087                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1088                    ..Default::default()
1089                },
1090            ];
1091
1092            let ret = filter_block_metas(
1093                &block_metas,
1094                &HashSet::from_iter(vec![1_u32.into(), 2_u32.into()].into_iter()),
1095                KeyRange::default(),
1096            );
1097            let ret = &block_metas[ret];
1098
1099            assert_eq!(2, ret.len());
1100            assert_eq!(
1101                1,
1102                FullKey::decode(&ret[0].smallest_key)
1103                    .user_key
1104                    .table_id
1105                    .as_raw_id()
1106            );
1107            assert_eq!(
1108                2,
1109                FullKey::decode(&ret[1].smallest_key)
1110                    .user_key
1111                    .table_id
1112                    .as_raw_id()
1113            );
1114        }
1115
1116        {
1117            let block_metas = vec![
1118                BlockMeta {
1119                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1120                    ..Default::default()
1121                },
1122                BlockMeta {
1123                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1124                    ..Default::default()
1125                },
1126                BlockMeta {
1127                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1128                    ..Default::default()
1129                },
1130            ];
1131            let ret = filter_block_metas(
1132                &block_metas,
1133                &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1134                KeyRange::default(),
1135            );
1136            let ret = &block_metas[ret];
1137
1138            assert_eq!(1, ret.len());
1139            assert_eq!(
1140                2,
1141                FullKey::decode(&ret[0].smallest_key)
1142                    .user_key
1143                    .table_id
1144                    .as_raw_id()
1145            );
1146        }
1147
1148        {
1149            let block_metas = vec![
1150                BlockMeta {
1151                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1152                    ..Default::default()
1153                },
1154                BlockMeta {
1155                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1156                    ..Default::default()
1157                },
1158                BlockMeta {
1159                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1160                    ..Default::default()
1161                },
1162                BlockMeta {
1163                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1164                    ..Default::default()
1165                },
1166                BlockMeta {
1167                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1168                    ..Default::default()
1169                },
1170            ];
1171            let ret = filter_block_metas(
1172                &block_metas,
1173                &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1174                KeyRange::default(),
1175            );
1176            let ret = &block_metas[ret];
1177
1178            assert_eq!(1, ret.len());
1179            assert_eq!(
1180                2,
1181                FullKey::decode(&ret[0].smallest_key)
1182                    .user_key
1183                    .table_id
1184                    .as_raw_id()
1185            );
1186        }
1187
1188        {
1189            let block_metas = vec![
1190                BlockMeta {
1191                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1192                    ..Default::default()
1193                },
1194                BlockMeta {
1195                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1196                    ..Default::default()
1197                },
1198                BlockMeta {
1199                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1200                    ..Default::default()
1201                },
1202                BlockMeta {
1203                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1204                    ..Default::default()
1205                },
1206                BlockMeta {
1207                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1208                    ..Default::default()
1209                },
1210            ];
1211
1212            let ret = filter_block_metas(
1213                &block_metas,
1214                &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1215                KeyRange::default(),
1216            );
1217            let ret = &block_metas[ret];
1218
1219            assert_eq!(1, ret.len());
1220            assert_eq!(
1221                2,
1222                FullKey::decode(&ret[0].smallest_key)
1223                    .user_key
1224                    .table_id
1225                    .as_raw_id()
1226            );
1227        }
1228
1229        {
1230            let block_metas = vec![
1231                BlockMeta {
1232                    smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1233                    ..Default::default()
1234                },
1235                BlockMeta {
1236                    smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1237                    ..Default::default()
1238                },
1239                BlockMeta {
1240                    smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1241                    ..Default::default()
1242                },
1243            ];
1244
1245            let ret = filter_block_metas(
1246                &block_metas,
1247                &HashSet::from_iter(vec![1_u32.into(), 3_u32.into()].into_iter()),
1248                KeyRange::default(),
1249            );
1250            let ret = &block_metas[ret];
1251
1252            assert_eq!(3, ret.len());
1253            assert_eq!(
1254                1,
1255                FullKey::decode(&ret[0].smallest_key)
1256                    .user_key
1257                    .table_id
1258                    .as_raw_id()
1259            );
1260            assert_eq!(
1261                2,
1262                FullKey::decode(&ret[1].smallest_key)
1263                    .user_key
1264                    .table_id
1265                    .as_raw_id()
1266            );
1267            assert_eq!(
1268                3,
1269                FullKey::decode(&ret[2].smallest_key)
1270                    .user_key
1271                    .table_id
1272                    .as_raw_id()
1273            );
1274        }
1275    }
1276
1277    #[tokio::test]
1278    async fn test_iterator_same_obj() {
1279        let sstable_store = mock_sstable_store().await;
1280
1281        let table_info = gen_test_sstable_info(
1282            default_builder_opt_for_test(),
1283            1_u64,
1284            (1..10000).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
1285            sstable_store.clone(),
1286        )
1287        .await;
1288
1289        let split_key = test_key_of(5000).encode();
1290        let sst_1: SstableInfo = SstableInfoInner {
1291            key_range: KeyRange {
1292                left: table_info.key_range.left.clone(),
1293                right: split_key.clone().into(),
1294                right_exclusive: true,
1295            },
1296            ..table_info.get_inner()
1297        }
1298        .into();
1299
1300        let total_key_count = sst_1.total_key_count;
1301        let sst_2: SstableInfo = SstableInfoInner {
1302            sst_id: sst_1.sst_id + 1,
1303            key_range: KeyRange {
1304                left: split_key.clone().into(),
1305                right: table_info.key_range.right.clone(),
1306                right_exclusive: table_info.key_range.right_exclusive,
1307            },
1308            ..table_info.get_inner()
1309        }
1310        .into();
1311
1312        {
1313            // test concate
1314            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1315
1316            let mut iter = ConcatSstableIterator::for_test(
1317                vec![0],
1318                vec![sst_1.clone(), sst_2.clone()],
1319                KeyRange::default(),
1320                sstable_store.clone(),
1321            );
1322
1323            iter.rewind().await.unwrap();
1324
1325            let mut key_count = 0;
1326            while iter.is_valid() {
1327                let is_new_user_key = full_key_tracker.observe(iter.key());
1328                assert!(is_new_user_key);
1329                key_count += 1;
1330                iter.next().await.unwrap();
1331            }
1332
1333            assert_eq!(total_key_count, key_count);
1334        }
1335
1336        {
1337            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1338            let concat_1 = ConcatSstableIterator::for_test(
1339                vec![0],
1340                vec![sst_1.clone()],
1341                KeyRange::default(),
1342                sstable_store.clone(),
1343            );
1344
1345            let concat_2 = ConcatSstableIterator::for_test(
1346                vec![0],
1347                vec![sst_2.clone()],
1348                KeyRange::default(),
1349                sstable_store.clone(),
1350            );
1351
1352            let mut key_count = 0;
1353            let mut iter = MergeIterator::for_compactor(vec![concat_1, concat_2]);
1354            iter.rewind().await.unwrap();
1355            while iter.is_valid() {
1356                full_key_tracker.observe(iter.key());
1357                key_count += 1;
1358                iter.next().await.unwrap();
1359            }
1360            assert_eq!(total_key_count, key_count);
1361        }
1362    }
1363
1364    #[tokio::test]
1365    async fn test_concat_iterator_skips_hole_table_blocks() {
1366        let sstable_store = mock_sstable_store().await;
1367
1368        let key_1 = FullKey::for_test(TableId::new(1), b"a".to_vec(), test_epoch(1));
1369        let key_2 = FullKey::for_test(TableId::new(2), b"b".to_vec(), test_epoch(1));
1370        let key_3 = FullKey::for_test(TableId::new(3), b"c".to_vec(), test_epoch(1));
1371        let kv_pairs = vec![
1372            (key_1.clone(), HummockValue::put(b"value-1".to_vec())),
1373            (key_2.clone(), HummockValue::put(b"value-2".to_vec())),
1374            (key_3.clone(), HummockValue::put(b"value-3".to_vec())),
1375        ];
1376
1377        let (_sstable, table_info) = gen_test_sstable_with_table_ids(
1378            default_builder_opt_for_test(),
1379            10,
1380            kv_pairs.into_iter(),
1381            sstable_store.clone(),
1382            vec![1, 2, 3],
1383        )
1384        .await;
1385
1386        let table_info: SstableInfo = SstableInfoInner {
1387            table_ids: vec![1.into(), 3.into()],
1388            ..table_info.get_inner()
1389        }
1390        .into();
1391
1392        let mut iter = ConcatSstableIterator::for_test(
1393            vec![1, 3],
1394            vec![table_info.clone()],
1395            KeyRange::default(),
1396            sstable_store.clone(),
1397        );
1398        iter.rewind().await.unwrap();
1399        assert!(iter.is_valid());
1400        assert_eq!(iter.key(), key_1.to_ref());
1401
1402        iter.next().await.unwrap();
1403        assert!(iter.is_valid());
1404        assert_eq!(iter.key(), key_3.to_ref());
1405
1406        iter.next().await.unwrap();
1407        assert!(!iter.is_valid());
1408
1409        let mut iter = ConcatSstableIterator::for_test(
1410            vec![1, 3],
1411            vec![table_info],
1412            KeyRange::default(),
1413            sstable_store,
1414        );
1415        iter.seek(key_2.to_ref()).await.unwrap();
1416        assert!(iter.is_valid());
1417        assert_eq!(iter.key(), key_3.to_ref());
1418    }
1419}