1use std::cmp::Ordering;
16use std::ops::Range;
17
18use bytes::BytesMut;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::key::FullKey;
21
22use super::{Block, KeyPrefix, LenType, RestartPoint};
23use crate::hummock::BlockHolder;
24use crate::monitor::LocalHitmap;
25
/// [`BlockIterator`] iterates over the KV pairs of a [`Block`], whose entries are grouped by
/// restart points and key-prefix compressed within each group.
pub struct BlockIterator {
    /// Block that is being iterated.
    block: BlockHolder,
    /// Index of the restart point range the cursor is currently in.
    restart_point_index: usize,
    /// Byte offset of the current entry inside the block data; `>= block.len()` means invalid.
    offset: usize,
    /// Fully materialized key of the current entry (prefix + diff bytes).
    key: BytesMut,
    /// Byte range of the current entry's value inside the block data.
    value_range: Range<usize>,
    /// Encoded length of the current entry; `offset + entry_len` is the next entry's offset.
    entry_len: usize,

    // Length-encoding widths of the restart-point range the cursor is in; refreshed by
    // `update_restart_point` whenever a restart-point boundary is crossed.
    last_key_len_type: LenType,
    last_value_len_type: LenType,

    /// Local access hitmap for this iterator; reported back to the block's hitmap on drop.
    hitmap: LocalHitmap<{ Block::HITMAP_ELEMS }>,
}
50
impl Drop for BlockIterator {
    fn drop(&mut self) {
        // Merge this iterator's locally-collected access ranges into the shared block-level
        // hitmap so the statistics are not lost when the iterator goes away.
        self.block.hitmap().report(&mut self.hitmap);
    }
}
56
impl BlockIterator {
    /// Creates an iterator over the entries of `block`.
    ///
    /// The iterator starts in an invalid position; call one of the `seek*` methods before
    /// reading from it.
    pub fn new(block: BlockHolder) -> Self {
        let hitmap = LocalHitmap::default();
        Self {
            block,
            // `usize::MAX` is always `>= block.len()`, so a fresh iterator is invalid.
            offset: usize::MAX,
            restart_point_index: usize::MAX,
            key: BytesMut::default(),
            value_range: 0..0,
            entry_len: 0,
            last_key_len_type: LenType::u8,
            last_value_len_type: LenType::u8,
            hitmap,
        }
    }

    /// Advances to the next entry; the iterator becomes invalid when the block is exhausted.
    ///
    /// # Panics
    ///
    /// Panics if the iterator is not valid.
    pub fn next(&mut self) {
        assert!(self.is_valid());
        self.next_inner();
    }

    /// Tries to advance to the next entry. Returns `false` (keeping the current position)
    /// when already on the last entry.
    ///
    /// # Panics
    ///
    /// Panics if the iterator is not valid.
    pub fn try_next(&mut self) -> bool {
        assert!(self.is_valid());
        self.try_next_inner()
    }

    /// Moves to the previous entry; the iterator becomes invalid when stepping before the
    /// first entry.
    ///
    /// # Panics
    ///
    /// Panics if the iterator is not valid.
    pub fn prev(&mut self) {
        assert!(self.is_valid());
        self.prev_inner();
    }

    /// Tries to move to the previous entry. Returns `false` (keeping the current position)
    /// when already on the first entry.
    ///
    /// # Panics
    ///
    /// Panics if the iterator is not valid.
    pub fn try_prev(&mut self) -> bool {
        assert!(self.is_valid());
        self.try_prev_inner()
    }

    /// Returns the table id of the underlying block.
    pub fn table_id(&self) -> TableId {
        self.block.table_id()
    }

    /// Returns the full key of the current entry. Keys inside the block are stored without
    /// the table id; it is re-attached here from the block metadata.
    ///
    /// # Panics
    ///
    /// Panics if the iterator is not valid.
    pub fn key(&self) -> FullKey<&[u8]> {
        assert!(self.is_valid());
        FullKey::from_slice_without_table_id(self.table_id(), &self.key[..])
    }

    /// Returns the value of the current entry as a slice into the block's data.
    ///
    /// # Panics
    ///
    /// Panics if the iterator is not valid.
    pub fn value(&self) -> &[u8] {
        assert!(self.is_valid());
        &self.block.data()[self.value_range.clone()]
    }

    /// Returns whether the iterator currently points at a real entry.
    pub fn is_valid(&self) -> bool {
        self.offset < self.block.len()
    }

