risingwave_storage/hummock/sstable/
block_iterator.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::ops::Range;
17
18use bytes::BytesMut;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::key::FullKey;
21
22use super::{Block, KeyPrefix, LenType, RestartPoint};
23use crate::hummock::BlockHolder;
24use crate::monitor::LocalHitmap;
25
/// [`BlockIterator`] is used to read kv pairs in a block.
///
/// A freshly constructed iterator is not positioned on any entry. One of the `seek*` methods
/// must be called before `key`, `value`, `next` or `prev` are used, because those methods assert
/// [`BlockIterator::is_valid`].
pub struct BlockIterator {
    /// Block that iterates on.
    block: BlockHolder,
    /// Index of the restart point that the current entry belongs to.
    restart_point_index: usize,
    /// Offset of the current entry in the block data. The iterator is valid only while this
    /// offset is smaller than the block length.
    offset: usize,
    /// Current key, rebuilt in full from the overlap with the previous key plus the stored diff.
    key: BytesMut,
    /// Range of the current value in the block data.
    value_range: Range<usize>,
    /// Length of the current entry; `offset + entry_len` is the offset of the next entry.
    entry_len: usize,

    /// Key length type of the current restart point, used to decode the entries that follow it.
    last_key_len_type: LenType,
    /// Value length type of the current restart point, used to decode the entries that follow it.
    last_value_len_type: LenType,

