risingwave_storage/hummock/sstable/
backward_sstable_iterator.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::cmp::Ordering::{Equal, Less};
16use std::sync::Arc;
17
18use foyer::Hint;
19use risingwave_hummock_sdk::key::FullKey;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21
22use crate::hummock::iterator::{Backward, HummockIterator, ValueMeta};
23use crate::hummock::sstable::SstableIteratorReadOptions;
24use crate::hummock::value::HummockValue;
25use crate::hummock::{
26    BlockIterator, HummockResult, SstableIteratorType, SstableStoreRef, TableHolder,
27};
28use crate::monitor::StoreLocalStatistic;
29
30/// Iterates backwards on a sstable.
31pub struct BackwardSstableIterator {
32    /// The iterator of the current block.
33    block_iter: Option<BlockIterator>,
34
35    /// Current block index.
36    cur_idx: usize,
37
38    /// Reference to the sstable
39    sst: TableHolder,
40
41    sstable_store: SstableStoreRef,
42
43    stats: StoreLocalStatistic,
44
45    // used for checking if the block is valid, filter out the block that is not in the table-id range
46    read_block_meta_range: (usize, usize),
47}
48
49impl BackwardSstableIterator {
50    pub fn new(
51        sstable: TableHolder,
52        sstable_store: SstableStoreRef,
53        sstable_info_ref: &SstableInfo,
54    ) -> Self {
55        let mut start_idx = 0;
56        let mut end_idx = sstable.meta.block_metas.len() - 1;
57        let read_table_id_range = (
58            *sstable_info_ref.table_ids.first().unwrap(),
59            *sstable_info_ref.table_ids.last().unwrap(),
60        );
61        assert!(
62            read_table_id_range.0 <= read_table_id_range.1,
63            "invalid table id range {} - {}",
64            read_table_id_range.0,
65            read_table_id_range.1
66        );
67        let block_meta_count = sstable.meta.block_metas.len();
68        assert!(block_meta_count > 0);
69        assert!(
70            sstable.meta.block_metas[0].table_id() <= read_table_id_range.0,
71            "table id {} not found table_ids in block_meta {:?}",
72            read_table_id_range.0,
73            sstable
74                .meta
75                .block_metas
76                .iter()
77                .map(|meta| meta.table_id())
78                .collect::<Vec<_>>()
79        );
80        assert!(
81            sstable.meta.block_metas[block_meta_count - 1].table_id() >= read_table_id_range.1,
82            "table id {} not found table_ids in block_meta {:?}",
83            read_table_id_range.1,
84            sstable
85                .meta
86                .block_metas
87                .iter()
88                .map(|meta| meta.table_id())
89                .collect::<Vec<_>>()
90        );
91
92        while start_idx < block_meta_count
93            && sstable.meta.block_metas[start_idx].table_id() < read_table_id_range.0
94        {
95            start_idx += 1;
96        }
97        // We assume that the table id read must exist in the sstable, otherwise it is a fatal error.
98        assert!(
99            start_idx < block_meta_count,
100            "table id {} not found table_ids in block_meta {:?}",
101            read_table_id_range.0,
102            sstable
103                .meta
104                .block_metas
105                .iter()
106                .map(|meta| meta.table_id())
107                .collect::<Vec<_>>()
108        );
109
110        while end_idx > start_idx
111            && sstable.meta.block_metas[end_idx].table_id() > read_table_id_range.1
112        {
113            end_idx -= 1;
114        }
115        assert!(
116            end_idx >= start_idx,
117            "end_idx {} < start_idx {} block_meta_count {}",
118            end_idx,
119            start_idx,
120            block_meta_count
121        );
122
123        Self {
124            block_iter: None,
125            cur_idx: end_idx,
126            sst: sstable,
127            sstable_store,
128            stats: StoreLocalStatistic::default(),
129            read_block_meta_range: (start_idx, end_idx),
130        }
131    }
132
133    /// Seeks to a block, and then seeks to the key if `seek_key` is given.
134    async fn seek_idx(
135        &mut self,
136        idx: isize,
137        seek_key: Option<FullKey<&[u8]>>,
138    ) -> HummockResult<()> {
139        if idx >= self.sst.block_count() as isize || idx < self.read_block_meta_range.0 as isize {
140            self.block_iter = None;
141        } else {
142            let block = self
143                .sstable_store
144                .get(
145                    &self.sst,
146                    idx as usize,
147                    crate::hummock::CachePolicy::Fill(Hint::Normal),
148                    &mut self.stats,
149                )
150                .await?;
151            let mut block_iter = BlockIterator::new(block);
152            if let Some(key) = seek_key {
153                block_iter.seek_le(key);
154            } else {
155                block_iter.seek_to_last();
156            }
157
158            self.block_iter = Some(block_iter);
159            self.cur_idx = idx as usize;
160        }
161
162        Ok(())
163    }
164}
165
166impl HummockIterator for BackwardSstableIterator {
167    type Direction = Backward;
168
169    async fn next(&mut self) -> HummockResult<()> {
170        self.stats.total_key_count += 1;
171        let block_iter = self.block_iter.as_mut().expect("no block iter");
172        if block_iter.try_prev() {
173            Ok(())
174        } else {
175            // seek to the previous block
176            self.seek_idx(self.cur_idx as isize - 1, None).await
177        }
178    }
179
180    fn key(&self) -> FullKey<&[u8]> {
181        self.block_iter.as_ref().expect("no block iter").key()
182    }
183
184    fn value(&self) -> HummockValue<&[u8]> {
185        let raw_value = self.block_iter.as_ref().expect("no block iter").value();
186
187        HummockValue::from_slice(raw_value).expect("decode error")
188    }
189
190    fn is_valid(&self) -> bool {
191        self.block_iter.as_ref().is_some_and(|i| i.is_valid())
192    }
193
194    /// Instead of setting idx to 0th block, a `BackwardSstableIterator` rewinds to the last block
195    /// in the sstable.
196    async fn rewind(&mut self) -> HummockResult<()> {
197        self.seek_idx(self.read_block_meta_range.1 as isize, None)
198            .await
199    }
200
201    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
202        let block_idx = self
203            .sst
204            .meta
205            .block_metas
206            .partition_point(|block_meta| {
207                // Compare by version comparator
208                // Note: we are comparing against the `smallest_key` of the `block`, thus the
209                // partition point should be `prev(<=)` instead of `<`.
210                let ord = FullKey::decode(&block_meta.smallest_key).cmp(&key);
211                ord == Less || ord == Equal
212            })
213            .saturating_sub(1); // considering the boundary of 0
214        let block_idx = block_idx as isize;
215
216        self.seek_idx(block_idx, Some(key)).await?;
217        if !self.is_valid() {
218            // Seek to prev block
219            self.seek_idx(block_idx - 1, None).await?;
220        }
221
222        Ok(())
223    }
224
225    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
226        stats.add(&self.stats)
227    }
228
229    fn value_meta(&self) -> ValueMeta {
230        ValueMeta {
231            object_id: Some(self.sst.id),
232            block_id: Some(self.cur_idx as _),
233        }
234    }
235}
236
237impl SstableIteratorType for BackwardSstableIterator {
238    fn create(
239        sstable: TableHolder,
240        sstable_store: SstableStoreRef,
241        _: Arc<SstableIteratorReadOptions>,
242        sstable_info_ref: &SstableInfo,
243    ) -> Self {
244        BackwardSstableIterator::new(sstable, sstable_store, sstable_info_ref)
245    }
246}
247
248/// Mirror the tests used for `SstableIterator`
249#[cfg(test)]
250mod tests {
251    use itertools::Itertools;
252    use rand::prelude::*;
253    use rand::rng as thread_rng;
254    use risingwave_common::catalog::TableId;
255    use risingwave_common::hash::VirtualNode;
256    use risingwave_common::util::epoch::test_epoch;
257    use risingwave_hummock_sdk::EpochWithGap;
258    use risingwave_hummock_sdk::key::UserKey;
259    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
260
261    use super::*;
262    use crate::assert_bytes_eq;
263    use crate::hummock::iterator::test_utils::mock_sstable_store;
264    use crate::hummock::test_utils::{
265        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_default_test_sstable,
266        gen_test_sstable_with_table_ids, test_key_of, test_value_of,
267    };
268
269    #[tokio::test]
270    async fn test_backward_sstable_iterator() {
271        // build remote sstable
272        let sstable_store = mock_sstable_store().await;
273        let (handle, sstable_info) =
274            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
275                .await;
276        // We should have at least 10 blocks, so that sstable iterator test could cover more code
277        // path.
278        assert!(handle.meta.block_metas.len() > 10);
279        let mut sstable_iter = BackwardSstableIterator::new(handle, sstable_store, &sstable_info);
280        let mut cnt = TEST_KEYS_COUNT;
281        sstable_iter.rewind().await.unwrap();
282
283        while sstable_iter.is_valid() {
284            cnt -= 1;
285            let key = sstable_iter.key();
286            let value = sstable_iter.value();
287            assert_eq!(key, test_key_of(cnt).to_ref());
288            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
289            sstable_iter.next().await.unwrap();
290        }
291
292        assert_eq!(cnt, 0);
293    }
294
295    #[tokio::test]
296    async fn test_backward_sstable_seek() {
297        let sstable_store = mock_sstable_store().await;
298        let (sstable, sstable_info) =
299            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
300                .await;
301        // We should have at least 10 blocks, so that sstable iterator test could cover more code
302        // path.
303        assert!(sstable.meta.block_metas.len() > 10);
304        let mut sstable_iter = BackwardSstableIterator::new(sstable, sstable_store, &sstable_info);
305        let mut all_key_to_test = (0..TEST_KEYS_COUNT).collect_vec();
306        let mut rng = thread_rng();
307        all_key_to_test.shuffle(&mut rng);
308
309        // We seek and access all the keys in random order
310        for i in all_key_to_test {
311            sstable_iter.seek(test_key_of(i).to_ref()).await.unwrap();
312            // sstable_iter.next().await.unwrap();
313            let key = sstable_iter.key();
314            assert_eq!(key, test_key_of(i).to_ref());
315        }
316
317        // Seek to key #TEST_KEYS_COUNT-500 and start iterating
318        sstable_iter
319            .seek(test_key_of(TEST_KEYS_COUNT - 500).to_ref())
320            .await
321            .unwrap();
322        for i in (0..TEST_KEYS_COUNT - 500 + 1).rev() {
323            let key = sstable_iter.key();
324            assert_eq!(key, test_key_of(i).to_ref(), "key index:{}", i);
325            sstable_iter.next().await.unwrap();
326        }
327        assert!(!sstable_iter.is_valid());
328
329        let largest_key = FullKey::for_test(
330            TableId::default(),
331            [
332                VirtualNode::ZERO.to_be_bytes().as_slice(),
333                format!("key_zzzz_{:05}", 0).as_bytes(),
334            ]
335            .concat(),
336            test_epoch(1),
337        );
338        sstable_iter.seek(largest_key.to_ref()).await.unwrap();
339        let key = sstable_iter.key();
340        assert_eq!(key, test_key_of(TEST_KEYS_COUNT - 1).to_ref());
341
342        // Seek to > last key
343        let smallest_key = FullKey::for_test(
344            TableId::default(),
345            [
346                VirtualNode::ZERO.to_be_bytes().as_slice(),
347                format!("key_aaaa_{:05}", 0).as_bytes(),
348            ]
349            .concat(),
350            test_epoch(1),
351        );
352        sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
353        assert!(!sstable_iter.is_valid());
354
355        // Seek to non-existing key
356        for idx in (1..TEST_KEYS_COUNT).rev() {
357            // Seek to the previous key of each existing key. e.g.,
358            // Our key space is `key_test_00000`, `key_test_00002`, `key_test_00004`, ...
359            // And we seek to `key_test_00001` (will produce `key_test_00002`), `key_test_00003`
360            // (will produce `key_test_00004`).
361            sstable_iter
362                .seek(
363                    FullKey::for_test(
364                        TableId::default(),
365                        [
366                            VirtualNode::ZERO.to_be_bytes().as_slice(),
367                            format!("key_test_{:05}", idx * 2 - 1).as_bytes(),
368                        ]
369                        .concat(),
370                        0,
371                    )
372                    .to_ref(),
373                )
374                .await
375                .unwrap();
376
377            let key = sstable_iter.key();
378            assert_eq!(key, test_key_of(idx - 1).to_ref());
379            sstable_iter.next().await.unwrap();
380        }
381        assert!(!sstable_iter.is_valid());
382    }
383
384    #[tokio::test]
385    async fn test_read_table_id_range() {
386        {
387            let sstable_store = mock_sstable_store().await;
388            // test key_range right
389            let k1 = {
390                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
391                table_key.extend_from_slice(format!("key_test_{:05}", 1).as_bytes());
392                let uk = UserKey::for_test(TableId::from(1), table_key);
393                FullKey {
394                    user_key: uk,
395                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
396                }
397            };
398
399            let k2 = {
400                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
401                table_key.extend_from_slice(format!("key_test_{:05}", 2).as_bytes());
402                let uk = UserKey::for_test(TableId::from(2), table_key);
403                FullKey {
404                    user_key: uk,
405                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
406                }
407            };
408
409            let k3 = {
410                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
411                table_key.extend_from_slice(format!("key_test_{:05}", 3).as_bytes());
412                let uk = UserKey::for_test(TableId::from(3), table_key);
413                FullKey {
414                    user_key: uk,
415                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
416                }
417            };
418
419            {
420                let kv_pairs = vec![
421                    (k1.clone(), HummockValue::put(test_value_of(1))),
422                    (k2.clone(), HummockValue::put(test_value_of(2))),
423                    (k3.clone(), HummockValue::put(test_value_of(3))),
424                ];
425
426                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
427                    default_builder_opt_for_test(),
428                    10,
429                    kv_pairs.into_iter(),
430                    sstable_store.clone(),
431                    vec![1, 2, 3],
432                )
433                .await;
434                let mut sstable_iter = BackwardSstableIterator::create(
435                    sstable,
436                    sstable_store.clone(),
437                    Arc::new(SstableIteratorReadOptions::default()),
438                    &SstableInfo::from(SstableInfoInner {
439                        table_ids: vec![1.into(), 2.into(), 3.into()],
440                        ..Default::default()
441                    }),
442                );
443                sstable_iter.rewind().await.unwrap();
444                assert!(sstable_iter.is_valid());
445                assert!(sstable_iter.key().eq(&k3.to_ref()));
446
447                let mut cnt = 0;
448                let mut last_key = k1.clone();
449                while sstable_iter.is_valid() {
450                    last_key = sstable_iter.key().to_vec();
451                    cnt += 1;
452                    sstable_iter.next().await.unwrap();
453                }
454
455                assert_eq!(3, cnt);
456                assert_eq!(last_key, k1.clone());
457            }
458
459            {
460                let kv_pairs = vec![
461                    (k1.clone(), HummockValue::put(test_value_of(1))),
462                    (k2.clone(), HummockValue::put(test_value_of(2))),
463                    (k3.clone(), HummockValue::put(test_value_of(3))),
464                ];
465
466                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
467                    default_builder_opt_for_test(),
468                    10,
469                    kv_pairs.into_iter(),
470                    sstable_store.clone(),
471                    vec![1, 2, 3],
472                )
473                .await;
474
475                let mut sstable_iter = BackwardSstableIterator::create(
476                    sstable,
477                    sstable_store.clone(),
478                    Arc::new(SstableIteratorReadOptions::default()),
479                    &SstableInfo::from(SstableInfoInner {
480                        table_ids: vec![1.into(), 2.into()],
481                        ..Default::default()
482                    }),
483                );
484                sstable_iter.rewind().await.unwrap();
485                assert!(sstable_iter.is_valid());
486                assert!(sstable_iter.key().eq(&k2.to_ref()));
487
488                let mut cnt = 0;
489                let mut last_key = k1.clone();
490                while sstable_iter.is_valid() {
491                    last_key = sstable_iter.key().to_vec();
492                    cnt += 1;
493                    sstable_iter.next().await.unwrap();
494                }
495
496                assert_eq!(2, cnt);
497                assert_eq!(last_key, k1.clone());
498            }
499
500            {
501                let kv_pairs = vec![
502                    (k1.clone(), HummockValue::put(test_value_of(1))),
503                    (k2.clone(), HummockValue::put(test_value_of(2))),
504                    (k3.clone(), HummockValue::put(test_value_of(3))),
505                ];
506
507                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
508                    default_builder_opt_for_test(),
509                    10,
510                    kv_pairs.into_iter(),
511                    sstable_store.clone(),
512                    vec![1, 2, 3],
513                )
514                .await;
515
516                let mut sstable_iter = BackwardSstableIterator::create(
517                    sstable,
518                    sstable_store.clone(),
519                    Arc::new(SstableIteratorReadOptions::default()),
520                    &SstableInfo::from(SstableInfoInner {
521                        table_ids: vec![2.into(), 3.into()],
522                        ..Default::default()
523                    }),
524                );
525                sstable_iter.rewind().await.unwrap();
526                assert!(sstable_iter.is_valid());
527                assert!(sstable_iter.key().eq(&k3.to_ref()));
528
529                let mut cnt = 0;
530                let mut last_key = k1.clone();
531                while sstable_iter.is_valid() {
532                    last_key = sstable_iter.key().to_vec();
533                    cnt += 1;
534                    sstable_iter.next().await.unwrap();
535                }
536
537                assert_eq!(2, cnt);
538                assert_eq!(last_key, k2.clone());
539            }
540
541            {
542                let kv_pairs = vec![
543                    (k1.clone(), HummockValue::put(test_value_of(1))),
544                    (k2.clone(), HummockValue::put(test_value_of(2))),
545                    (k3.clone(), HummockValue::put(test_value_of(3))),
546                ];
547
548                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
549                    default_builder_opt_for_test(),
550                    10,
551                    kv_pairs.into_iter(),
552                    sstable_store.clone(),
553                    vec![1, 2, 3],
554                )
555                .await;
556
557                let mut sstable_iter = BackwardSstableIterator::create(
558                    sstable,
559                    sstable_store.clone(),
560                    Arc::new(SstableIteratorReadOptions::default()),
561                    &SstableInfo::from(SstableInfoInner {
562                        table_ids: vec![2.into()],
563                        ..Default::default()
564                    }),
565                );
566                sstable_iter.rewind().await.unwrap();
567                assert!(sstable_iter.is_valid());
568                assert!(sstable_iter.key().eq(&k2.to_ref()));
569
570                let mut cnt = 0;
571                let mut last_key = k1.clone();
572                while sstable_iter.is_valid() {
573                    last_key = sstable_iter.key().to_vec();
574                    cnt += 1;
575                    sstable_iter.next().await.unwrap();
576                }
577
578                assert_eq!(1, cnt);
579                assert_eq!(last_key, k2.clone());
580            }
581        }
582    }
583}