    /// Positions the iterator on the first entry of the block.
    pub fn seek_to_first(&mut self) {
        self.seek_restart_point_by_index(0);
    }

    /// Positions the iterator on the last entry of the block.
    pub fn seek_to_last(&mut self) {
        // Jump to the last restart point, then walk forward to the final entry.
        self.seek_restart_point_by_index(self.block.restart_point_len() - 1);
        self.next_until_prev_offset(self.block.len());
    }

    /// Positions the iterator on the first entry whose key is `>= key`; the iterator
    /// becomes invalid if there is no such entry.
    pub fn seek(&mut self, key: FullKey<&[u8]>) {
        self.seek_restart_point_by_key(key);
        self.next_until_key(key);
    }

    /// Positions the iterator on the last entry whose key is `<= key`; the iterator
    /// becomes invalid if there is no such entry.
    pub fn seek_le(&mut self, key: FullKey<&[u8]>) {
        self.seek_restart_point_by_key(key);
        self.next_until_key(key);
        // If the forward scan ran off the end, every key is `< key`; restart from the
        // last entry so the backward scan below can settle on it.
        if !self.is_valid() {
            self.seek_to_last();
        }
        self.prev_until_key(key);
    }

    /// Invalidates the iterator when the caller is done with this block.
    pub(crate) fn finish_block(&mut self) {
        self.invalidate();
    }

    /// Resets the cursor to the canonical "invalid" state (positioned past the end).
    fn invalidate(&mut self) {
        self.offset = self.block.len();
        self.restart_point_index = self.block.restart_point_len();
        self.key.clear();
        self.value_range = 0..0;
        self.entry_len = 0;
    }

    /// Advances to the next entry, invalidating the iterator on exhaustion.
    fn next_inner(&mut self) {
        if !self.try_next_inner() {
            self.invalidate();
        }
    }

    /// Advances to the next entry; returns `false` without moving if the current entry is
    /// the last one.
    fn try_next_inner(&mut self) -> bool {
        let offset = self.offset + self.entry_len;
        if offset >= self.block.len() {
            return false;
        }

        // When crossing into the next restart-point range, refresh the cached key/value
        // length-encoding types used for decoding entries of that range.
        if self.restart_point_index + 1 < self.block.restart_point_len()
            && offset
                >= self
                    .block
                    .restart_point(self.restart_point_index + 1)
                    .offset as usize
        {
            let new_restart_point_index = self.restart_point_index + 1;
            self.update_restart_point(new_restart_point_index);
        }

        let prefix =
            self.decode_prefix_at(offset, self.last_key_len_type, self.last_value_len_type);
        // Rebuild the full key: keep the overlap with the previous key, then append the
        // diff bytes stored in this entry.
        self.key.truncate(prefix.overlap_len());
        self.key
            .extend_from_slice(&self.block.data()[prefix.diff_key_range()]);

        self.value_range = prefix.value_range();
        self.offset = offset;
        self.entry_len = prefix.entry_len();

        self.hitmap
            .fill_with_range(self.offset, self.value_range.end, self.block.len());

        true
    }

    /// Moves forward until the current key is `>= key` (or the iterator becomes invalid).
    fn next_until_key(&mut self, key: FullKey<&[u8]>) {
        while self.is_valid() && self.key().cmp(&key) == Ordering::Less {
            self.next_inner();
        }
    }

    /// Moves backward until the current key is `<= key` (or the iterator becomes invalid).
    fn prev_until_key(&mut self, key: FullKey<&[u8]>) {
        while self.is_valid() && self.key().cmp(&key) == Ordering::Greater {
            self.prev_inner();
        }
    }

    /// Moves forward until the current entry is the last one starting before `offset`
    /// (clamped to the block end).
    fn next_until_prev_offset(&mut self, offset: usize) {
        while self.offset + self.entry_len < std::cmp::min(self.block.len(), offset) {
            self.next_inner();
        }
    }

    /// Moves to the previous entry, invalidating the iterator when stepping before the
    /// first entry.
    fn prev_inner(&mut self) {
        if !self.try_prev_inner() {
            self.invalidate();
        }
    }