    /// NOTE:
    ///
    /// - `hitmap` is supposed to be updated each time accessing the block data in a new position.
    /// - `hitmap` must be reported to the block hitmap before drop.
    hitmap: LocalHitmap<{ Block::HITMAP_ELEMS }>,
}
50
impl Drop for BlockIterator {
    /// Reports the locally collected hitmap to the hitmap of the underlying block, so that the
    /// accesses recorded by this iterator are not lost when it goes away.
    fn drop(&mut self) {
        self.block.hitmap().report(&mut self.hitmap);
    }
}
56
57impl BlockIterator {
58    pub fn new(block: BlockHolder) -> Self {
59        let hitmap = LocalHitmap::default();
60        Self {
61            block,
62            offset: usize::MAX,
63            restart_point_index: usize::MAX,
64            key: BytesMut::default(),
65            value_range: 0..0,
66            entry_len: 0,
67            last_key_len_type: LenType::u8,
68            last_value_len_type: LenType::u8,
69            hitmap,
70        }
71    }
72
73    pub fn next(&mut self) {
74        assert!(self.is_valid());
75        self.next_inner();
76    }
77
78    pub fn try_next(&mut self) -> bool {
79        assert!(self.is_valid());
80        self.try_next_inner()
81    }
82
83    pub fn prev(&mut self) {
84        assert!(self.is_valid());
85        self.prev_inner();
86    }
87
88    pub fn try_prev(&mut self) -> bool {
89        assert!(self.is_valid());
90        self.try_prev_inner()
91    }
92
93    pub fn table_id(&self) -> TableId {
94        self.block.table_id()
95    }
96
97    pub fn key(&self) -> FullKey<&[u8]> {
98        assert!(self.is_valid());
99        FullKey::from_slice_without_table_id(self.table_id(), &self.key[..])
100    }
101
102    pub fn value(&self) -> &[u8] {
103        assert!(self.is_valid());
104        &self.block.data()[self.value_range.clone()]
105    }
106
107    pub fn is_valid(&self) -> bool {
108        self.offset < self.block.len()
109    }
110
111    pub fn seek_to_first(&mut self) {
112        self.seek_restart_point_by_index(0);
113    }
114
115    pub fn seek_to_last(&mut self) {
116        self.seek_restart_point_by_index(self.block.restart_point_len() - 1);
117        self.next_until_prev_offset(self.block.len());
118    }
119
120    pub fn seek(&mut self, key: FullKey<&[u8]>) {
121        self.seek_restart_point_by_key(key);
122        self.next_until_key(key);
123    }
124
125    pub fn seek_le(&mut self, key: FullKey<&[u8]>) {
126        self.seek_restart_point_by_key(key);
127        self.next_until_key(key);
128        if !self.is_valid() {
129            self.seek_to_last();
130        }
131        self.prev_until_key(key);
132    }
133
134    pub(crate) fn finish_block(&mut self) {
135        self.invalidate();
136    }
137
138    /// Invalidates current state after reaching a invalid state.
139    fn invalidate(&mut self) {
140        self.offset = self.block.len();
141        self.restart_point_index = self.block.restart_point_len();
142        self.key.clear();
143        self.value_range = 0..0;
144        self.entry_len = 0;
145    }
146
147    /// Moving to the next entry
148    ///
149    /// Note: The current state may be invalid if there is no more data to read
150    fn next_inner(&mut self) {
151        if !self.try_next_inner() {
152            self.invalidate();
153        }
154    }
155
156    /// Try moving to the next entry.
157    ///
158    /// The current state will still be valid if there is no more data to read.
159    ///
160    /// Return: true is the iterator is advanced and false otherwise.
161    fn try_next_inner(&mut self) -> bool {
162        let offset = self.offset + self.entry_len;
163        if offset >= self.block.len() {
164            return false;
165        }
166
167        // after seek, offset meet a new restart point we need to update it
168        if self.restart_point_index + 1 < self.block.restart_point_len()
169            && offset
170                >= self
171                    .block
172                    .restart_point(self.restart_point_index + 1)
173                    .offset as usize
174        {
175            let new_restart_point_index = self.restart_point_index + 1;
176            self.update_restart_point(new_restart_point_index);
177        }
178
179        let prefix =
180            self.decode_prefix_at(offset, self.last_key_len_type, self.last_value_len_type);
181        self.key.truncate(prefix.overlap_len());
182        self.key
183            .extend_from_slice(&self.block.data()[prefix.diff_key_range()]);
184
185        self.value_range = prefix.value_range();
186        self.offset = offset;
187        self.entry_len = prefix.entry_len();
188
189        self.hitmap
190            .fill_with_range(self.offset, self.value_range.end, self.block.len());
191
192        true
193    }
194
195    /// Moves forward until reaching the first that equals or larger than the given `key`.
196    fn next_until_key(&mut self, key: FullKey<&[u8]>) {
197        while self.is_valid() && self.key().cmp(&key) == Ordering::Less {
198            self.next_inner();
199        }
200    }
201
202    /// Moves backward until reaching the first key that equals or smaller than the given `key`.
203    fn prev_until_key(&mut self, key: FullKey<&[u8]>) {
204        while self.is_valid() && self.key().cmp(&key) == Ordering::Greater {
205            self.prev_inner();
206        }
207    }
208
209    /// Moves forward until the position reaches the previous position of the given `next_offset` or
210    /// the last valid position if exists.
211    fn next_until_prev_offset(&mut self, offset: usize) {
212        while self.offset + self.entry_len < std::cmp::min(self.block.len(), offset) {
213            self.next_inner();
214        }
215    }
216
217    /// Moving to the previous entry
218    ///
219    /// Note: The current state may be invalid if there is no more data to read
220    fn prev_inner(&mut self) {
221        if !self.try_prev_inner() {
222            self.invalidate();
223        }
224    }
225
226    /// Try moving to the previous entry.
227    ///
228    /// The current state will still be valid if there is no more data to read.
229    ///
230    /// Return: true is the iterator is advanced and false otherwise.
231    fn try_prev_inner(&mut self) -> bool {
232        if self.offset == 0 {
233            return false;
234        }
235
236        if self.block.restart_point(self.restart_point_index).offset as usize == self.offset {
237            self.restart_point_index -= 1;
238        }
239        let origin_offset = self.offset;
240        self.seek_restart_point_by_index(self.restart_point_index);
241        self.next_until_prev_offset(origin_offset);
242        true
243    }
244
245    /// Decodes [`KeyPrefix`] at given offset.
246    fn decode_prefix_at(
247        &self,
248        offset: usize,
249        key_len_type: LenType,
250        value_len_type: LenType,
251    ) -> KeyPrefix {
252        KeyPrefix::decode(
253            &mut &self.block.data()[offset..],
254            offset,
255            key_len_type,
256            value_len_type,
257        )
258    }
259
260    /// Searches the restart point index that the given `key` belongs to.
261    fn search_restart_point_index_by_key(&mut self, key: FullKey<&[u8]>) -> usize {
262        // Find the largest restart point that restart key equals or less than the given key.
263
264        self.block
265            .search_restart_partition_point(
266                |&RestartPoint {
267                     offset: probe,
268                     key_len_type,
269                     value_len_type,
270                 }| {
271                    let probe = probe as usize;
272                    let prefix = KeyPrefix::decode(
273                        &mut &self.block.data()[probe..],
274                        probe,
275                        key_len_type,
276                        value_len_type,
277                    );
278                    let probe_key = &self.block.data()[prefix.diff_key_range()];
279                    let full_probe_key =
280                        FullKey::from_slice_without_table_id(self.block.table_id(), probe_key);
281                    self.hitmap.fill_with_range(
282                        probe,
283                        prefix.diff_key_range().end,
284                        self.block.len(),
285                    );
286                    match full_probe_key.cmp(&key) {
287                        Ordering::Less | Ordering::Equal => true,
288                        Ordering::Greater => false,
289                    }
290                },
291            )
292            // Prevent from underflowing when given is smaller than the first.
293            .saturating_sub(1)
294    }
295
296    /// Seeks to the restart point that the given `key` belongs to.
297    fn seek_restart_point_by_key(&mut self, key: FullKey<&[u8]>) {
298        let index = self.search_restart_point_index_by_key(key);
299        self.seek_restart_point_by_index(index)
300    }
301
302    /// Seeks to the restart point by given restart point index.
303    fn seek_restart_point_by_index(&mut self, index: usize) {
304        let restart_point = self.block.restart_point(index);
305        let offset = restart_point.offset as usize;
306        let prefix = self.decode_prefix_at(
307            offset,
308            restart_point.key_len_type,
309            restart_point.value_len_type,
310        );
311
312        self.key = BytesMut::from(&self.block.data()[prefix.diff_key_range()]);
313        self.value_range = prefix.value_range();
314        self.offset = offset;
315        self.entry_len = prefix.entry_len();
316        self.update_restart_point(index);
317
318        self.hitmap
319            .fill_with_range(self.offset, self.value_range.end, self.block.len());
320    }
321
322    fn update_restart_point(&mut self, index: usize) {
323        self.restart_point_index = index;
324        let restart_point = self.block.restart_point(index);
325
326        self.last_key_len_type = restart_point.key_len_type;
327        self.last_value_len_type = restart_point.value_len_type;
328    }
329}
330
#[cfg(test)]
mod tests {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;
    use crate::hummock::{BlockBuilder, BlockBuilderOptions};

