1use std::cmp::Ordering;
16use std::ops::Range;
17
18use bytes::BytesMut;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::key::FullKey;
21
22use super::{Block, KeyPrefix, LenType, RestartPoint};
23use crate::hummock::BlockHolder;
24use crate::monitor::LocalHitmap;
25
/// Iterator over the entries of a single [`Block`].
///
/// Keys inside a block are prefix-compressed relative to restart points, so the
/// iterator maintains a fully reconstructed copy of the current key in `key`.
pub struct BlockIterator {
    /// The block being iterated; the holder keeps it alive for the iterator's lifetime.
    block: BlockHolder,
    /// Index of the restart point governing the current entry.
    restart_point_index: usize,
    /// Byte offset of the current entry in the block data.
    /// `offset >= block.len()` (including the initial `usize::MAX`) means the iterator is invalid.
    offset: usize,
    /// Fully reconstructed key of the current entry (shared prefix + diff bytes).
    key: BytesMut,
    /// Byte range of the current entry's value within the block data.
    value_range: Range<usize>,
    /// Total encoded length of the current entry; used to advance to the next entry.
    entry_len: usize,

    // Length-encoding types taken from the current restart point; required to
    // decode the prefixes of subsequent entries in the same restart interval.
    last_key_len_type: LenType,
    last_value_len_type: LenType,

    /// Locally accumulated access bitmap, reported back to the block's shared
    /// hitmap when the iterator is dropped.
    hitmap: LocalHitmap<{ Block::HITMAP_ELEMS }>,
}
50
impl Drop for BlockIterator {
    fn drop(&mut self) {
        // Flush the locally accumulated access bitmap into the block's shared
        // hitmap so access statistics outlive this iterator.
        self.block.hitmap().report(&mut self.hitmap);
    }
}
56
57impl BlockIterator {
58 pub fn new(block: BlockHolder) -> Self {
59 let hitmap = LocalHitmap::default();
60 Self {
61 block,
62 offset: usize::MAX,
63 restart_point_index: usize::MAX,
64 key: BytesMut::default(),
65 value_range: 0..0,
66 entry_len: 0,
67 last_key_len_type: LenType::u8,
68 last_value_len_type: LenType::u8,
69 hitmap,
70 }
71 }
72
73 pub fn next(&mut self) {
74 assert!(self.is_valid());
75 self.next_inner();
76 }
77
78 pub fn try_next(&mut self) -> bool {
79 assert!(self.is_valid());
80 self.try_next_inner()
81 }
82
83 pub fn prev(&mut self) {
84 assert!(self.is_valid());
85 self.prev_inner();
86 }
87
88 pub fn try_prev(&mut self) -> bool {
89 assert!(self.is_valid());
90 self.try_prev_inner()
91 }
92
93 pub fn table_id(&self) -> TableId {
94 self.block.table_id()
95 }
96
97 pub fn key(&self) -> FullKey<&[u8]> {
98 assert!(self.is_valid());
99 FullKey::from_slice_without_table_id(self.table_id(), &self.key[..])
100 }
101
102 pub fn value(&self) -> &[u8] {
103 assert!(self.is_valid());
104 &self.block.data()[self.value_range.clone()]
105 }
106
107 pub fn is_valid(&self) -> bool {
108 self.offset < self.block.len()
109 }
110
111 pub fn seek_to_first(&mut self) {
112 self.seek_restart_point_by_index(0);
113 }
114
115 pub fn seek_to_last(&mut self) {
116 self.seek_restart_point_by_index(self.block.restart_point_len() - 1);
117 self.next_until_prev_offset(self.block.len());
118 }
119
120 pub fn seek(&mut self, key: FullKey<&[u8]>) {
121 self.seek_restart_point_by_key(key);
122 self.next_until_key(key);
123 }
124
125 pub fn seek_le(&mut self, key: FullKey<&[u8]>) {
126 self.seek_restart_point_by_key(key);
127 self.next_until_key(key);
128 if !self.is_valid() {
129 self.seek_to_last();
130 }
131 self.prev_until_key(key);
132 }
133}
134
impl BlockIterator {
    /// Marks the iterator as exhausted by pushing `offset` past the end of the
    /// block and clearing all cached entry state.
    fn invalidate(&mut self) {
        self.offset = self.block.len();
        self.restart_point_index = self.block.restart_point_len();
        self.key.clear();
        self.value_range = 0..0;
        self.entry_len = 0;
    }

    /// Advances to the next entry, invalidating the iterator when the current
    /// entry is the last one.
    fn next_inner(&mut self) {
        if !self.try_next_inner() {
            self.invalidate();
        }
    }

