risingwave_storage/hummock/sstable/
backward_sstable_iterator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering::{Equal, Less};
16use std::sync::Arc;
17
18use foyer::CacheHint;
19use risingwave_hummock_sdk::key::FullKey;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21
22use crate::hummock::iterator::{Backward, HummockIterator, ValueMeta};
23use crate::hummock::sstable::SstableIteratorReadOptions;
24use crate::hummock::value::HummockValue;
25use crate::hummock::{
26    BlockIterator, HummockResult, SstableIteratorType, SstableStoreRef, TableHolder,
27};
28use crate::monitor::StoreLocalStatistic;
29
/// Iterates backwards on a sstable.
///
/// Blocks are fetched one at a time through `sstable_store`; the blocks to visit are bounded
/// by `read_block_meta_range`, which is computed once in `new`.
pub struct BackwardSstableIterator {
    /// The iterator of the current block. `None` until a block has been loaded, and reset to
    /// `None` when a seek targets a block index outside the accepted range.
    block_iter: Option<BlockIterator>,

    /// Current block index.
    cur_idx: usize,

    /// Reference to the sstable
    sst: TableHolder,

    /// Store the blocks of `sst` are fetched from.
    sstable_store: SstableStoreRef,

    /// Statistics collected locally by this iterator; merged into the caller's statistics by
    /// `collect_local_statistic`.
    stats: StoreLocalStatistic,