    /// Builds an iterator over a block that holds the keys `k01`, `k02`, `k04` and `k05`
    /// (`k03` is left out on purpose) with the values `v01`, `v02`, `v04` and `v05`.
    fn build_iterator_for_test() -> BlockIterator {
        let mut builder = BlockBuilder::new(BlockBuilderOptions::default());
        let entries = [
            ("k01", 1, "v01"),
            ("k02", 2, "v02"),
            ("k04", 4, "v04"),
            ("k05", 5, "v05"),
        ];
        for (table_key, epoch, value) in entries {
            let full_key = construct_full_key_struct_for_test(0, table_key.as_bytes(), epoch);
            builder.add_for_test(full_key, value.as_bytes());
        }
        let capacity = builder.uncompressed_block_size();
        let buf = builder.build().to_vec();
        let block = Block::decode(buf.into(), capacity).unwrap();
        BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)))
    }

    /// Asserts that `it` currently points at the key built from `table_key` and `epoch`.
    #[track_caller]
    fn assert_key(it: &BlockIterator, table_key: &[u8], epoch: u64) {
        assert_eq!(
            construct_full_key_struct_for_test(0, table_key, epoch),
            it.key()
        );
    }

    /// Asserts that `it` is valid and points at the given key and value.
    #[track_caller]
    fn assert_entry(it: &BlockIterator, table_key: &[u8], epoch: u64, value: &[u8]) {
        assert!(it.is_valid());
        assert_key(it, table_key, epoch);
        assert_eq!(value, it.value());
    }

    #[test]
    fn test_seek_first() {
        let mut it = build_iterator_for_test();
        it.seek_to_first();
        assert_entry(&it, b"k01", 1, b"v01");
    }

    #[test]
    fn test_seek_last() {
        let mut it = build_iterator_for_test();
        it.seek_to_last();
        assert_entry(&it, b"k05", 5, b"v05");
    }

    #[test]
    fn test_seek_none_front() {
        // Seeking a key before the first one lands on the first entry.
        let mut it = build_iterator_for_test();
        it.seek(construct_full_key_struct_for_test(0, b"k00", 0));
        assert_entry(&it, b"k01", 1, b"v01");

        // There is no entry at or before `k00`.
        let mut it = build_iterator_for_test();
        it.seek_le(construct_full_key_struct_for_test(0, b"k00", 0));
        assert!(!it.is_valid());
    }

    #[test]
    fn test_seek_none_back() {
        // There is no entry at or after `k06`.
        let mut it = build_iterator_for_test();
        it.seek(construct_full_key_struct_for_test(0, b"k06", 6));
        assert!(!it.is_valid());

        // Seeking backward from a key after the last one lands on the last entry.
        let mut it = build_iterator_for_test();
        it.seek_le(construct_full_key_struct_for_test(0, b"k06", 6));
        assert_entry(&it, b"k05", 5, b"v05");
    }

    #[test]
    fn bi_direction_seek() {
        let mut it = build_iterator_for_test();

        it.seek(construct_full_key_struct_for_test(0, b"k03", 3));
        assert_key(&it, b"k04", 4);

        it.seek_le(construct_full_key_struct_for_test(0, b"k03", 3));
        assert_key(&it, b"k02", 2);
    }

    #[test]
    fn test_forward_iterate() {
        let mut it = build_iterator_for_test();

        it.seek_to_first();
        assert_entry(&it, b"k01", 1, b"v01");

        it.next();
        assert_entry(&it, b"k02", 2, b"v02");

        it.next();
        assert_entry(&it, b"k04", 4, b"v04");

        it.next();
        assert_entry(&it, b"k05", 5, b"v05");

        it.next();
        assert!(!it.is_valid());
    }

    #[test]
    fn test_backward_iterate() {
        let mut it = build_iterator_for_test();

        it.seek_to_last();
        assert_entry(&it, b"k05", 5, b"v05");

        it.prev();
        assert_entry(&it, b"k04", 4, b"v04");

        it.prev();
        assert_entry(&it, b"k02", 2, b"v02");

        it.prev();
        assert_entry(&it, b"k01", 1, b"v01");

        it.prev();
        assert!(!it.is_valid());
    }

    #[test]
    fn test_seek_forward_backward_iterate() {
        let mut it = build_iterator_for_test();

        it.seek(construct_full_key_struct_for_test(0, b"k03", 3));
        assert_key(&it, b"k04", 4);

        it.prev();
        assert_key(&it, b"k02", 2);

        it.next();
        assert_key(&it, b"k04", 4);
    }

    /// Builds a [`FullKey`] for tests from a raw table id, a table key and a test epoch.
    pub fn construct_full_key_struct_for_test(
        table_id: u32,
        table_key: &[u8],
        epoch: u64,
    ) -> FullKey<&[u8]> {
        let table_id = TableId::new(table_id);
        FullKey::for_test(table_id, table_key, test_epoch(epoch))
    }
}