    /// Tries to advance to the next entry; returns `false` without changing
    /// the position when there is no next entry.
    fn try_next_inner(&mut self) -> bool {
        let offset = self.offset + self.entry_len;
        if offset >= self.block.len() {
            return false;
        }

        // If the next entry crosses into the following restart interval,
        // refresh the cached key/value length-encoding types from that
        // restart point before decoding.
        if self.restart_point_index + 1 < self.block.restart_point_len()
            && offset
                >= self
                    .block
                    .restart_point(self.restart_point_index + 1)
                    .offset as usize
        {
            let new_restart_point_index = self.restart_point_index + 1;
            self.update_restart_point(new_restart_point_index);
        }

        let prefix =
            self.decode_prefix_at(offset, self.last_key_len_type, self.last_value_len_type);
        // Rebuild the full key in place: keep the overlapping prefix of the
        // previous key, then append this entry's diff bytes.
        self.key.truncate(prefix.overlap_len());
        self.key
            .extend_from_slice(&self.block.data()[prefix.diff_key_range()]);

        self.value_range = prefix.value_range();
        self.offset = offset;
        self.entry_len = prefix.entry_len();

        // Record the byte span touched by this entry for cache statistics.
        self.hitmap
            .fill_with_range(self.offset, self.value_range.end, self.block.len());

        true
    }

    /// Moves forward until the current key is `>= key`, or the iterator
    /// becomes invalid.
    fn next_until_key(&mut self, key: FullKey<&[u8]>) {
        while self.is_valid() && self.key().cmp(&key) == Ordering::Less {
            self.next_inner();
        }
    }

    /// Moves backward until the current key is `<= key`, or the iterator
    /// becomes invalid.
    fn prev_until_key(&mut self, key: FullKey<&[u8]>) {
        while self.is_valid() && self.key().cmp(&key) == Ordering::Greater {
            self.prev_inner();
        }
    }

    /// Moves forward to the last entry that ends before `offset`, i.e. the
    /// entry immediately preceding `offset` (clamped to the block length).
    fn next_until_prev_offset(&mut self, offset: usize) {
        while self.offset + self.entry_len < std::cmp::min(self.block.len(), offset) {
            self.next_inner();
        }
    }

    /// Moves to the previous entry, invalidating the iterator when the current
    /// entry is the first one.
    fn prev_inner(&mut self) {
        if !self.try_prev_inner() {
            self.invalidate();
        }
    }

    /// Tries to move to the previous entry; returns `false` when already at
    /// the first entry.
    ///
    /// Entries are only forward-decodable (each key is a diff against the
    /// previous one), so this re-seeks to the governing restart point and
    /// scans forward to the entry just before the original offset.
    fn try_prev_inner(&mut self) -> bool {
        if self.offset == 0 {
            return false;
        }

        // If the current entry sits exactly on its restart point, the previous
        // entry belongs to the preceding restart interval.
        if self.block.restart_point(self.restart_point_index).offset as usize == self.offset {
            self.restart_point_index -= 1;
        }
        let origin_offset = self.offset;
        self.seek_restart_point_by_index(self.restart_point_index);
        self.next_until_prev_offset(origin_offset);
        true
    }

    /// Decodes the key/value prefix header of the entry starting at `offset`,
    /// using the given length-encoding types.
    fn decode_prefix_at(
        &self,
        offset: usize,
        key_len_type: LenType,
        value_len_type: LenType,
    ) -> KeyPrefix {
        KeyPrefix::decode(
            &mut &self.block.data()[offset..],
            offset,
            key_len_type,
            value_len_type,
        )
    }

    /// Binary-searches the restart points for the last one whose key is
    /// `<= key`; returns index 0 when `key` sorts before every restart key.
    fn search_restart_point_index_by_key(&mut self, key: FullKey<&[u8]>) -> usize {
        // The predicate is `restart key <= key`, so `partition_point` yields
        // the index of the first restart point strictly greater than `key`;
        // `saturating_sub(1)` then lands on the last one not greater.
        self.block
            .search_restart_partition_point(
                |&RestartPoint {
                     offset: probe,
                     key_len_type,
                     value_len_type,
                 }| {
                    let probe = probe as usize;
                    let prefix = KeyPrefix::decode(
                        &mut &self.block.data()[probe..],
                        probe,
                        key_len_type,
                        value_len_type,
                    );
                    // Restart-point entries store the full key as their diff
                    // (no overlap with a previous key).
                    let probe_key = &self.block.data()[prefix.diff_key_range()];
                    let full_probe_key =
                        FullKey::from_slice_without_table_id(self.block.table_id(), probe_key);
                    self.hitmap.fill_with_range(
                        probe,
                        prefix.diff_key_range().end,
                        self.block.len(),
                    );
                    match full_probe_key.cmp(&key) {
                        Ordering::Less | Ordering::Equal => true,
                        Ordering::Greater => false,
                    }
                },
            )
            .saturating_sub(1)
    }

    /// Seeks to the restart point whose interval may contain `key`.
    fn seek_restart_point_by_key(&mut self, key: FullKey<&[u8]>) {
        let index = self.search_restart_point_index_by_key(key);
        self.seek_restart_point_by_index(index)
    }

