risingwave_storage/hummock/sstable/block_iterator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::ops::Range;
17
18use bytes::BytesMut;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::key::FullKey;
21
22use super::{Block, KeyPrefix, LenType, RestartPoint};
23use crate::hummock::BlockHolder;
24use crate::monitor::LocalHitmap;
25
/// [`BlockIterator`] is used to read kv pairs in a block.
///
/// Entries are decoded sequentially, starting from a restart point. Moving backward re-seeks to a
/// restart point and scans forward again.
pub struct BlockIterator {
    /// Block that iterates on.
    block: BlockHolder,
    /// Index of the restart point whose interval contains the current entry.
    restart_point_index: usize,
    /// Byte offset of the current entry in the block data. The iterator is valid only while this
    /// is less than `block.len()`.
    offset: usize,
    /// Key of the current entry (without the table id), rebuilt from the prefix-compressed data.
    key: BytesMut,
    /// Byte range of the current value within `block.data()`.
    value_range: Range<usize>,
    /// Encoded length of the current entry; the next entry starts at `offset + entry_len`.
    entry_len: usize,

    /// Key length type used to decode entries of the current restart interval (copied from the
    /// current restart point).
    last_key_len_type: LenType,
    /// Value length type used to decode entries of the current restart interval (copied from the
    /// current restart point).
    last_value_len_type: LenType,

