use std::cmp::Ordering;
use std::fmt::Debug;
use std::io::{Read, Write};
use std::mem::size_of;
use std::ops::Range;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::KeyComparator;
use risingwave_hummock_sdk::key::{FullKey, TableKey};
use serde::{Deserialize, Serialize};

use super::utils::{CompressionAlgorithm, bytes_diff_below_max_key_length, xxhash64_verify};
use crate::hummock::sstable::utils;
use crate::hummock::sstable::utils::xxhash64_checksum;
use crate::hummock::{HummockError, HummockResult};
use crate::monitor::Hitmap;

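// Overview of the encoded block format (see `BlockBuilder::build` and
// `Block::decode_with_copy` below): a prefix-compressed sequence of entries is
// followed by a restart-point footer and a type index, and every encoded block ends
// with a 9-byte trailer holding the compression algorithm (1 byte) and an xxhash64
// checksum (u64 LE) over all preceding bytes.
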
pub const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;
pub const DEFAULT_RESTART_INTERVAL: usize = 16;
pub const DEFAULT_ENTRY_SIZE: usize = 24;

#[allow(non_camel_case_types)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum LenType {
    u8 = 1,
    u16 = 2,
    u32 = 3,
}

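// `put_fn!`/`get_fn!` expand to small helpers on `LenType` that write/read one or
// more length fields, dispatching on the width selected by `self` (u8, u16, or u32).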
macro_rules! put_fn {
    ($name:ident, $($value:ident: $type:ty),*) => {
        fn $name<T: BufMut>(&self, buf: &mut T, $($value: $type),*) {
            match *self {
                LenType::u8 => {
                    $(buf.put_u8($value as u8);)*
                },

                LenType::u16 => {
                    $(buf.put_u16($value as u16);)*
                },

                LenType::u32 => {
                    $(buf.put_u32($value as u32);)*
                },
            }
        }
    };
}

macro_rules! get_fn {
    ($name:ident, $($type:ty),*) => {
        #[allow(unused_parens)]
        fn $name<T: Buf>(&self, buf: &mut T) -> ($($type), *) {
            match *self {
                LenType::u8 => {
                    ($(buf.get_u8() as $type),*)
                }
                LenType::u16 => {
                    ($(buf.get_u16() as $type),*)
                }
                LenType::u32 => {
                    ($(buf.get_u32() as $type),*)
                }
            }
        }
    };
}

impl From<u8> for LenType {
    fn from(value: u8) -> Self {
        match value {
            1 => LenType::u8,
            2 => LenType::u16,
            3 => LenType::u32,
            _ => {
                panic!("unexpected type {}", value)
            }
        }
    }
}

impl LenType {
    put_fn!(put, v1: usize);

    put_fn!(put2, v1: usize, v2: usize);

    get_fn!(get, usize);

    get_fn!(get2, usize, usize);

    fn new(len: usize) -> Self {
        const U8_MAX: usize = u8::MAX as usize + 1;
        const U16_MAX: usize = u16::MAX as usize + 1;
        const U32_MAX: usize = u32::MAX as usize + 1;

        match len {
            0..U8_MAX => LenType::u8,
            U8_MAX..U16_MAX => LenType::u16,
            U16_MAX..U32_MAX => LenType::u32,
            _ => unreachable!("unexpected LenType {}", len),
        }
    }

    fn len(&self) -> usize {
        match *self {
            Self::u8 => size_of::<u8>(),
            Self::u16 => size_of::<u16>(),
            Self::u32 => size_of::<u32>(),
        }
    }
}
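
// A quick sketch of the width selection above: `LenType::new(255)` is `LenType::u8`,
// `LenType::new(256)` is `LenType::u16`, and `LenType::new(65536)` is `LenType::u32`;
// `len()` returns the number of bytes one encoded length field occupies (1, 2, or 4).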

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RestartPoint {
    pub offset: u32,
    pub key_len_type: LenType,
    pub value_len_type: LenType,
}

impl RestartPoint {
    fn size_of() -> usize {
        std::mem::size_of::<u32>() + std::mem::size_of::<LenType>()
    }
}
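
// On disk, each restart-point type-index entry is `offset` (u32 LE) followed by one
// byte that packs both widths: the key `LenType` in the high nibble and the value
// `LenType` in the low nibble (see `BlockBuilder::build` and `Block::decode_from_raw`).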

/// An immutable, decoded SSTable block.
pub struct Block {
    /// Uncompressed block payload, including the restart-point and type-index footers.
    data: Bytes,
    /// Length of the entry section, i.e. `data` without the footers.
    data_len: usize,

    /// Table that all keys in this block belong to.
    table_id: TableId,

    /// Restart points, each carrying the length-field widths in effect at its offset.
    restart_points: Vec<RestartPoint>,

    hitmap: Hitmap<{ Self::HITMAP_ELEMS }>,
}

impl Clone for Block {
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            data_len: self.data_len,
            table_id: self.table_id,
            restart_points: self.restart_points.clone(),
            hitmap: self.hitmap.clone(),
        }
    }
}

impl Debug for Block {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Block")
            .field("data_len", &self.data_len)
            .field("table_id", &self.table_id)
            .field("restart_points", &self.restart_points)
            .finish()
    }
}

impl Serialize for Block {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serde_bytes::serialize(&self.data[..], serializer)
    }
}

impl<'de> Deserialize<'de> for Block {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let data: Vec<u8> = serde_bytes::deserialize(deserializer)?;
        let data = Bytes::from(data);
        Ok(Block::decode_from_raw(data))
    }
}

impl Block {
    pub const HITMAP_ELEMS: usize = 4;

    pub fn get_algorithm(buf: &Bytes) -> HummockResult<CompressionAlgorithm> {
        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
        Ok(compression)
    }
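
    // Trailer layout shared by `decode_with_copy` and `compress_block`: the last 8
    // bytes are an xxhash64 checksum (u64 LE) of everything before them, and the byte
    // just before the checksum identifies the compression algorithm.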

    pub fn decode(buf: Bytes, uncompressed_capacity: usize) -> HummockResult<Self> {
        Self::decode_with_copy(buf, uncompressed_capacity, false)
    }

    pub fn decode_with_copy(
        buf: Bytes,
        uncompressed_capacity: usize,
        copy: bool,
    ) -> HummockResult<Self> {
        let xxhash64_checksum = (&buf[buf.len() - 8..]).get_u64_le();
        xxhash64_verify(&buf[..buf.len() - 8], xxhash64_checksum)?;

        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
        let compressed_data = &buf[..buf.len() - 9];

        let buf = match compression {
            CompressionAlgorithm::None => {
                if copy {
                    Bytes::copy_from_slice(&buf[0..(buf.len() - 9)])
                } else {
                    buf.slice(0..(buf.len() - 9))
                }
            }
            CompressionAlgorithm::Lz4 => {
                let mut decoder = lz4::Decoder::new(compressed_data.reader())
                    .map_err(HummockError::decode_error)?;
                let mut decoded = Vec::with_capacity(uncompressed_capacity);
                let read_size = decoder
                    .read_to_end(&mut decoded)
                    .map_err(HummockError::decode_error)?;
                assert_eq!(read_size, uncompressed_capacity);
                Bytes::from(decoded)
            }
            CompressionAlgorithm::Zstd => {
                let mut decoder = zstd::Decoder::new(compressed_data.reader())
                    .map_err(HummockError::decode_error)?;
                let mut decoded = Vec::with_capacity(uncompressed_capacity);
                let read_size = decoder
                    .read_to_end(&mut decoded)
                    .map_err(HummockError::decode_error)?;
                assert_eq!(read_size, uncompressed_capacity);
                Bytes::from(decoded)
            }
        };

        Ok(Self::decode_from_raw(buf))
    }

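    // Uncompressed block layout, front to back (as written by `BlockBuilder::build`):
    //   | entries | restart offsets (u32 LE each) | N_restart (u32 LE)
    //   | type-index entries (u32 LE offset + packed u8 each) | N_index (u32 LE)
    //   | table_id (u32 LE) |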
    pub fn decode_from_raw(buf: Bytes) -> Self {
        let table_id = (&buf[buf.len() - 4..]).get_u32_le();
        // Decode the restart-point type index.
        let n_index = ((&buf[buf.len() - 8..buf.len() - 4]).get_u32_le()) as usize;
        let index_data_len = size_of::<u32>() + n_index * RestartPoint::size_of();
        let data_len = buf.len() - 4 - index_data_len;
        let mut restart_points_type_index_buf = &buf[data_len..buf.len() - 8];

        let mut index_key_vec = Vec::with_capacity(n_index);
        for _ in 0..n_index {
            let offset = restart_points_type_index_buf.get_u32_le();
            let value = restart_points_type_index_buf.get_u8();
            let key_len_type = LenType::from(value >> 4);
            let value_len_type = LenType::from(value & 0x0F);

            index_key_vec.push(RestartPoint {
                offset,
                key_len_type,
                value_len_type,
            });
        }

        // Decode the restart-point offsets.
        let n_restarts = ((&buf[data_len - 4..]).get_u32_le()) as usize;
        let restart_points_len = size_of::<u32>() + n_restarts * (size_of::<u32>());
        let restarts_end = data_len - 4;
        let data_len = data_len - restart_points_len;
        let mut restart_points = Vec::with_capacity(n_restarts);
        let mut restart_points_buf = &buf[data_len..restarts_end];

        // Attach to each restart point the length-field widths recorded by the
        // type-index entry covering its offset.
        let mut type_index: usize = 0;

        for _ in 0..n_restarts {
            let offset = restart_points_buf.get_u32_le();
            if type_index < index_key_vec.len() - 1
                && offset >= index_key_vec[type_index + 1].offset
            {
                type_index += 1;
            }

            restart_points.push(RestartPoint {
                offset,
                key_len_type: index_key_vec[type_index].key_len_type,
                value_len_type: index_key_vec[type_index].value_len_type,
            });
        }

        Block {
            data: buf,
            data_len,
            restart_points,
            table_id: TableId::new(table_id),
            hitmap: Hitmap::default(),
        }
    }

    #[expect(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        assert!(!self.data.is_empty());
        self.data_len
    }

    pub fn capacity(&self) -> usize {
        self.data.len()
            + self.restart_points.capacity() * std::mem::size_of::<u32>()
            + std::mem::size_of::<u32>()
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn restart_point(&self, index: usize) -> RestartPoint {
        self.restart_points[index]
    }

    pub fn restart_point_len(&self) -> usize {
        self.restart_points.len()
    }

    pub fn search_restart_partition_point<P>(&self, pred: P) -> usize
    where
        P: FnMut(&RestartPoint) -> bool,
    {
        self.restart_points.partition_point(pred)
    }

    pub fn data(&self) -> &[u8] {
        &self.data[..self.data_len]
    }

    pub fn raw(&self) -> &[u8] {
        &self.data[..]
    }

    pub fn hitmap(&self) -> &Hitmap<{ Self::HITMAP_ELEMS }> {
        &self.hitmap
    }

    pub fn efficiency(&self) -> f64 {
        self.hitmap.ratio()
    }
}

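/// Prefix-compression header of one block entry: the first `overlap` bytes of the key
/// are shared with the previous key, `diff` bytes of key suffix and `value` bytes of
/// value are stored inline starting at byte `offset` of the block, and `len` is the
/// encoded size of the header itself.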
#[derive(Debug)]
pub struct KeyPrefix {
    overlap: usize,
    diff: usize,
    value: usize,
    offset: usize,

    len: usize,
}

impl KeyPrefix {
    pub fn new_without_len(overlap: usize, diff: usize, value: usize, offset: usize) -> Self {
        KeyPrefix {
            overlap,
            diff,
            value,
            offset,
            len: 0, // unused by `encode`; `decode` computes the real header length
        }
    }
}

impl KeyPrefix {
    pub fn encode(&self, buf: &mut impl BufMut, key_len_type: LenType, value_len_type: LenType) {
        key_len_type.put2(buf, self.overlap, self.diff);
        value_len_type.put(buf, self.value);
    }

    pub fn decode(
        buf: &mut impl Buf,
        offset: usize,
        key_len_type: LenType,
        value_len_type: LenType,
    ) -> Self {
        let (overlap, diff) = key_len_type.get2(buf);
        let value = value_len_type.get(buf);

        let len = key_len_type.len() * 2 + value_len_type.len();

        Self {
            overlap,
            diff,
            value,
            offset,
            len,
        }
    }

    fn len(&self) -> usize {
        self.len
    }

    pub fn overlap_len(&self) -> usize {
        self.overlap
    }

    pub fn diff_key_range(&self) -> Range<usize> {
        self.offset + self.len()..self.offset + self.len() + self.diff
    }

    pub fn value_range(&self) -> Range<usize> {
        self.offset + self.len() + self.diff..self.offset + self.len() + self.diff + self.value
    }

    pub fn entry_len(&self) -> usize {
        self.len() + self.diff + self.value
    }
}
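
// Entry layout in the block body, per the ranges above:
//   | overlap len | diff len | value len | diff key bytes | value bytes |
// where the three length fields use the widths recorded at the covering restart point.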

pub struct BlockBuilderOptions {
    /// Reserved buffer capacity, to avoid frequent reallocation.
    pub capacity: usize,
    /// Compression algorithm.
    pub compression_algorithm: CompressionAlgorithm,
    /// Restart point interval, in entries.
    pub restart_interval: usize,
}

impl Default for BlockBuilderOptions {
    fn default() -> Self {
        Self {
            capacity: DEFAULT_BLOCK_SIZE,
            compression_algorithm: CompressionAlgorithm::None,
            restart_interval: DEFAULT_RESTART_INTERVAL,
        }
    }
}

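/// Incrementally builds one encoded block. Typical flow (mirrored by the tests at the
/// bottom of this file): `add()` entries in ascending full-key order for a single
/// table, call `build()` to obtain the finished bytes, then `clear()` to reuse the
/// builder for the next block.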
pub struct BlockBuilder {
    /// Write buffer.
    buf: BytesMut,
    /// Reused buffer for compressed output.
    compress_buf: BytesMut,
    /// Entry interval between restart points.
    restart_count: usize,
    /// Restart point offsets.
    restart_points: Vec<u32>,
    /// Last written key.
    last_key: Vec<u8>,
    /// Count of entries in the current block.
    entry_count: usize,
    /// Compression algorithm.
    compression_algorithm: CompressionAlgorithm,

    table_id: Option<TableId>,
    /// Restart points whose length-field widths differ from the previous one.
    restart_points_type_index: Vec<RestartPoint>,
}

impl BlockBuilder {
    pub fn new(options: BlockBuilderOptions) -> Self {
        Self {
            buf: BytesMut::with_capacity(Self::buf_reserve_size(&options)),
            compress_buf: BytesMut::default(),
            restart_count: options.restart_interval,
            restart_points: Vec::with_capacity(
                options.capacity / DEFAULT_ENTRY_SIZE / options.restart_interval + 1,
            ),
            last_key: vec![],
            entry_count: 0,
            compression_algorithm: options.compression_algorithm,
            table_id: None,
            restart_points_type_index: Vec::default(),
        }
    }

    /// Appends a key-value pair to the block. Keys must be added in ascending
    /// full-key order, and all entries of one block must share the same `table_id`.
    pub fn add(&mut self, table_id: TableId, encoded_key_without_table_id: &[u8], value: &[u8]) {
        match self.table_id {
            Some(current_table_id) => assert_eq!(current_table_id, table_id),
            None => self.table_id = Some(table_id),
        }
        #[cfg(debug_assertions)]
        self.debug_valid();

        self.add_encoded_impl(encoded_key_without_table_id, value);
    }

    #[cfg(any(test, feature = "test"))]
    pub fn add_for_test(&mut self, full_key: FullKey<&[u8]>, value: &[u8]) {
        use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
        let input_table_id = full_key.user_key.table_id;
        match self.table_id {
            Some(current_table_id) => assert_eq!(current_table_id, input_table_id),
            None => self.table_id = Some(input_table_id),
        }
        #[cfg(debug_assertions)]
        self.debug_valid();

        let mut key: BytesMut = Default::default();
        full_key.encode_into(&mut key);

        let encoded_key_without_table_id = &key[TABLE_PREFIX_LEN..];

        self.add_encoded_impl(encoded_key_without_table_id, value);
    }

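    // A full key is written (and a restart point recorded) every `restart_count`
    // entries, or whenever the key/value length widths change; all other entries
    // store only the suffix that differs from `last_key`.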
    fn add_encoded_impl(&mut self, key: &[u8], value: &[u8]) {
        if self.entry_count > 0 {
            debug_assert!(!key.is_empty());
            debug_assert_eq!(
                KeyComparator::compare_encoded_full_key(&self.last_key[..], key),
                Ordering::Less,
            );
        }
        let k_type = LenType::new(key.len());
        let v_type = LenType::new(value.len());

        let type_mismatch = if let Some(RestartPoint {
            offset: _,
            key_len_type: last_key_len_type,
            value_len_type: last_value_len_type,
        }) = self.restart_points_type_index.last()
        {
            k_type != *last_key_len_type || v_type != *last_value_len_type
        } else {
            true
        };

        let diff_key = if self.entry_count.is_multiple_of(self.restart_count) || type_mismatch {
            let offset = utils::checked_into_u32(self.buf.len()).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert buf_len {} into u32 table {:?}",
                    self.buf.len(),
                    self.table_id,
                )
            });

            self.restart_points.push(offset);

            if type_mismatch {
                self.restart_points_type_index.push(RestartPoint {
                    offset,
                    key_len_type: k_type,
                    value_len_type: v_type,
                });
            }

            key
        } else {
            bytes_diff_below_max_key_length(&self.last_key, key)
        };

        let prefix = KeyPrefix::new_without_len(
            key.len() - diff_key.len(),
            diff_key.len(),
            value.len(),
            self.buf.len(),
        );

        prefix.encode(&mut self.buf, k_type, v_type);
        self.buf.put_slice(diff_key);
        self.buf.put_slice(value);

        self.last_key.clear();
        self.last_key.extend_from_slice(key);
        self.entry_count += 1;
    }

    pub fn get_last_key(&self) -> &[u8] {
        &self.last_key
    }

    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    pub fn clear(&mut self) {
        self.buf.clear();
        self.restart_points.clear();
        self.table_id = None;
        self.restart_points_type_index.clear();
        self.last_key.clear();
        self.entry_count = 0;
    }

    /// Size of the encoded block without the 9-byte compression/checksum trailer.
    pub fn uncompressed_block_size(&mut self) -> usize {
        self.buf.len()
            + (self.restart_points.len() + 1) * std::mem::size_of::<u32>() // restart offsets + count
            + (RestartPoint::size_of()) * self.restart_points_type_index.len() // type-index entries
            + std::mem::size_of::<u32>() // type-index entry count
            + std::mem::size_of::<u32>() // table_id
    }
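
    // Note: with `CompressionAlgorithm::None` this equals the final block size minus
    // the 9-byte trailer; the tests below assert `capacity == approximate_len() - 9`.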

    /// Finishes building the block: appends the restart-point footer, the type index,
    /// and the table id, optionally compresses the payload, then appends the
    /// compression byte and xxhash64 checksum. Returns the encoded block bytes.
    pub fn build(&mut self) -> &[u8] {
        assert!(
            self.entry_count > 0,
            "buf_len {} entry_count {} table {:?}",
            self.buf.len(),
            self.entry_count,
            self.table_id
        );

        for restart_point in &self.restart_points {
            self.buf.put_u32_le(*restart_point);
        }

        self.buf.put_u32_le(
            utils::checked_into_u32(self.restart_points.len()).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert restart_points_len {} into u32 table {:?}",
                    self.restart_points.len(),
                    self.table_id,
                )
            }),
        );
        for RestartPoint {
            offset,
            key_len_type,
            value_len_type,
        } in &self.restart_points_type_index
        {
            self.buf.put_u32_le(*offset);

            // Pack the key width into the high nibble and the value width into the low.
            let mut value: u8 = 0;
            value |= *key_len_type as u8;
            value <<= 4;
            value |= *value_len_type as u8;

            self.buf.put_u8(value);
        }

        self.buf.put_u32_le(
            utils::checked_into_u32(self.restart_points_type_index.len()).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert restart_points_type_index_len {} into u32 table {:?}",
                    self.restart_points_type_index.len(),
                    self.table_id,
                )
            }),
        );

        self.buf.put_u32_le(self.table_id.unwrap().as_raw_id());
        let result_buf = if self.compression_algorithm != CompressionAlgorithm::None {
            self.compress_buf.clear();
            self.compress_buf = Self::compress(
                &self.buf[..],
                self.compression_algorithm,
                std::mem::take(&mut self.compress_buf),
            );

            &mut self.compress_buf
        } else {
            &mut self.buf
        };

        self.compression_algorithm.encode(result_buf);
        let checksum = xxhash64_checksum(result_buf);
        result_buf.put_u64_le(checksum);
        assert!(
            result_buf.len() < (u32::MAX) as usize,
            "buf_len {} entry_count {} table {:?}",
            result_buf.len(),
            self.entry_count,
            self.table_id
        );

        if self.compression_algorithm != CompressionAlgorithm::None {
            self.compress_buf.as_ref()
        } else {
            self.buf.as_ref()
        }
    }

    pub fn compress_block(
        buf: Bytes,
        target_compression: CompressionAlgorithm,
    ) -> HummockResult<Bytes> {
        // Verify the checksum, then strip the trailer of the (uncompressed) input block.
        let checksum = (&buf[buf.len() - 8..]).get_u64_le();
        xxhash64_verify(&buf[..buf.len() - 8], checksum)?;
        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
        let compressed_data = &buf[..buf.len() - 9];
        assert_eq!(compression, CompressionAlgorithm::None);
        let mut compress_writer = Self::compress(
            compressed_data,
            target_compression,
            BytesMut::with_capacity(buf.len()),
        );

        target_compression.encode(&mut compress_writer);
        let checksum = xxhash64_checksum(&compress_writer);
        compress_writer.put_u64_le(checksum);
        Ok(compress_writer.freeze())
    }

    pub fn compress(
        buf: &[u8],
        compression_algorithm: CompressionAlgorithm,
        compress_writer: BytesMut,
    ) -> BytesMut {
        match compression_algorithm {
            CompressionAlgorithm::None => unreachable!(),
            CompressionAlgorithm::Lz4 => {
                let mut encoder = lz4::EncoderBuilder::new()
                    .level(4)
                    .build(compress_writer.writer())
                    .map_err(HummockError::encode_error)
                    .unwrap();
                encoder
                    .write_all(buf)
                    .map_err(HummockError::encode_error)
                    .unwrap();
                let (writer, result) = encoder.finish();
                result.map_err(HummockError::encode_error).unwrap();
                writer.into_inner()
            }
            CompressionAlgorithm::Zstd => {
                let mut encoder = zstd::Encoder::new(compress_writer.writer(), 4)
                    .map_err(HummockError::encode_error)
                    .unwrap();
                encoder
                    .write_all(buf)
                    .map_err(HummockError::encode_error)
                    .unwrap();
                let writer = encoder
                    .finish()
                    .map_err(HummockError::encode_error)
                    .unwrap();
                writer.into_inner()
            }
        }
    }
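
    // Both branches above hardcode compression level 4. The level only affects the
    // produced bytes, not the format: the trailer records the algorithm, and decoding
    // does not need to know the level.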

    pub fn approximate_len(&self) -> usize {
        self.buf.len()
            + std::mem::size_of::<u32>() * self.restart_points.len() // restart offsets
            + std::mem::size_of::<u32>() // restart offset count
            + RestartPoint::size_of() * self.restart_points_type_index.len() // type-index entries
            + std::mem::size_of::<u32>() // type-index entry count
            + std::mem::size_of::<CompressionAlgorithm>() // compression byte
            + std::mem::size_of::<u64>() // checksum
            + std::mem::size_of::<u32>() // table_id
    }

    pub fn debug_valid(&self) {
        if self.entry_count == 0 {
            debug_assert!(self.buf.is_empty());
            debug_assert!(self.restart_points.is_empty());
            debug_assert!(self.restart_points_type_index.is_empty());
            debug_assert!(self.last_key.is_empty());
        }
    }

    pub fn table_id(&self) -> Option<TableId> {
        self.table_id
    }

    fn buf_reserve_size(option: &BlockBuilderOptions) -> usize {
        option.capacity + 1024 + 256
    }
}

/// Tries to shorten the smallest key of a block to one byte past its longest common
/// prefix with the previous block's last key, producing a shorter separator that still
/// sorts strictly after `prev_block_last` and at or before `block_smallest`. Returns
/// `None` if that would not make the key any shorter.
pub fn try_shorten_block_smallest_key(
    prev_block_last: &FullKey<&[u8]>,
    block_smallest: &FullKey<&[u8]>,
) -> Option<FullKey<Vec<u8>>> {
    // Length of the longest common prefix of `a` and `b`.
    fn lcp_len(a: &[u8], b: &[u8]) -> usize {
        let min_len = a.len().min(b.len());
        for i in 0..min_len {
            if a[i] != b[i] {
                return i;
            }
        }
        min_len
    }

    let prev_table_key = prev_block_last.user_key.table_key.as_ref();
    let next_table_key = block_smallest.user_key.table_key.as_ref();
    assert_eq!(
        prev_block_last.user_key.table_id, block_smallest.user_key.table_id,
        "table ids must match for shortening block smallest key"
    );

    let lcp = lcp_len(prev_table_key, next_table_key);

    // Keep one byte past the common prefix; give up if that is not strictly shorter.
    let shortened_len = lcp + 1;
    if shortened_len >= next_table_key.len() {
        return None;
    }

    let cand = FullKey::new_with_gap_epoch(
        block_smallest.user_key.table_id,
        TableKey(&next_table_key[..shortened_len]),
        block_smallest.epoch_with_gap,
    );

    assert!(
        prev_block_last.cmp(&cand).is_lt(),
        "Invariant violated: prev_block_last >= cand. prev={:?}, cand={:?}",
        prev_block_last,
        cand
    );
    assert!(
        cand.cmp(block_smallest).is_le(),
        "Invariant violated: cand > block_smallest. cand={:?}, block_smallest={:?}",
        cand,
        block_smallest
    );

    Some(cand.copy_into())
}
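
// Worked example (mirrored by the tests below): with table keys prev = "abc" and
// next = "abdef", the common prefix is "ab" (lcp = 2), so the smallest key is
// shortened to "abd"; with prev = "abc" and next = "abcd", lcp + 1 = 4 is not
// shorter than next, so no shortening happens.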

#[cfg(test)]
mod tests {

    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::MAX_KEY_LEN;

    use super::*;
    use crate::hummock::{BlockHolder, BlockIterator};

    #[test]
    fn test_block_enc_dec() {
        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k1", 1), b"v01");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k2", 2), b"v02");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k3", 3), b"v03");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k4", 4), b"v04");
        let capacity = builder.uncompressed_block_size();
        assert_eq!(capacity, builder.approximate_len() - 9);
        let buf = builder.build().to_vec();
        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));

        bi.seek_to_first();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k1", 1), bi.key());
        assert_eq!(b"v01", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k2", 2), bi.key());
        assert_eq!(b"v02", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k3", 3), bi.key());
        assert_eq!(b"v03", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k4", 4), bi.key());
        assert_eq!(b"v04", bi.value());

        bi.next();
        assert!(!bi.is_valid());
    }

    #[test]
    fn test_compressed_block_enc_dec() {
        inner_test_compressed(CompressionAlgorithm::Lz4);
        inner_test_compressed(CompressionAlgorithm::Zstd);
    }

    fn inner_test_compressed(algo: CompressionAlgorithm) {
        let options = BlockBuilderOptions {
            compression_algorithm: algo,
            ..Default::default()
        };
        let mut builder = BlockBuilder::new(options);
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k1", 1), b"v01");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k2", 2), b"v02");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k3", 3), b"v03");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k4", 4), b"v04");
        let capacity = builder.uncompressed_block_size();
        assert_eq!(capacity, builder.approximate_len() - 9);
        let buf = builder.build().to_vec();
        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));

        bi.seek_to_first();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k1", 1), bi.key());
        assert_eq!(b"v01", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k2", 2), bi.key());
        assert_eq!(b"v02", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k3", 3), bi.key());
        assert_eq!(b"v03", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k4", 4), bi.key());
        assert_eq!(b"v04", bi.value());

        bi.next();
        assert!(!bi.is_valid());
    }

    pub fn construct_full_key_struct_for_test(
        table_id: u32,
        table_key: &[u8],
        epoch: u64,
    ) -> FullKey<&[u8]> {
        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(epoch))
    }

    #[test]
    fn test_block_enc_large_key() {
        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);
        let medium_key = vec![b'a'; MAX_KEY_LEN - 500];
        let large_key = vec![b'b'; MAX_KEY_LEN];
        let xlarge_key = vec![b'c'; MAX_KEY_LEN + 500];

        builder.add_for_test(construct_full_key_struct_for_test(0, &medium_key, 1), b"v1");
        builder.add_for_test(construct_full_key_struct_for_test(0, &large_key, 2), b"v2");
        builder.add_for_test(construct_full_key_struct_for_test(0, &xlarge_key, 3), b"v3");
        let capacity = builder.uncompressed_block_size();
        assert_eq!(capacity, builder.approximate_len() - 9);
        let buf = builder.build().to_vec();
        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));

        bi.seek_to_first();
        assert!(bi.is_valid());
        assert_eq!(
            construct_full_key_struct_for_test(0, &medium_key, 1),
            bi.key()
        );
        assert_eq!(b"v1", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(
            construct_full_key_struct_for_test(0, &large_key, 2),
            bi.key()
        );
        assert_eq!(b"v2", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(
            construct_full_key_struct_for_test(0, &xlarge_key, 3),
            bi.key()
        );
        assert_eq!(b"v3", bi.value());

        bi.next();
        assert!(!bi.is_valid());
    }

    #[test]
    fn test_block_restart_point() {
        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);

        const KEY_COUNT: u8 = 100;
        const BUILDER_COUNT: u8 = 5;

        for _ in 0..BUILDER_COUNT {
            for index in 0..KEY_COUNT {
                if index < 50 {
                    let mut medium_key = vec![b'A'; MAX_KEY_LEN - 500];
                    medium_key.push(index);
                    builder
                        .add_for_test(construct_full_key_struct_for_test(0, &medium_key, 1), b"v1");
                } else if index < 80 {
                    let mut large_key = vec![b'B'; MAX_KEY_LEN];
                    large_key.push(index);
                    builder
                        .add_for_test(construct_full_key_struct_for_test(0, &large_key, 2), b"v2");
                } else {
                    let mut xlarge_key = vec![b'C'; MAX_KEY_LEN + 500];
                    xlarge_key.push(index);
                    builder
                        .add_for_test(construct_full_key_struct_for_test(0, &xlarge_key, 3), b"v3");
                }
            }

            let capacity = builder.uncompressed_block_size();
            assert_eq!(capacity, builder.approximate_len() - 9);
            let buf = builder.build().to_vec();
            let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
            let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
            bi.seek_to_first();
            assert!(bi.is_valid());

            for index in 0..KEY_COUNT {
                if index < 50 {
                    let mut medium_key = vec![b'A'; MAX_KEY_LEN - 500];
                    medium_key.push(index);
                    assert_eq!(
                        construct_full_key_struct_for_test(0, &medium_key, 1),
                        bi.key()
                    );
                } else if index < 80 {
                    let mut large_key = vec![b'B'; MAX_KEY_LEN];
                    large_key.push(index);
                    assert_eq!(
                        construct_full_key_struct_for_test(0, &large_key, 2),
                        bi.key()
                    );
                } else {
                    let mut xlarge_key = vec![b'C'; MAX_KEY_LEN + 500];
                    xlarge_key.push(index);
                    assert_eq!(
                        construct_full_key_struct_for_test(0, &xlarge_key, 3),
                        bi.key()
                    );
                }
                bi.next();
            }

            assert!(!bi.is_valid());
            builder.clear();
        }
    }

    #[test]
    fn test_block_serde() {
        fn assert_serde<'de, T: Serialize + Deserialize<'de>>() {}

        assert_serde::<Block>();
        assert_serde::<Box<Block>>();

        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);
        for i in 0..100 {
            builder.add_for_test(
                construct_full_key_struct_for_test(0, format!("k{i:8}").as_bytes(), i),
                format!("v{i:8}").as_bytes(),
            );
        }

        let capacity = builder.uncompressed_block_size();
        assert_eq!(capacity, builder.approximate_len() - 9);
        let buf = builder.build().to_vec();

        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());

        let buffer = bincode::serialize(&block).unwrap();
        let blk: Block = bincode::deserialize(&buffer).unwrap();

        assert_eq!(block.data, blk.data);
        assert_eq!(block.data_len, blk.data_len);
        assert_eq!(block.table_id, blk.table_id);
        assert_eq!(block.restart_points, blk.restart_points);
    }

    #[test]
    fn test_try_shorten_block_smallest_key() {
        use risingwave_hummock_sdk::EpochWithGap;

        fn make_full_key(table_id: u32, table_key: &[u8], epoch: u64) -> FullKey<&[u8]> {
            FullKey::new_with_gap_epoch(
                TableId::new(table_id),
                TableKey(table_key),
                EpochWithGap::new_from_epoch(test_epoch(epoch)),
            )
        }

        fn assert_invariant(
            prev: &FullKey<&[u8]>,
            shortened: &FullKey<Vec<u8>>,
            next: &FullKey<&[u8]>,
        ) {
            assert!(
                prev.cmp(&shortened.to_ref()).is_lt(),
                "Invariant violated: prev >= shortened. prev={:?}, shortened={:?}",
                prev,
                shortened
            );
            assert!(
                shortened.to_ref().cmp(next).is_le(),
                "Invariant violated: shortened > next. shortened={:?}, next={:?}",
                shortened,
                next
            );
        }

        // Common prefix "ab": shorten to one byte past it.
        {
            let prev = make_full_key(1, b"abc", 100);
            let next = make_full_key(1, b"abdef", 100);
            let result = try_shorten_block_smallest_key(&prev, &next);
            assert!(result.is_some());
            let shortened = result.unwrap();
            assert_eq!(shortened.user_key.table_key.as_ref(), b"abd");
            assert_invariant(&prev, &shortened, &next);
        }

        // `prev` is a strict prefix of `next`: one extra byte is kept.
        {
            let prev = make_full_key(1, b"ab", 100);
            let next = make_full_key(1, b"abcd", 100);
            let result = try_shorten_block_smallest_key(&prev, &next);
            assert!(result.is_some());
            let shortened = result.unwrap();
            assert_eq!(shortened.user_key.table_key.as_ref(), b"abc");
            assert_invariant(&prev, &shortened, &next);
        }

        // `next` is exactly lcp + 1 bytes long: nothing to cut.
        {
            let prev = make_full_key(1, b"abc", 100);
            let next = make_full_key(1, b"abd", 100);
            assert!(try_shorten_block_smallest_key(&prev, &next).is_none());
        }

        // Shortening would keep the whole key: nothing to cut.
        {
            let prev = make_full_key(1, b"abc", 100);
            let next = make_full_key(1, b"abcd", 100);
            assert!(try_shorten_block_smallest_key(&prev, &next).is_none());
        }

        // Identical table keys differing only in epoch: cannot shorten.
        {
            let prev = make_full_key(1, b"abc", 200);
            let next = make_full_key(1, b"abc", 100);
            assert!(try_shorten_block_smallest_key(&prev, &next).is_none());
        }

        // Single-byte keys with no common prefix: lcp + 1 equals the key length.
        {
            let prev = make_full_key(1, b"a", 100);
            let next = make_full_key(1, b"b", 100);
            assert!(try_shorten_block_smallest_key(&prev, &next).is_none());
        }

        // Long common prefix: keep one byte past it.
        {
            let prev = make_full_key(1, b"hello_world_abc", 100);
            let next = make_full_key(1, b"hello_world_xyz123", 100);
            let result = try_shorten_block_smallest_key(&prev, &next);
            assert!(result.is_some());
            let shortened = result.unwrap();
            assert_eq!(shortened.user_key.table_key.as_ref(), b"hello_world_x");
            assert_invariant(&prev, &shortened, &next);
        }

        // No common prefix at all: a single byte of `next` suffices.
        {
            let prev = make_full_key(1, b"aaa", 100);
            let next = make_full_key(1, b"bbbccc", 100);
            let result = try_shorten_block_smallest_key(&prev, &next);
            assert!(result.is_some());
            let shortened = result.unwrap();
            assert_eq!(shortened.user_key.table_key.as_ref(), b"b");
            assert_invariant(&prev, &shortened, &next);
        }

        // `prev` is a prefix of `next` with two extra bytes: cut the last one.
        {
            let prev = make_full_key(1, b"abc", 100);
            let next = make_full_key(1, b"abcde", 100);
            let result = try_shorten_block_smallest_key(&prev, &next);
            assert!(result.is_some());
            let shortened = result.unwrap();
            assert_eq!(shortened.user_key.table_key.as_ref(), b"abcd");
            assert_invariant(&prev, &shortened, &next);
        }
    }
}