    /// Positions the iterator exactly on the entry at restart point `index`.
    fn seek_restart_point_by_index(&mut self, index: usize) {
        let restart_point = self.block.restart_point(index);
        let offset = restart_point.offset as usize;
        let prefix = self.decode_prefix_at(
            offset,
            restart_point.key_len_type,
            restart_point.value_len_type,
        );

        // A restart-point entry carries its full key in the diff range.
        self.key = BytesMut::from(&self.block.data()[prefix.diff_key_range()]);
        self.value_range = prefix.value_range();
        self.offset = offset;
        self.entry_len = prefix.entry_len();
        self.update_restart_point(index);

        self.hitmap
            .fill_with_range(self.offset, self.value_range.end, self.block.len());
    }

    /// Sets the current restart point index and caches its length-encoding
    /// types for decoding subsequent entries.
    fn update_restart_point(&mut self, index: usize) {
        self.restart_point_index = index;
        let restart_point = self.block.restart_point(index);

        self.last_key_len_type = restart_point.key_len_type;
        self.last_value_len_type = restart_point.value_len_type;
    }
}
328
#[cfg(test)]
mod tests {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;
    use crate::hummock::{BlockBuilder, BlockBuilderOptions};

    /// The (key, epoch, value) entries stored in the test block.
    /// `k03` is deliberately absent so seek tests can probe a gap.
    const TEST_ENTRIES: [(&[u8], u64, &[u8]); 4] = [
        (b"k01", 1, b"v01"),
        (b"k02", 2, b"v02"),
        (b"k04", 4, b"v04"),
        (b"k05", 5, b"v05"),
    ];

    /// Builds a block containing `TEST_ENTRIES` and returns an iterator over it.
    fn build_iterator_for_test() -> BlockIterator {
        let mut builder = BlockBuilder::new(BlockBuilderOptions::default());
        for (key, epoch, value) in TEST_ENTRIES {
            builder.add_for_test(construct_full_key_struct_for_test(0, key, epoch), value);
        }
        let capacity = builder.uncompressed_block_size();
        let buf = builder.build().to_vec();
        let block = Block::decode(buf.into(), capacity).unwrap();
        BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)))
    }

    /// Asserts that `iter` is valid and points at the `idx`-th test entry.
    fn assert_at(iter: &BlockIterator, idx: usize) {
        let (key, epoch, value) = TEST_ENTRIES[idx];
        assert!(iter.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, key, epoch), iter.key());
        assert_eq!(value, iter.value());
    }

    #[test]
    fn test_seek_first() {
        let mut iter = build_iterator_for_test();
        iter.seek_to_first();
        assert_at(&iter, 0);
    }

    #[test]
    fn test_seek_last() {
        let mut iter = build_iterator_for_test();
        iter.seek_to_last();
        assert_at(&iter, 3);
    }

    #[test]
    fn test_seek_none_front() {
        // Seeking before the first key lands on the first entry.
        let mut iter = build_iterator_for_test();
        iter.seek(construct_full_key_struct_for_test(0, b"k00", 0));
        assert_at(&iter, 0);

        // seek_le before the first key finds nothing.
        let mut iter = build_iterator_for_test();
        iter.seek_le(construct_full_key_struct_for_test(0, b"k00", 0));
        assert!(!iter.is_valid());
    }

    #[test]
    fn test_seek_none_back() {
        // Seeking past the last key finds nothing.
        let mut iter = build_iterator_for_test();
        iter.seek(construct_full_key_struct_for_test(0, b"k06", 6));
        assert!(!iter.is_valid());

        // seek_le past the last key lands on the last entry.
        let mut iter = build_iterator_for_test();
        iter.seek_le(construct_full_key_struct_for_test(0, b"k06", 6));
        assert_at(&iter, 3);
    }

    #[test]
    fn bi_direction_seek() {
        let mut iter = build_iterator_for_test();
        // `k03` is absent: seek finds the first entry >= k03 (k04) …
        iter.seek(construct_full_key_struct_for_test(0, b"k03", 3));
        assert_at(&iter, 2);

        // … and seek_le finds the last entry <= k03 (k02).
        iter.seek_le(construct_full_key_struct_for_test(0, b"k03", 3));
        assert_at(&iter, 1);
    }

    #[test]
    fn test_forward_iterate() {
        let mut iter = build_iterator_for_test();
        iter.seek_to_first();
        for idx in 0..TEST_ENTRIES.len() {
            assert_at(&iter, idx);
            iter.next();
        }
        assert!(!iter.is_valid());
    }

    #[test]
    fn test_backward_iterate() {
        let mut iter = build_iterator_for_test();
        iter.seek_to_last();
        for idx in (0..TEST_ENTRIES.len()).rev() {
            assert_at(&iter, idx);
            iter.prev();
        }
        assert!(!iter.is_valid());
    }

    #[test]
    fn test_seek_forward_backward_iterate() {
        let mut iter = build_iterator_for_test();
        iter.seek(construct_full_key_struct_for_test(0, b"k03", 3));
        assert_at(&iter, 2);

        iter.prev();
        assert_at(&iter, 1);

        iter.next();
        assert_at(&iter, 2);
    }

    /// Builds a `FullKey` for the given table id, user key and test epoch.
    pub fn construct_full_key_struct_for_test(
        table_id: u32,
        table_key: &[u8],
        epoch: u64,
    ) -> FullKey<&[u8]> {
        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(epoch))
    }
}