    /// NOTE:
    ///
    /// - `hitmap` must be updated every time block data is accessed at a new position.
    /// - `hitmap` must be reported to the block's hitmap before drop (done in the `Drop` impl).
    hitmap: LocalHitmap<{ Block::HITMAP_ELEMS }>,
}
50
51impl Drop for BlockIterator {
52    fn drop(&mut self) {
53        self.block.hitmap().report(&mut self.hitmap);
54    }
55}
56
57impl BlockIterator {
58    pub fn new(block: BlockHolder) -> Self {
59        let hitmap = LocalHitmap::default();
60        Self {
61            block,
62            offset: usize::MAX,
63            restart_point_index: usize::MAX,
64            key: BytesMut::default(),
65            value_range: 0..0,
66            entry_len: 0,
67            last_key_len_type: LenType::u8,
68            last_value_len_type: LenType::u8,
69            hitmap,
70        }
71    }
72
73    pub fn next(&mut self) {
74        assert!(self.is_valid());
75        self.next_inner();
76    }
77
78    pub fn try_next(&mut self) -> bool {
79        assert!(self.is_valid());
80        self.try_next_inner()
81    }
82
83    pub fn prev(&mut self) {
84        assert!(self.is_valid());
85        self.prev_inner();
86    }
87
88    pub fn try_prev(&mut self) -> bool {
89        assert!(self.is_valid());
90        self.try_prev_inner()
91    }
92
93    pub fn table_id(&self) -> TableId {
94        self.block.table_id()
95    }
96
97    pub fn key(&self) -> FullKey<&[u8]> {
98        assert!(self.is_valid());
99        FullKey::from_slice_without_table_id(self.table_id(), &self.key[..])
100    }
101
102    pub fn value(&self) -> &[u8] {
103        assert!(self.is_valid());
104        &self.block.data()[self.value_range.clone()]
105    }
106
107    pub fn is_valid(&self) -> bool {
108        self.offset < self.block.len()
109    }
110
111    pub fn seek_to_first(&mut self) {
112        self.seek_restart_point_by_index(0);
113    }
114
115    pub fn seek_to_last(&mut self) {
116        self.seek_restart_point_by_index(self.block.restart_point_len() - 1);
117        self.next_until_prev_offset(self.block.len());
118    }
119
120    pub fn seek(&mut self, key: FullKey<&[u8]>) {
121        self.seek_restart_point_by_key(key);
122        self.next_until_key(key);
123    }
124
125    pub fn seek_le(&mut self, key: FullKey<&[u8]>) {
126        self.seek_restart_point_by_key(key);
127        self.next_until_key(key);
128        if !self.is_valid() {
129            self.seek_to_last();
130        }
131        self.prev_until_key(key);
132    }
133}
134
135impl BlockIterator {
136    /// Invalidates current state after reaching a invalid state.
137    fn invalidate(&mut self) {
138        self.offset = self.block.len();
139        self.restart_point_index = self.block.restart_point_len();
140        self.key.clear();
141        self.value_range = 0..0;
142        self.entry_len = 0;
143    }
144
145    /// Moving to the next entry
146    ///
147    /// Note: The current state may be invalid if there is no more data to read
148    fn next_inner(&mut self) {
149        if !self.try_next_inner() {
150            self.invalidate();
151        }
152    }
153
154    /// Try moving to the next entry.
155    ///
156    /// The current state will still be valid if there is no more data to read.
157    ///
158    /// Return: true is the iterator is advanced and false otherwise.
159    fn try_next_inner(&mut self) -> bool {
160        let offset = self.offset + self.entry_len;
161        if offset >= self.block.len() {
162            return false;
163        }
164
165        // after seek, offset meet a new restart point we need to update it
166        if self.restart_point_index + 1 < self.block.restart_point_len()
167            && offset
168                >= self
169                    .block
170                    .restart_point(self.restart_point_index + 1)
171                    .offset as usize
172        {
173            let new_restart_point_index = self.restart_point_index + 1;
174            self.update_restart_point(new_restart_point_index);
175        }
176
177        let prefix =
178            self.decode_prefix_at(offset, self.last_key_len_type, self.last_value_len_type);
179        self.key.truncate(prefix.overlap_len());
180        self.key
181            .extend_from_slice(&self.block.data()[prefix.diff_key_range()]);
182
183        self.value_range = prefix.value_range();
184        self.offset = offset;
185        self.entry_len = prefix.entry_len();
186
187        self.hitmap
188            .fill_with_range(self.offset, self.value_range.end, self.block.len());
189
190        true
191    }
192
193    /// Moves forward until reaching the first that equals or larger than the given `key`.
194    fn next_until_key(&mut self, key: FullKey<&[u8]>) {
195        while self.is_valid() && self.key().cmp(&key) == Ordering::Less {
196            self.next_inner();
197        }
198    }
199
200    /// Moves backward until reaching the first key that equals or smaller than the given `key`.
201    fn prev_until_key(&mut self, key: FullKey<&[u8]>) {
202        while self.is_valid() && self.key().cmp(&key) == Ordering::Greater {
203            self.prev_inner();
204        }
205    }
206
207    /// Moves forward until the position reaches the previous position of the given `next_offset` or
208    /// the last valid position if exists.
209    fn next_until_prev_offset(&mut self, offset: usize) {
210        while self.offset + self.entry_len < std::cmp::min(self.block.len(), offset) {
211            self.next_inner();
212        }
213    }
214
215    /// Moving to the previous entry
216    ///
217    /// Note: The current state may be invalid if there is no more data to read
218    fn prev_inner(&mut self) {
219        if !self.try_prev_inner() {
220            self.invalidate();
221        }
222    }
223
224    /// Try moving to the previous entry.
225    ///
226    /// The current state will still be valid if there is no more data to read.
227    ///
228    /// Return: true is the iterator is advanced and false otherwise.
229    fn try_prev_inner(&mut self) -> bool {
230        if self.offset == 0 {
231            return false;
232        }
233
234        if self.block.restart_point(self.restart_point_index).offset as usize == self.offset {
235            self.restart_point_index -= 1;
236        }
237        let origin_offset = self.offset;
238        self.seek_restart_point_by_index(self.restart_point_index);
239        self.next_until_prev_offset(origin_offset);
240        true
241    }
242
243    /// Decodes [`KeyPrefix`] at given offset.
244    fn decode_prefix_at(
245        &self,
246        offset: usize,
247        key_len_type: LenType,
248        value_len_type: LenType,
249    ) -> KeyPrefix {
250        KeyPrefix::decode(
251            &mut &self.block.data()[offset..],
252            offset,
253            key_len_type,
254            value_len_type,
255        )
256    }
257
258    /// Searches the restart point index that the given `key` belongs to.
259    fn search_restart_point_index_by_key(&mut self, key: FullKey<&[u8]>) -> usize {
260        // Find the largest restart point that restart key equals or less than the given key.
261
262        self.block
263            .search_restart_partition_point(
264                |&RestartPoint {
265                     offset: probe,
266                     key_len_type,
267                     value_len_type,
268                 }| {
269                    let probe = probe as usize;
270                    let prefix = KeyPrefix::decode(
271                        &mut &self.block.data()[probe..],
272                        probe,
273                        key_len_type,
274                        value_len_type,
275                    );
276                    let probe_key = &self.block.data()[prefix.diff_key_range()];
277                    let full_probe_key =
278                        FullKey::from_slice_without_table_id(self.block.table_id(), probe_key);
279                    self.hitmap.fill_with_range(
280                        probe,
281                        prefix.diff_key_range().end,
282                        self.block.len(),
283                    );
284                    match full_probe_key.cmp(&key) {
285                        Ordering::Less | Ordering::Equal => true,
286                        Ordering::Greater => false,
287                    }
288                },
289            )
290            // Prevent from underflowing when given is smaller than the first.
291            .saturating_sub(1)
292    }
293
294    /// Seeks to the restart point that the given `key` belongs to.
295    fn seek_restart_point_by_key(&mut self, key: FullKey<&[u8]>) {
296        let index = self.search_restart_point_index_by_key(key);
297        self.seek_restart_point_by_index(index)
298    }
299
300    /// Seeks to the restart point by given restart point index.
301    fn seek_restart_point_by_index(&mut self, index: usize) {
302        let restart_point = self.block.restart_point(index);
303        let offset = restart_point.offset as usize;
304        let prefix = self.decode_prefix_at(
305            offset,
306            restart_point.key_len_type,
307            restart_point.value_len_type,
308        );
309
310        self.key = BytesMut::from(&self.block.data()[prefix.diff_key_range()]);
311        self.value_range = prefix.value_range();
312        self.offset = offset;
313        self.entry_len = prefix.entry_len();
314        self.update_restart_point(index);
315
316        self.hitmap
317            .fill_with_range(self.offset, self.value_range.end, self.block.len());
318    }
319
320    fn update_restart_point(&mut self, index: usize) {
321        self.restart_point_index = index;
322        let restart_point = self.block.restart_point(index);
323
324        self.last_key_len_type = restart_point.key_len_type;
325        self.last_value_len_type = restart_point.value_len_type;
326    }
327}
328
#[cfg(test)]
mod tests {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;
    use crate::hummock::{BlockBuilder, BlockBuilderOptions};