    /// Moves to the previous entry; returns `false` without moving if the current entry is
    /// the first one.
    ///
    /// Entries are only forward-decodable (prefix compression), so going backward means
    /// jumping to the governing restart point and re-scanning forward up to the entry just
    /// before the original position.
    fn try_prev_inner(&mut self) -> bool {
        if self.offset == 0 {
            return false;
        }

        // If the cursor sits exactly on a restart point, the previous entry belongs to the
        // preceding restart-point range.
        if self.block.restart_point(self.restart_point_index).offset as usize == self.offset {
            self.restart_point_index -= 1;
        }
        let origin_offset = self.offset;
        self.seek_restart_point_by_index(self.restart_point_index);
        self.next_until_prev_offset(origin_offset);
        true
    }

    /// Decodes the [`KeyPrefix`] header of the entry starting at `offset`, using the given
    /// length-encoding types.
    fn decode_prefix_at(
        &self,
        offset: usize,
        key_len_type: LenType,
        value_len_type: LenType,
    ) -> KeyPrefix {
        KeyPrefix::decode(
            &mut &self.block.data()[offset..],
            offset,
            key_len_type,
            value_len_type,
        )
    }

    /// Searches the restart points for the last one whose key is `<= key` and returns its
    /// index (0 if even the first restart key is greater).
    fn search_restart_point_index_by_key(&mut self, key: FullKey<&[u8]>) -> usize {
        self.block
            .search_restart_partition_point(
                |&RestartPoint {
                     offset: probe,
                     key_len_type,
                     value_len_type,
                 }| {
                    let probe = probe as usize;
                    let prefix = KeyPrefix::decode(
                        &mut &self.block.data()[probe..],
                        probe,
                        key_len_type,
                        value_len_type,
                    );
                    // A restart-point entry is stored without prefix compression, so its
                    // diff range is the complete key (see `seek_restart_point_by_index`).
                    let probe_key = &self.block.data()[prefix.diff_key_range()];
                    let full_probe_key =
                        FullKey::from_slice_without_table_id(self.block.table_id(), probe_key);
                    self.hitmap.fill_with_range(
                        probe,
                        prefix.diff_key_range().end,
                        self.block.len(),
                    );
                    match full_probe_key.cmp(&key) {
                        Ordering::Less | Ordering::Equal => true,
                        Ordering::Greater => false,
                    }
                },
            )
            // The partition point is the first restart point `> key`; step back to the last
            // one `<= key`, clamping at 0 when all restart keys are greater.
            .saturating_sub(1)
    }

    /// Seeks to the restart point governing `key`, as located by
    /// [`Self::search_restart_point_index_by_key`].
    fn seek_restart_point_by_key(&mut self, key: FullKey<&[u8]>) {
        let index = self.search_restart_point_index_by_key(key);
        self.seek_restart_point_by_index(index)
    }

    /// Positions the iterator exactly on the entry at restart point `index`.
    fn seek_restart_point_by_index(&mut self, index: usize) {
        let restart_point = self.block.restart_point(index);
        let offset = restart_point.offset as usize;
        let prefix = self.decode_prefix_at(
            offset,
            restart_point.key_len_type,
            restart_point.value_len_type,
        );

        // Restart-point entries have no prefix compression: the diff range is the full key,
        // so the key buffer is replaced wholesale rather than truncated-and-extended.
        self.key = BytesMut::from(&self.block.data()[prefix.diff_key_range()]);
        self.value_range = prefix.value_range();
        self.offset = offset;
        self.entry_len = prefix.entry_len();
        self.update_restart_point(index);

        self.hitmap
            .fill_with_range(self.offset, self.value_range.end, self.block.len());
    }

    /// Caches restart point `index` and its length-encoding types for decoding subsequent
    /// entries of that range.
    fn update_restart_point(&mut self, index: usize) {
        self.restart_point_index = index;
        let restart_point = self.block.restart_point(index);

        self.last_key_len_type = restart_point.key_len_type;
        self.last_value_len_type = restart_point.value_len_type;
    }
}
330
#[cfg(test)]
mod tests {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;
    use crate::hummock::{BlockBuilder, BlockBuilderOptions};