    /// `(start_idx, end_idx)` block indices computed in `new`. Used for checking if the block is
    /// valid, filtering out the blocks that are not in the table-id range.
    read_block_meta_range: (usize, usize),
}
48
49impl BackwardSstableIterator {
50    pub fn new(
51        sstable: TableHolder,
52        sstable_store: SstableStoreRef,
53        sstable_info_ref: &SstableInfo,
54    ) -> Self {
55        let mut start_idx = 0;
56        let mut end_idx = sstable.meta.block_metas.len() - 1;
57        let read_table_id_range = (
58            *sstable_info_ref.table_ids.first().unwrap(),
59            *sstable_info_ref.table_ids.last().unwrap(),
60        );
61        assert!(
62            read_table_id_range.0 <= read_table_id_range.1,
63            "invalid table id range {} - {}",
64            read_table_id_range.0,
65            read_table_id_range.1
66        );
67        let block_meta_count = sstable.meta.block_metas.len();
68        assert!(block_meta_count > 0);
69        assert!(
70            sstable.meta.block_metas[0].table_id().table_id() <= read_table_id_range.0,
71            "table id {} not found table_ids in block_meta {:?}",
72            read_table_id_range.0,
73            sstable
74                .meta
75                .block_metas
76                .iter()
77                .map(|meta| meta.table_id())
78                .collect::<Vec<_>>()
79        );
80        assert!(
81            sstable.meta.block_metas[block_meta_count - 1]
82                .table_id()
83                .table_id()
84                >= read_table_id_range.1,
85            "table id {} not found table_ids in block_meta {:?}",
86            read_table_id_range.1,
87            sstable
88                .meta
89                .block_metas
90                .iter()
91                .map(|meta| meta.table_id())
92                .collect::<Vec<_>>()
93        );
94
95        while start_idx < block_meta_count
96            && sstable.meta.block_metas[start_idx].table_id().table_id() < read_table_id_range.0
97        {
98            start_idx += 1;
99        }
100        // We assume that the table id read must exist in the sstable, otherwise it is a fatal error.
101        assert!(
102            start_idx < block_meta_count,
103            "table id {} not found table_ids in block_meta {:?}",
104            read_table_id_range.0,
105            sstable
106                .meta
107                .block_metas
108                .iter()
109                .map(|meta| meta.table_id())
110                .collect::<Vec<_>>()
111        );
112
113        while end_idx > start_idx
114            && sstable.meta.block_metas[end_idx].table_id().table_id() > read_table_id_range.1
115        {
116            end_idx -= 1;
117        }
118        assert!(
119            end_idx >= start_idx,
120            "end_idx {} < start_idx {} block_meta_count {}",
121            end_idx,
122            start_idx,
123            block_meta_count
124        );
125
126        Self {
127            block_iter: None,
128            cur_idx: end_idx,
129            sst: sstable,
130            sstable_store,
131            stats: StoreLocalStatistic::default(),
132            read_block_meta_range: (start_idx, end_idx),
133        }
134    }
135
136    /// Seeks to a block, and then seeks to the key if `seek_key` is given.
137    async fn seek_idx(
138        &mut self,
139        idx: isize,
140        seek_key: Option<FullKey<&[u8]>>,
141    ) -> HummockResult<()> {
142        if idx >= self.sst.block_count() as isize || idx < self.read_block_meta_range.0 as isize {
143            self.block_iter = None;
144        } else {
145            let block = self
146                .sstable_store
147                .get(
148                    &self.sst,
149                    idx as usize,
150                    crate::hummock::CachePolicy::Fill(CacheHint::Normal),
151                    &mut self.stats,
152                )
153                .await?;
154            let mut block_iter = BlockIterator::new(block);
155            if let Some(key) = seek_key {
156                block_iter.seek_le(key);
157            } else {
158                block_iter.seek_to_last();
159            }
160
161            self.block_iter = Some(block_iter);
162            self.cur_idx = idx as usize;
163        }
164
165        Ok(())
166    }
167}
168
169impl HummockIterator for BackwardSstableIterator {
170    type Direction = Backward;
171
172    async fn next(&mut self) -> HummockResult<()> {
173        self.stats.total_key_count += 1;
174        let block_iter = self.block_iter.as_mut().expect("no block iter");
175        if block_iter.try_prev() {
176            Ok(())
177        } else {
178            // seek to the previous block
179            self.seek_idx(self.cur_idx as isize - 1, None).await
180        }
181    }
182
183    fn key(&self) -> FullKey<&[u8]> {
184        self.block_iter.as_ref().expect("no block iter").key()
185    }
186
187    fn value(&self) -> HummockValue<&[u8]> {
188        let raw_value = self.block_iter.as_ref().expect("no block iter").value();
189
190        HummockValue::from_slice(raw_value).expect("decode error")
191    }
192
193    fn is_valid(&self) -> bool {
194        self.block_iter.as_ref().is_some_and(|i| i.is_valid())
195    }
196
197    /// Instead of setting idx to 0th block, a `BackwardSstableIterator` rewinds to the last block
198    /// in the sstable.
199    async fn rewind(&mut self) -> HummockResult<()> {
200        self.seek_idx(self.read_block_meta_range.1 as isize, None)
201            .await
202    }
203
204    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
205        let block_idx = self
206            .sst
207            .meta
208            .block_metas
209            .partition_point(|block_meta| {
210                // Compare by version comparator
211                // Note: we are comparing against the `smallest_key` of the `block`, thus the
212                // partition point should be `prev(<=)` instead of `<`.
213                let ord = FullKey::decode(&block_meta.smallest_key).cmp(&key);
214                ord == Less || ord == Equal
215            })
216            .saturating_sub(1); // considering the boundary of 0
217        let block_idx = block_idx as isize;
218
219        self.seek_idx(block_idx, Some(key)).await?;
220        if !self.is_valid() {
221            // Seek to prev block
222            self.seek_idx(block_idx - 1, None).await?;
223        }
224
225        Ok(())
226    }
227
228    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
229        stats.add(&self.stats)
230    }
231
232    fn value_meta(&self) -> ValueMeta {
233        ValueMeta {
234            object_id: Some(self.sst.id),
235            block_id: Some(self.cur_idx as _),
236        }
237    }
238}
239
240impl SstableIteratorType for BackwardSstableIterator {
241    fn create(
242        sstable: TableHolder,
243        sstable_store: SstableStoreRef,
244        _: Arc<SstableIteratorReadOptions>,
245        sstable_info_ref: &SstableInfo,
246    ) -> Self {
247        BackwardSstableIterator::new(sstable, sstable_store, sstable_info_ref)
248    }
249}
250
251/// Mirror the tests used for `SstableIterator`
252#[cfg(test)]
253mod tests {
254    use itertools::Itertools;
255    use rand::prelude::*;
256    use rand::rng as thread_rng;
257    use risingwave_common::catalog::TableId;
258    use risingwave_common::hash::VirtualNode;
259    use risingwave_common::util::epoch::test_epoch;
260    use risingwave_hummock_sdk::EpochWithGap;
261    use risingwave_hummock_sdk::key::UserKey;
262    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
263
264    use super::*;
265    use crate::assert_bytes_eq;
266    use crate::hummock::iterator::test_utils::mock_sstable_store;
267    use crate::hummock::test_utils::{
268        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_default_test_sstable,
269        gen_test_sstable_with_table_ids, test_key_of, test_value_of,
270    };
271
272    #[tokio::test]
273    async fn test_backward_sstable_iterator() {
274        // build remote sstable
275        let sstable_store = mock_sstable_store().await;
276        let (handle, sstable_info) =
277            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
278                .await;
279        // We should have at least 10 blocks, so that sstable iterator test could cover more code
280        // path.
281        assert!(handle.meta.block_metas.len() > 10);
282        let mut sstable_iter = BackwardSstableIterator::new(handle, sstable_store, &sstable_info);
283        let mut cnt = TEST_KEYS_COUNT;
284        sstable_iter.rewind().await.unwrap();
285
286        while sstable_iter.is_valid() {
287            cnt -= 1;
288            let key = sstable_iter.key();
289            let value = sstable_iter.value();
290            assert_eq!(key, test_key_of(cnt).to_ref());
291            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
292            sstable_iter.next().await.unwrap();
293        }
294
295        assert_eq!(cnt, 0);
296    }
297
298    #[tokio::test]
299    async fn test_backward_sstable_seek() {
300        let sstable_store = mock_sstable_store().await;
301        let (sstable, sstable_info) =
302            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
303                .await;
304        // We should have at least 10 blocks, so that sstable iterator test could cover more code
305        // path.
306        assert!(sstable.meta.block_metas.len() > 10);
307        let mut sstable_iter = BackwardSstableIterator::new(sstable, sstable_store, &sstable_info);
308        let mut all_key_to_test = (0..TEST_KEYS_COUNT).collect_vec();
309        let mut rng = thread_rng();
310        all_key_to_test.shuffle(&mut rng);
311
312        // We seek and access all the keys in random order
313        for i in all_key_to_test {
314            sstable_iter.seek(test_key_of(i).to_ref()).await.unwrap();
315            // sstable_iter.next().await.unwrap();
316            let key = sstable_iter.key();
317            assert_eq!(key, test_key_of(i).to_ref());
318        }
319
320        // Seek to key #TEST_KEYS_COUNT-500 and start iterating
321        sstable_iter
322            .seek(test_key_of(TEST_KEYS_COUNT - 500).to_ref())
323            .await
324            .unwrap();
325        for i in (0..TEST_KEYS_COUNT - 500 + 1).rev() {
326            let key = sstable_iter.key();
327            assert_eq!(key, test_key_of(i).to_ref(), "key index:{}", i);
328            sstable_iter.next().await.unwrap();
329        }
330        assert!(!sstable_iter.is_valid());
331
332        let largest_key = FullKey::for_test(
333            TableId::default(),
334            [
335                VirtualNode::ZERO.to_be_bytes().as_slice(),
336                format!("key_zzzz_{:05}", 0).as_bytes(),
337            ]
338            .concat(),
339            test_epoch(1),
340        );
341        sstable_iter.seek(largest_key.to_ref()).await.unwrap();
342        let key = sstable_iter.key();
343        assert_eq!(key, test_key_of(TEST_KEYS_COUNT - 1).to_ref());
344
345        // Seek to > last key
346        let smallest_key = FullKey::for_test(
347            TableId::default(),
348            [
349                VirtualNode::ZERO.to_be_bytes().as_slice(),
350                format!("key_aaaa_{:05}", 0).as_bytes(),
351            ]
352            .concat(),
353            test_epoch(1),
354        );
355        sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
356        assert!(!sstable_iter.is_valid());
357
358        // Seek to non-existing key
359        for idx in (1..TEST_KEYS_COUNT).rev() {
360            // Seek to the previous key of each existing key. e.g.,
361            // Our key space is `key_test_00000`, `key_test_00002`, `key_test_00004`, ...
362            // And we seek to `key_test_00001` (will produce `key_test_00002`), `key_test_00003`
363            // (will produce `key_test_00004`).
364            sstable_iter
365                .seek(
366                    FullKey::for_test(
367                        TableId::default(),
368                        [
369                            VirtualNode::ZERO.to_be_bytes().as_slice(),
370                            format!("key_test_{:05}", idx * 2 - 1).as_bytes(),
371                        ]
372                        .concat(),
373                        0,
374                    )
375                    .to_ref(),
376                )
377                .await
378                .unwrap();
379
380            let key = sstable_iter.key();
381            assert_eq!(key, test_key_of(idx - 1).to_ref());
382            sstable_iter.next().await.unwrap();
383        }
384        assert!(!sstable_iter.is_valid());
385    }
386
387    #[tokio::test]
388    async fn test_read_table_id_range() {
389        {
390            let sstable_store = mock_sstable_store().await;
391            // test key_range right
392            let k1 = {
393                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
394                table_key.extend_from_slice(format!("key_test_{:05}", 1).as_bytes());
395                let uk = UserKey::for_test(TableId::from(1), table_key);
396                FullKey {
397                    user_key: uk,
398                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
399                }
400            };
401
402            let k2 = {
403                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
404                table_key.extend_from_slice(format!("key_test_{:05}", 2).as_bytes());
405                let uk = UserKey::for_test(TableId::from(2), table_key);
406                FullKey {
407                    user_key: uk,
408                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
409                }
410            };
411
412            let k3 = {
413                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
414                table_key.extend_from_slice(format!("key_test_{:05}", 3).as_bytes());
415                let uk = UserKey::for_test(TableId::from(3), table_key);
416                FullKey {
417                    user_key: uk,
418                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
419                }
420            };
421
422            {
423                let kv_pairs = vec![
424                    (k1.clone(), HummockValue::put(test_value_of(1))),
425                    (k2.clone(), HummockValue::put(test_value_of(2))),
426                    (k3.clone(), HummockValue::put(test_value_of(3))),
427                ];
428
429                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
430                    default_builder_opt_for_test(),
431                    10,
432                    kv_pairs.into_iter(),
433                    sstable_store.clone(),
434                    vec![1, 2, 3],
435                )
436                .await;
437                let mut sstable_iter = BackwardSstableIterator::create(
438                    sstable,
439                    sstable_store.clone(),
440                    Arc::new(SstableIteratorReadOptions::default()),
441                    &SstableInfo::from(SstableInfoInner {
442                        table_ids: vec![1, 2, 3],
443                        ..Default::default()
444                    }),
445                );
446                sstable_iter.rewind().await.unwrap();
447                assert!(sstable_iter.is_valid());
448                assert!(sstable_iter.key().eq(&k3.to_ref()));
449
450                let mut cnt = 0;
451                let mut last_key = k1.clone();
452                while sstable_iter.is_valid() {
453                    last_key = sstable_iter.key().to_vec();
454                    cnt += 1;
455                    sstable_iter.next().await.unwrap();
456                }
457
458                assert_eq!(3, cnt);
459                assert_eq!(last_key, k1.clone());
460            }
461
462            {
463                let kv_pairs = vec![
464                    (k1.clone(), HummockValue::put(test_value_of(1))),
465                    (k2.clone(), HummockValue::put(test_value_of(2))),
466                    (k3.clone(), HummockValue::put(test_value_of(3))),
467                ];
468
469                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
470                    default_builder_opt_for_test(),
471                    10,
472                    kv_pairs.into_iter(),
473                    sstable_store.clone(),
474                    vec![1, 2, 3],
475                )
476                .await;
477
478                let mut sstable_iter = BackwardSstableIterator::create(
479                    sstable,
480                    sstable_store.clone(),
481                    Arc::new(SstableIteratorReadOptions::default()),
482                    &SstableInfo::from(SstableInfoInner {
483                        table_ids: vec![1, 2],
484                        ..Default::default()
485                    }),
486                );
487                sstable_iter.rewind().await.unwrap();
488                assert!(sstable_iter.is_valid());
489                assert!(sstable_iter.key().eq(&k2.to_ref()));
490
491                let mut cnt = 0;
492                let mut last_key = k1.clone();
493                while sstable_iter.is_valid() {
494                    last_key = sstable_iter.key().to_vec();
495                    cnt += 1;
496                    sstable_iter.next().await.unwrap();
497                }
498
499                assert_eq!(2, cnt);
500                assert_eq!(last_key, k1.clone());
501            }
502
503            {
504                let kv_pairs = vec![
505                    (k1.clone(), HummockValue::put(test_value_of(1))),
506                    (k2.clone(), HummockValue::put(test_value_of(2))),
507                    (k3.clone(), HummockValue::put(test_value_of(3))),
508                ];
509
510                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
511                    default_builder_opt_for_test(),
512                    10,
513                    kv_pairs.into_iter(),
514                    sstable_store.clone(),
515                    vec![1, 2, 3],
516                )
517                .await;
518
519                let mut sstable_iter = BackwardSstableIterator::create(
520                    sstable,
521                    sstable_store.clone(),
522                    Arc::new(SstableIteratorReadOptions::default()),
523                    &SstableInfo::from(SstableInfoInner {
524                        table_ids: vec![2, 3],
525                        ..Default::default()
526                    }),
527                );
528                sstable_iter.rewind().await.unwrap();
529                assert!(sstable_iter.is_valid());
530                assert!(sstable_iter.key().eq(&k3.to_ref()));
531
532                let mut cnt = 0;
533                let mut last_key = k1.clone();
534                while sstable_iter.is_valid() {
535                    last_key = sstable_iter.key().to_vec();
536                    cnt += 1;
537                    sstable_iter.next().await.unwrap();
538                }
539
540                assert_eq!(2, cnt);
541                assert_eq!(last_key, k2.clone());
542            }
543
544            {
545                let kv_pairs = vec![
546                    (k1.clone(), HummockValue::put(test_value_of(1))),
547                    (k2.clone(), HummockValue::put(test_value_of(2))),
548                    (k3.clone(), HummockValue::put(test_value_of(3))),
549                ];
550
551                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
552                    default_builder_opt_for_test(),
553                    10,
554                    kv_pairs.into_iter(),
555                    sstable_store.clone(),
556                    vec![1, 2, 3],
557                )
558                .await;
559
560                let mut sstable_iter = BackwardSstableIterator::create(
561                    sstable,
562                    sstable_store.clone(),
563                    Arc::new(SstableIteratorReadOptions::default()),
564                    &SstableInfo::from(SstableInfoInner {
565                        table_ids: vec![2],
566                        ..Default::default()
567                    }),
568                );
569                sstable_iter.rewind().await.unwrap();
570                assert!(sstable_iter.is_valid());
571                assert!(sstable_iter.key().eq(&k2.to_ref()));
572
573                let mut cnt = 0;
574                let mut last_key = k1.clone();
575                while sstable_iter.is_valid() {
576                    last_key = sstable_iter.key().to_vec();
577                    cnt += 1;
578                    sstable_iter.next().await.unwrap();
579                }
580
581                assert_eq!(1, cnt);
582                assert_eq!(last_key, k2.clone());
583            }
584        }
585    }
586}