    /// Builds an iterator over one block containing, for table 0, the keys `k01@1`, `k02@2`,
    /// `k04@4` and `k05@5` with values `v01`, `v02`, `v04` and `v05`.
    fn build_iterator_for_test() -> BlockIterator {
        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k01", 1), b"v01");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k02", 2), b"v02");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k04", 4), b"v04");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k05", 5), b"v05");
        let capacity = builder.uncompressed_block_size();
        let buf = builder.build().to_vec();
        BlockIterator::new(BlockHolder::from_owned_block(Box::new(
            Block::decode(buf.into(), capacity).unwrap(),
        )))
    }

    #[test]
    fn test_seek_first() {
        let mut it = build_iterator_for_test();
        it.seek_to_first();
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k01", 1), it.key());
        assert_eq!(b"v01", it.value());
    }

    #[test]
    fn test_seek_last() {
        let mut it = build_iterator_for_test();
        it.seek_to_last();
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k05", 5), it.key());
        assert_eq!(b"v05", it.value());
    }

    #[test]
    fn test_seek_none_front() {
        let mut it = build_iterator_for_test();
        it.seek(construct_full_key_struct_for_test(0, b"k00", 0));
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k01", 1), it.key());
        assert_eq!(b"v01", it.value());

        let mut it = build_iterator_for_test();

        it.seek_le(construct_full_key_struct_for_test(0, b"k00", 0));
        assert!(!it.is_valid());
    }

    #[test]
    fn test_seek_none_back() {
        let mut it = build_iterator_for_test();
        it.seek(construct_full_key_struct_for_test(0, b"k06", 6));
        assert!(!it.is_valid());

        let mut it = build_iterator_for_test();
        it.seek_le(construct_full_key_struct_for_test(0, b"k06", 6));
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k05", 5), it.key());
        assert_eq!(b"v05", it.value());
    }

    #[test]
    fn test_seek_exact() {
        let mut it = build_iterator_for_test();

        // An exact match is returned by `seek`...
        it.seek(construct_full_key_struct_for_test(0, b"k04", 4));
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k04", 4), it.key());
        assert_eq!(b"v04", it.value());

        // ...and by `seek_le`.
        it.seek_le(construct_full_key_struct_for_test(0, b"k02", 2));
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k02", 2), it.key());
        assert_eq!(b"v02", it.value());
    }

    #[test]
    fn bi_direction_seek() {
        let mut it = build_iterator_for_test();
        it.seek(construct_full_key_struct_for_test(0, b"k03", 3));
        assert_eq!(construct_full_key_struct_for_test(0, b"k04", 4), it.key());

        it.seek_le(construct_full_key_struct_for_test(0, b"k03", 3));
        assert_eq!(construct_full_key_struct_for_test(0, b"k02", 2), it.key());
    }

    #[test]
    fn test_forward_iterate() {
        let mut it = build_iterator_for_test();

        it.seek_to_first();
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k01", 1), it.key());
        assert_eq!(b"v01", it.value());

        it.next();
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k02", 2), it.key());
        assert_eq!(b"v02", it.value());

        it.next();
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k04", 4), it.key());
        assert_eq!(b"v04", it.value());

        it.next();
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k05", 5), it.key());
        assert_eq!(b"v05", it.value());

        it.next();
        assert!(!it.is_valid());
    }

    #[test]
    fn test_backward_iterate() {
        let mut it = build_iterator_for_test();

        it.seek_to_last();
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k05", 5), it.key());
        assert_eq!(b"v05", it.value());

        it.prev();
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k04", 4), it.key());
        assert_eq!(b"v04", it.value());

        it.prev();
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k02", 2), it.key());
        assert_eq!(b"v02", it.value());

        it.prev();
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k01", 1), it.key());
        assert_eq!(b"v01", it.value());

        it.prev();
        assert!(!it.is_valid());
    }

    #[test]
    fn test_try_next_prev_at_boundaries() {
        let mut it = build_iterator_for_test();

        // `try_next` on the last entry neither advances nor invalidates the iterator.
        it.seek_to_last();
        assert!(!it.try_next());
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k05", 5), it.key());
        assert_eq!(b"v05", it.value());

        // `try_prev` on the first entry neither moves nor invalidates the iterator.
        it.seek_to_first();
        assert!(!it.try_prev());
        assert!(it.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k01", 1), it.key());
        assert_eq!(b"v01", it.value());

        // Away from the boundaries both directions move the iterator.
        assert!(it.try_next());
        assert_eq!(construct_full_key_struct_for_test(0, b"k02", 2), it.key());
        assert_eq!(b"v02", it.value());
        assert!(it.try_prev());
        assert_eq!(construct_full_key_struct_for_test(0, b"k01", 1), it.key());
        assert_eq!(b"v01", it.value());
    }

    #[test]
    fn test_seek_forward_backward_iterate() {
        let mut it = build_iterator_for_test();

        it.seek(construct_full_key_struct_for_test(0, b"k03", 3));
        assert_eq!(construct_full_key_struct_for_test(0, b"k04", 4), it.key());

        it.prev();
        assert_eq!(construct_full_key_struct_for_test(0, b"k02", 2), it.key());

        it.next();
        assert_eq!(construct_full_key_struct_for_test(0, b"k04", 4), it.key());
    }

    /// Builds a [`FullKey`] for `table_id` / `table_key` at `test_epoch(epoch)`.
    pub fn construct_full_key_struct_for_test(
        table_id: u32,
        table_key: &[u8],
        epoch: u64,
    ) -> FullKey<&[u8]> {
        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(epoch))
    }
}