    /// Builds a block holding four entries (`k01`, `k02`, `k04`, `k05` — note the gap at
    /// `k03`) and returns a fresh iterator over it.
    fn build_iterator_for_test() -> BlockIterator {
        let mut builder = BlockBuilder::new(BlockBuilderOptions::default());
        let entries: [(&[u8], u64, &[u8]); 4] = [
            (b"k01", 1, b"v01"),
            (b"k02", 2, b"v02"),
            (b"k04", 4, b"v04"),
            (b"k05", 5, b"v05"),
        ];
        for (key, epoch, value) in entries {
            builder.add_for_test(construct_full_key_struct_for_test(0, key, epoch), value);
        }
        let capacity = builder.uncompressed_block_size();
        let buf = builder.build().to_vec();
        let block = Block::decode(buf.into(), capacity).unwrap();
        BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)))
    }

    /// Asserts that `iter` is valid and currently points at `(key, epoch, value)`.
    fn check_entry(iter: &BlockIterator, key: &[u8], epoch: u64, value: &[u8]) {
        assert!(iter.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, key, epoch), iter.key());
        assert_eq!(value, iter.value());
    }

    #[test]
    fn test_seek_first() {
        let mut iter = build_iterator_for_test();
        iter.seek_to_first();
        check_entry(&iter, b"k01", 1, b"v01");
    }

    #[test]
    fn test_seek_last() {
        let mut iter = build_iterator_for_test();
        iter.seek_to_last();
        check_entry(&iter, b"k05", 5, b"v05");
    }

    #[test]
    fn test_seek_none_front() {
        // Seeking before the first key lands on the first entry…
        let mut iter = build_iterator_for_test();
        iter.seek(construct_full_key_struct_for_test(0, b"k00", 0));
        check_entry(&iter, b"k01", 1, b"v01");

        // …while seek_le has nothing `<=` the target and invalidates.
        let mut iter = build_iterator_for_test();
        iter.seek_le(construct_full_key_struct_for_test(0, b"k00", 0));
        assert!(!iter.is_valid());
    }

    #[test]
    fn test_seek_none_back() {
        // Seeking past the last key invalidates…
        let mut iter = build_iterator_for_test();
        iter.seek(construct_full_key_struct_for_test(0, b"k06", 6));
        assert!(!iter.is_valid());

        // …while seek_le settles on the last entry.
        let mut iter = build_iterator_for_test();
        iter.seek_le(construct_full_key_struct_for_test(0, b"k06", 6));
        check_entry(&iter, b"k05", 5, b"v05");
    }

    #[test]
    fn bi_direction_seek() {
        // `k03` is absent: seek snaps forward to k04, seek_le snaps back to k02.
        let mut iter = build_iterator_for_test();
        iter.seek(construct_full_key_struct_for_test(0, b"k03", 3));
        assert_eq!(construct_full_key_struct_for_test(0, b"k04", 4), iter.key());

        iter.seek_le(construct_full_key_struct_for_test(0, b"k03", 3));
        assert_eq!(construct_full_key_struct_for_test(0, b"k02", 2), iter.key());
    }

    #[test]
    fn test_forward_iterate() {
        let mut iter = build_iterator_for_test();
        iter.seek_to_first();
        let expected: [(&[u8], u64, &[u8]); 4] = [
            (b"k01", 1, b"v01"),
            (b"k02", 2, b"v02"),
            (b"k04", 4, b"v04"),
            (b"k05", 5, b"v05"),
        ];
        for (key, epoch, value) in expected {
            check_entry(&iter, key, epoch, value);
            iter.next();
        }
        assert!(!iter.is_valid());
    }

    #[test]
    fn test_backward_iterate() {
        let mut iter = build_iterator_for_test();
        iter.seek_to_last();
        let expected: [(&[u8], u64, &[u8]); 4] = [
            (b"k05", 5, b"v05"),
            (b"k04", 4, b"v04"),
            (b"k02", 2, b"v02"),
            (b"k01", 1, b"v01"),
        ];
        for (key, epoch, value) in expected {
            check_entry(&iter, key, epoch, value);
            iter.prev();
        }
        assert!(!iter.is_valid());
    }

    #[test]
    fn test_seek_forward_backward_iterate() {
        let mut iter = build_iterator_for_test();
        iter.seek(construct_full_key_struct_for_test(0, b"k03", 3));
        assert_eq!(construct_full_key_struct_for_test(0, b"k04", 4), iter.key());

        iter.prev();
        assert_eq!(construct_full_key_struct_for_test(0, b"k02", 2), iter.key());

        iter.next();
        assert_eq!(construct_full_key_struct_for_test(0, b"k04", 4), iter.key());
    }

    /// Builds a [`FullKey`] for tests from a raw table id, user key, and epoch.
    pub fn construct_full_key_struct_for_test(
        table_id: u32,
        table_key: &[u8],
        epoch: u64,
    ) -> FullKey<&[u8]> {
        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(epoch))
    }
}