risingwave_storage/hummock/sstable/
backward_sstable_iterator.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering::{Equal, Less};
16use std::sync::Arc;
17
18use foyer::Hint;
19use risingwave_hummock_sdk::key::FullKey;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21
22use crate::hummock::iterator::{Backward, HummockIterator, ValueMeta};
23use crate::hummock::sstable::SstableIteratorReadOptions;
24use crate::hummock::value::HummockValue;
25use crate::hummock::{
26    BlockIterator, HummockResult, SstableIteratorType, SstableStoreRef, TableHolder,
27};
28use crate::monitor::StoreLocalStatistic;
29
30/// Iterates backwards on a sstable.
31pub struct BackwardSstableIterator {
32    /// The iterator of the current block.
33    block_iter: Option<BlockIterator>,
34
35    /// Current block index.
36    cur_idx: usize,
37
38    /// Reference to the sstable
39    sst: TableHolder,
40
41    sstable_store: SstableStoreRef,
42
43    stats: StoreLocalStatistic,
44
45    // used for checking if the block is valid, filter out the block that is not in the table-id range
46    read_block_meta_range: (usize, usize),
47}
48
49impl BackwardSstableIterator {
50    pub fn new(
51        sstable: TableHolder,
52        sstable_store: SstableStoreRef,
53        sstable_info_ref: &SstableInfo,
54    ) -> Self {
55        let mut start_idx = 0;
56        let mut end_idx = sstable.meta.block_metas.len() - 1;
57        assert!(
58            !sstable_info_ref.table_ids.is_empty(),
59            "BackwardSstableIterator: SST {} (object {}) has empty table_ids",
60            sstable_info_ref.sst_id,
61            sstable_info_ref.object_id,
62        );
63        let read_table_id_range = (
64            *sstable_info_ref.table_ids.first().unwrap(),
65            *sstable_info_ref.table_ids.last().unwrap(),
66        );
67        assert!(
68            read_table_id_range.0 <= read_table_id_range.1,
69            "invalid table id range {} - {}",
70            read_table_id_range.0,
71            read_table_id_range.1
72        );
73        let block_meta_count = sstable.meta.block_metas.len();
74        assert!(block_meta_count > 0);
75        assert!(
76            sstable.meta.block_metas[0].table_id() <= read_table_id_range.0,
77            "table id {} not found table_ids in block_meta {:?}",
78            read_table_id_range.0,
79            sstable
80                .meta
81                .block_metas
82                .iter()
83                .map(|meta| meta.table_id())
84                .collect::<Vec<_>>()
85        );
86        assert!(
87            sstable.meta.block_metas[block_meta_count - 1].table_id() >= read_table_id_range.1,
88            "table id {} not found table_ids in block_meta {:?}",
89            read_table_id_range.1,
90            sstable
91                .meta
92                .block_metas
93                .iter()
94                .map(|meta| meta.table_id())
95                .collect::<Vec<_>>()
96        );
97
98        while start_idx < block_meta_count
99            && sstable.meta.block_metas[start_idx].table_id() < read_table_id_range.0
100        {
101            start_idx += 1;
102        }
103        // We assume that the table id read must exist in the sstable, otherwise it is a fatal error.
104        assert!(
105            start_idx < block_meta_count,
106            "table id {} not found table_ids in block_meta {:?}",
107            read_table_id_range.0,
108            sstable
109                .meta
110                .block_metas
111                .iter()
112                .map(|meta| meta.table_id())
113                .collect::<Vec<_>>()
114        );
115
116        while end_idx > start_idx
117            && sstable.meta.block_metas[end_idx].table_id() > read_table_id_range.1
118        {
119            end_idx -= 1;
120        }
121        assert!(
122            end_idx >= start_idx,
123            "end_idx {} < start_idx {} block_meta_count {}",
124            end_idx,
125            start_idx,
126            block_meta_count
127        );
128
129        Self {
130            block_iter: None,
131            cur_idx: end_idx,
132            sst: sstable,
133            sstable_store,
134            stats: StoreLocalStatistic::default(),
135            read_block_meta_range: (start_idx, end_idx),
136        }
137    }
138
139    /// Seeks to a block, and then seeks to the key if `seek_key` is given.
140    async fn seek_idx(
141        &mut self,
142        idx: isize,
143        seek_key: Option<FullKey<&[u8]>>,
144    ) -> HummockResult<()> {
145        if idx >= self.sst.block_count() as isize || idx < self.read_block_meta_range.0 as isize {
146            self.block_iter = None;
147        } else {
148            let block = self
149                .sstable_store
150                .get(
151                    &self.sst,
152                    idx as usize,
153                    crate::hummock::CachePolicy::Fill(Hint::Normal),
154                    &mut self.stats,
155                )
156                .await?;
157            let mut block_iter = BlockIterator::new(block);
158            if let Some(key) = seek_key {
159                block_iter.seek_le(key);
160            } else {
161                block_iter.seek_to_last();
162            }
163
164            self.block_iter = Some(block_iter);
165            self.cur_idx = idx as usize;
166        }
167
168        Ok(())
169    }
170}
171
172impl HummockIterator for BackwardSstableIterator {
173    type Direction = Backward;
174
175    async fn next(&mut self) -> HummockResult<()> {
176        self.stats.total_key_count += 1;
177        let block_iter = self.block_iter.as_mut().expect("no block iter");
178        if block_iter.try_prev() {
179            Ok(())
180        } else {
181            // seek to the previous block
182            self.seek_idx(self.cur_idx as isize - 1, None).await
183        }
184    }
185
186    fn key(&self) -> FullKey<&[u8]> {
187        self.block_iter.as_ref().expect("no block iter").key()
188    }
189
190    fn value(&self) -> HummockValue<&[u8]> {
191        let raw_value = self.block_iter.as_ref().expect("no block iter").value();
192
193        HummockValue::from_slice(raw_value).expect("decode error")
194    }
195
196    fn is_valid(&self) -> bool {
197        self.block_iter.as_ref().is_some_and(|i| i.is_valid())
198    }
199
200    /// Instead of setting idx to 0th block, a `BackwardSstableIterator` rewinds to the last block
201    /// in the sstable.
202    async fn rewind(&mut self) -> HummockResult<()> {
203        self.seek_idx(self.read_block_meta_range.1 as isize, None)
204            .await
205    }
206
207    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
208        let block_idx = self
209            .sst
210            .meta
211            .block_metas
212            .partition_point(|block_meta| {
213                // Compare by version comparator
214                // Note: we are comparing against the `smallest_key` of the `block`, thus the
215                // partition point should be `prev(<=)` instead of `<`.
216                let ord = FullKey::decode(&block_meta.smallest_key).cmp(&key);
217                ord == Less || ord == Equal
218            })
219            .saturating_sub(1); // considering the boundary of 0
220        let block_idx = block_idx as isize;
221
222        self.seek_idx(block_idx, Some(key)).await?;
223        if !self.is_valid() {
224            // Seek to prev block
225            self.seek_idx(block_idx - 1, None).await?;
226        }
227
228        Ok(())
229    }
230
231    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
232        stats.add(&self.stats)
233    }
234
235    fn value_meta(&self) -> ValueMeta {
236        ValueMeta {
237            object_id: Some(self.sst.id),
238            block_id: Some(self.cur_idx as _),
239        }
240    }
241}
242
243impl SstableIteratorType for BackwardSstableIterator {
244    fn create(
245        sstable: TableHolder,
246        sstable_store: SstableStoreRef,
247        _: Arc<SstableIteratorReadOptions>,
248        sstable_info_ref: &SstableInfo,
249    ) -> Self {
250        BackwardSstableIterator::new(sstable, sstable_store, sstable_info_ref)
251    }
252}
253
/// Mirror the tests used for `SstableIterator`
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use rand::prelude::*;
    use rand::rng as thread_rng;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::UserKey;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;

    use super::*;
    use crate::assert_bytes_eq;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_default_test_sstable,
        gen_test_sstable_with_table_ids, test_key_of, test_value_of,
    };

    /// Builds a full key in the default table whose table key is the zero vnode followed by
    /// `name`.
    fn default_table_full_key(name: &str, epoch: u64) -> FullKey<Vec<u8>> {
        let table_key = [
            VirtualNode::ZERO.to_be_bytes().as_slice(),
            name.as_bytes(),
        ]
        .concat();
        FullKey::for_test(TableId::default(), table_key, epoch)
    }

    /// Builds the full key `key_test_{idx:05}` (prefixed by the zero vnode) in `table_id` at
    /// test epoch 1.
    fn table_full_key(table_id: TableId, idx: usize) -> FullKey<Vec<u8>> {
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        table_key.extend_from_slice(format!("key_test_{:05}", idx).as_bytes());
        FullKey {
            user_key: UserKey::for_test(table_id, table_key),
            epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
        }
    }

    /// Writes `keys` (with values `test_value_of(1)`, `test_value_of(2)`, ...) into a fresh
    /// sstable, opens a backward iterator limited to `read_table_ids`, rewinds it and returns
    /// every key it visits, in iteration order.
    async fn scan_with_table_ids(
        sstable_store: SstableStoreRef,
        keys: &[FullKey<Vec<u8>>],
        read_table_ids: Vec<TableId>,
    ) -> Vec<FullKey<Vec<u8>>> {
        let kv_pairs = keys
            .iter()
            .cloned()
            .zip(1..)
            .map(|(key, idx)| (key, HummockValue::put(test_value_of(idx))))
            .collect_vec();

        let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
            default_builder_opt_for_test(),
            10,
            kv_pairs.into_iter(),
            sstable_store.clone(),
            vec![1, 2, 3],
        )
        .await;

        let mut sstable_iter = BackwardSstableIterator::create(
            sstable,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            &SstableInfo::from(SstableInfoInner {
                table_ids: read_table_ids,
                ..Default::default()
            }),
        );
        sstable_iter.rewind().await.unwrap();

        let mut visited = Vec::new();
        while sstable_iter.is_valid() {
            visited.push(sstable_iter.key().to_vec());
            sstable_iter.next().await.unwrap();
        }
        visited
    }

    #[tokio::test]
    async fn test_backward_sstable_iterator() {
        // build remote sstable
        let sstable_store = mock_sstable_store().await;
        let (handle, sstable_info) =
            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                .await;
        // We should have at least 10 blocks, so that sstable iterator test could cover more code
        // path.
        assert!(handle.meta.block_metas.len() > 10);
        let mut sstable_iter = BackwardSstableIterator::new(handle, sstable_store, &sstable_info);

        // Walk from the last key down to the first one, checking every entry on the way.
        let mut remaining = TEST_KEYS_COUNT;
        sstable_iter.rewind().await.unwrap();
        while sstable_iter.is_valid() {
            remaining -= 1;
            assert_eq!(sstable_iter.key(), test_key_of(remaining).to_ref());
            assert_bytes_eq!(
                sstable_iter.value().into_user_value().unwrap(),
                test_value_of(remaining)
            );
            sstable_iter.next().await.unwrap();
        }

        assert_eq!(remaining, 0);
    }

    #[tokio::test]
    async fn test_backward_sstable_seek() {
        let sstable_store = mock_sstable_store().await;
        let (sstable, sstable_info) =
            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                .await;
        // We should have at least 10 blocks, so that sstable iterator test could cover more code
        // path.
        assert!(sstable.meta.block_metas.len() > 10);
        let mut sstable_iter = BackwardSstableIterator::new(sstable, sstable_store, &sstable_info);

        // We seek and access all the keys in random order
        let mut shuffled_indices = (0..TEST_KEYS_COUNT).collect_vec();
        shuffled_indices.shuffle(&mut thread_rng());
        for i in shuffled_indices {
            sstable_iter.seek(test_key_of(i).to_ref()).await.unwrap();
            assert_eq!(sstable_iter.key(), test_key_of(i).to_ref());
        }

        // Seek to key #TEST_KEYS_COUNT-500 and iterate backwards down to key #0
        sstable_iter
            .seek(test_key_of(TEST_KEYS_COUNT - 500).to_ref())
            .await
            .unwrap();
        for i in (0..=TEST_KEYS_COUNT - 500).rev() {
            assert_eq!(sstable_iter.key(), test_key_of(i).to_ref(), "key index:{}", i);
            sstable_iter.next().await.unwrap();
        }
        assert!(!sstable_iter.is_valid());

        // Seek to a key larger than every key in the sstable: a backward seek lands on the
        // last key.
        let largest_key = default_table_full_key(&format!("key_zzzz_{:05}", 0), test_epoch(1));
        sstable_iter.seek(largest_key.to_ref()).await.unwrap();
        assert_eq!(
            sstable_iter.key(),
            test_key_of(TEST_KEYS_COUNT - 1).to_ref()
        );

        // Seek to a key smaller than every key in the sstable: there is nothing at or before
        // it, so the iterator is invalid.
        let smallest_key = default_table_full_key(&format!("key_aaaa_{:05}", 0), test_epoch(1));
        sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
        assert!(!sstable_iter.is_valid());

        // Seek to non-existing key
        for idx in (1..TEST_KEYS_COUNT).rev() {
            // Seek to the key right before each existing key. e.g.,
            // Our key space is `key_test_00000`, `key_test_00002`, `key_test_00004`, ...
            // And we seek to `key_test_00001` (will produce `key_test_00000`), `key_test_00003`
            // (will produce `key_test_00002`), because a backward seek stops at the closest
            // existing key that is not greater than the seek key.
            let probe = default_table_full_key(&format!("key_test_{:05}", idx * 2 - 1), 0);
            sstable_iter.seek(probe.to_ref()).await.unwrap();

            assert_eq!(sstable_iter.key(), test_key_of(idx - 1).to_ref());
            sstable_iter.next().await.unwrap();
        }
        assert!(!sstable_iter.is_valid());
    }

    #[tokio::test]
    async fn test_read_table_id_range() {
        let sstable_store = mock_sstable_store().await;

        // One key per table id, so that the table-id range decides which keys are visible.
        let k1 = table_full_key(TableId::from(1), 1);
        let k2 = table_full_key(TableId::from(2), 2);
        let k3 = table_full_key(TableId::from(3), 3);
        let keys = [k1.clone(), k2.clone(), k3.clone()];

        // Reading all three tables yields every key, largest first.
        let visited = scan_with_table_ids(
            sstable_store.clone(),
            &keys,
            vec![1.into(), 2.into(), 3.into()],
        )
        .await;
        assert_eq!(visited, vec![k3.clone(), k2.clone(), k1.clone()]);

        // Dropping the last table hides `k3`.
        let visited =
            scan_with_table_ids(sstable_store.clone(), &keys, vec![1.into(), 2.into()]).await;
        assert_eq!(visited, vec![k2.clone(), k1.clone()]);

        // Dropping the first table hides `k1`.
        let visited =
            scan_with_table_ids(sstable_store.clone(), &keys, vec![2.into(), 3.into()]).await;
        assert_eq!(visited, vec![k3.clone(), k2.clone()]);

        // Reading only the middle table yields `k2` alone.
        let visited = scan_with_table_ids(sstable_store.clone(), &keys, vec![2.into()]).await;
        assert_eq!(visited, vec![k2.clone()]);
    }
}
589}