1use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::io::{Read, Write};
18use std::mem::size_of;
19use std::ops::Range;
20
21use bytes::{Buf, BufMut, Bytes, BytesMut};
22use risingwave_common::catalog::TableId;
23use risingwave_hummock_sdk::KeyComparator;
24use risingwave_hummock_sdk::key::{FullKey, TableKey};
25use serde::{Deserialize, Serialize};
26
27use super::utils::{CompressionAlgorithm, bytes_diff_below_max_key_length, xxhash64_verify};
28use crate::hummock::sstable::utils;
29use crate::hummock::sstable::utils::xxhash64_checksum;
30use crate::hummock::{HummockError, HummockResult};
31use crate::monitor::Hitmap;
32
pub const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;
pub const DEFAULT_RESTART_INTERVAL: usize = 16;
// Assumed typical encoded entry size in bytes; in this file it is only used to pre-size the
// builder's restart-point vector.
pub const DEFAULT_ENTRY_SIZE: usize = 24;

/// Width of the integers used to encode the key/value lengths of block entries.
///
/// The discriminant is the tag stored in each 4-bit half of a restart-point type-index byte
/// (`(key_len_type << 4) | value_len_type`), so it must stay small and stable. `repr(u8)`
/// guarantees the enum occupies exactly one byte, matching that serialized tag.
// Variant names deliberately mirror the primitive widths they select.
#[allow(non_camel_case_types)]
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum LenType {
    /// Lengths are written as `u8`.
    u8 = 1,
    /// Lengths are written as `u16`.
    u16 = 2,
    /// Lengths are written as `u32`.
    u32 = 3,
}
44
// Generates a `LenType` method `$name` that appends every `$value` to `buf`, each written with
// the integer width selected by `self` (values are narrowed with `as`, using the `bytes`
// crate's default big-endian `put_uN` writers).
macro_rules! put_fn {
    ($name:ident, $($value:ident: $type:ty),*) => {
        fn $name<T: BufMut>(&self, buf: &mut T, $($value: $type),*) {
            match self {
                LenType::u8 => { $( buf.put_u8($value as u8); )* }
                LenType::u16 => { $( buf.put_u16($value as u16); )* }
                LenType::u32 => { $( buf.put_u32($value as u32); )* }
            }
        }
    };
}
64
// Generates a `LenType` method `$name` that reads one integer per listed `$type` from `buf`,
// using the width selected by `self`, and widens each to `$type`. With a single type the
// "tuple" is just a parenthesized value, hence `unused_parens`.
macro_rules! get_fn {
    ($name:ident, $($type:ty),*) => {
        #[allow(unused_parens)]
        fn $name<T: Buf>(&self, buf: &mut T) -> ($($type),*) {
            match self {
                LenType::u8 => ($(buf.get_u8() as $type),*),
                LenType::u16 => ($(buf.get_u16() as $type),*),
                LenType::u32 => ($(buf.get_u32() as $type),*),
            }
        }
    };
}
83
84impl From<u8> for LenType {
85 fn from(value: u8) -> Self {
86 match value {
87 1 => LenType::u8,
88 2 => LenType::u16,
89 3 => LenType::u32,
90 _ => {
91 panic!("unexpected type {}", value)
92 }
93 }
94 }
95}
96
97impl LenType {
98 put_fn!(put, v1: usize);
99
100 put_fn!(put2, v1: usize, v2: usize);
101
102 get_fn!(get, usize);
103
104 get_fn!(get2, usize, usize);
105
106 fn new(len: usize) -> Self {
107 const U8_MAX: usize = u8::MAX as usize + 1;
108 const U16_MAX: usize = u16::MAX as usize + 1;
109 const U32_MAX: usize = u32::MAX as usize + 1;
110
111 match len {
112 0..U8_MAX => LenType::u8,
113 U8_MAX..U16_MAX => LenType::u16,
114 U16_MAX..U32_MAX => LenType::u32,
115 _ => unreachable!("unexpected LenType {}", len),
116 }
117 }
118
119 fn len(&self) -> usize {
120 match *self {
121 Self::u8 => size_of::<u8>(),
122 Self::u16 => size_of::<u16>(),
123 Self::u32 => size_of::<u32>(),
124 }
125 }
126}
127
128#[derive(Clone, Copy, Debug, PartialEq, Eq)]
129pub struct RestartPoint {
130 pub offset: u32,
131 pub key_len_type: LenType,
132 pub value_len_type: LenType,
133}
134
135impl RestartPoint {
136 fn size_of() -> usize {
137 std::mem::size_of::<u32>() + std::mem::size_of::<LenType>()
140 }
141}
142
/// A decoded (uncompressed) block.
pub struct Block {
    /// Uncompressed block bytes: the entries followed by the restart-point footer, the
    /// restart-point type index and the table id (checksum/compression trailer removed).
    data: Bytes,
    /// Length of the entries region at the start of `data`, excluding the footer.
    data_len: usize,

    /// Table id read from the last 4 bytes of `data`.
    table_id: TableId,

    /// Restart points, each carrying the key/value length types in effect at that offset.
    restart_points: Vec<RestartPoint>,

    /// Hitmap exposed through `hitmap()` and summarized by `efficiency()`.
    hitmap: Hitmap<{ Self::HITMAP_ELEMS }>,
}
157
158impl Clone for Block {
159 fn clone(&self) -> Self {
160 Self {
161 data: self.data.clone(),
162 data_len: self.data_len,
163 table_id: self.table_id,
164 restart_points: self.restart_points.clone(),
165 hitmap: self.hitmap.clone(),
166 }
167 }
168}
169
170impl Debug for Block {
171 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 f.debug_struct("Block")
173 .field("data_len", &self.data_len)
174 .field("table_id", &self.table_id)
175 .field("restart_points", &self.restart_points)
176 .finish()
177 }
178}
179
180impl Serialize for Block {
181 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
182 where
183 S: serde::Serializer,
184 {
185 serde_bytes::serialize(&self.data[..], serializer)
186 }
187}
188
189impl<'de> Deserialize<'de> for Block {
190 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
191 where
192 D: serde::Deserializer<'de>,
193 {
194 let data: Vec<u8> = serde_bytes::deserialize(deserializer)?;
195 let data = Bytes::from(data);
196 Ok(Block::decode_from_raw(data))
197 }
198}
199
200impl Block {
201 pub const HITMAP_ELEMS: usize = 4;
202
203 pub fn get_algorithm(buf: &Bytes) -> HummockResult<CompressionAlgorithm> {
204 let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
205 Ok(compression)
206 }
207
208 pub fn decode(buf: Bytes, uncompressed_capacity: usize) -> HummockResult<Self> {
209 Self::decode_with_copy(buf, uncompressed_capacity, false)
210 }
211
212 pub fn decode_with_copy(
213 buf: Bytes,
214 uncompressed_capacity: usize,
215 copy: bool,
216 ) -> HummockResult<Self> {
217 let xxhash64_checksum = (&buf[buf.len() - 8..]).get_u64_le();
220 xxhash64_verify(&buf[..buf.len() - 8], xxhash64_checksum)?;
221
222 let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
224 let compressed_data = &buf[..buf.len() - 9];
225
226 let buf = match compression {
227 CompressionAlgorithm::None => {
228 if copy {
229 Bytes::copy_from_slice(&buf[0..(buf.len() - 9)])
230 } else {
231 buf.slice(0..(buf.len() - 9))
232 }
233 }
234 CompressionAlgorithm::Lz4 => {
235 let mut decoder = lz4::Decoder::new(compressed_data.reader())
236 .map_err(HummockError::decode_error)?;
237 let mut decoded = Vec::with_capacity(uncompressed_capacity);
238 let read_size = decoder
239 .read_to_end(&mut decoded)
240 .map_err(HummockError::decode_error)?;
241 assert_eq!(read_size, uncompressed_capacity);
242 Bytes::from(decoded)
243 }
244 CompressionAlgorithm::Zstd => {
245 let mut decoder = zstd::Decoder::new(compressed_data.reader())
246 .map_err(HummockError::decode_error)?;
247 let mut decoded = Vec::with_capacity(uncompressed_capacity);
248 let read_size = decoder
249 .read_to_end(&mut decoded)
250 .map_err(HummockError::decode_error)?;
251 assert_eq!(read_size, uncompressed_capacity);
252 Bytes::from(decoded)
253 }
254 };
255
256 Ok(Self::decode_from_raw(buf))
257 }
258
259 pub fn decode_from_raw(buf: Bytes) -> Self {
260 let table_id = (&buf[buf.len() - 4..]).get_u32_le();
261 let n_index = ((&buf[buf.len() - 8..buf.len() - 4]).get_u32_le()) as usize;
263 let index_data_len = size_of::<u32>() + n_index * RestartPoint::size_of();
264 let data_len = buf.len() - 4 - index_data_len;
265 let mut restart_points_type_index_buf = &buf[data_len..buf.len() - 8];
266
267 let mut index_key_vec = Vec::with_capacity(n_index);
268 for _ in 0..n_index {
269 let offset = restart_points_type_index_buf.get_u32_le();
270 let value = restart_points_type_index_buf.get_u8();
271 let key_len_type = LenType::from(value >> 4);
272 let value_len_type = LenType::from(value & 0x0F);
273
274 index_key_vec.push(RestartPoint {
275 offset,
276 key_len_type,
277 value_len_type,
278 });
279 }
280
281 let n_restarts = ((&buf[data_len - 4..]).get_u32_le()) as usize;
283 let restart_points_len = size_of::<u32>() + n_restarts * (size_of::<u32>());
284 let restarts_end = data_len - 4;
285 let data_len = data_len - restart_points_len;
286 let mut restart_points = Vec::with_capacity(n_restarts);
287 let mut restart_points_buf = &buf[data_len..restarts_end];
288
289 let mut type_index: usize = 0;
290
291 for _ in 0..n_restarts {
292 let offset = restart_points_buf.get_u32_le();
293 if type_index < index_key_vec.len() - 1
294 && offset >= index_key_vec[type_index + 1].offset
295 {
296 type_index += 1;
297 }
298
299 restart_points.push(RestartPoint {
300 offset,
301 key_len_type: index_key_vec[type_index].key_len_type,
302 value_len_type: index_key_vec[type_index].value_len_type,
303 });
304 }
305
306 Block {
307 data: buf,
308 data_len,
309 restart_points,
310 table_id: TableId::new(table_id),
311 hitmap: Hitmap::default(),
312 }
313 }
314
315 #[expect(clippy::len_without_is_empty)]
317 pub fn len(&self) -> usize {
318 assert!(!self.data.is_empty());
319 self.data_len
320 }
321
322 pub fn capacity(&self) -> usize {
323 self.data.len()
324 + self.restart_points.capacity() * std::mem::size_of::<u32>()
325 + std::mem::size_of::<u32>()
326 }
327
328 pub fn estimated_memory_weight(&self) -> usize {
329 size_of::<Self>()
330 + self.data.len()
331 + self.restart_points.capacity() * size_of::<RestartPoint>()
332 + Hitmap::<{ Self::HITMAP_ELEMS }>::bytes()
333 }
334
335 pub fn table_id(&self) -> TableId {
336 self.table_id
337 }
338
339 pub fn restart_point(&self, index: usize) -> RestartPoint {
341 self.restart_points[index]
342 }
343
344 pub fn restart_point_len(&self) -> usize {
346 self.restart_points.len()
347 }
348
349 pub fn search_restart_partition_point<P>(&self, pred: P) -> usize
351 where
352 P: FnMut(&RestartPoint) -> bool,
353 {
354 self.restart_points.partition_point(pred)
355 }
356
357 pub fn data(&self) -> &[u8] {
358 &self.data[..self.data_len]
359 }
360
361 pub fn raw(&self) -> &[u8] {
362 &self.data[..]
363 }
364
365 pub fn hitmap(&self) -> &Hitmap<{ Self::HITMAP_ELEMS }> {
366 &self.hitmap
367 }
368
369 pub fn efficiency(&self) -> f64 {
370 self.hitmap.ratio()
371 }
372}
373
/// Header of one encoded block entry, used for key prefix compression.
#[derive(Debug)]
pub struct KeyPrefix {
    /// Number of leading key bytes shared with the previous key (not stored again).
    overlap: usize,
    /// Length of the key suffix stored right after the header.
    diff: usize,
    /// Length of the value stored right after the key suffix.
    value: usize,
    /// Byte offset of this entry within the block; used to compute ranges, never encoded.
    offset: usize,

    /// Encoded size of the header itself; 0 when built with `new_without_len`, computed by
    /// `decode`.
    len: usize,
}
385
386impl KeyPrefix {
387 pub fn new_without_len(overlap: usize, diff: usize, value: usize, offset: usize) -> Self {
390 KeyPrefix {
391 overlap,
392 diff,
393 value,
394 offset,
395 len: 0, }
397 }
398}
399
400impl KeyPrefix {
401 pub fn encode(&self, buf: &mut impl BufMut, key_len_type: LenType, value_len_type: LenType) {
402 key_len_type.put2(buf, self.overlap, self.diff);
403 value_len_type.put(buf, self.value);
404 }
405
406 pub fn decode(
407 buf: &mut impl Buf,
408 offset: usize,
409 key_len_type: LenType,
410 value_len_type: LenType,
411 ) -> Self {
412 let (overlap, diff) = key_len_type.get2(buf);
413 let value = value_len_type.get(buf);
414
415 let len = key_len_type.len() * 2 + value_len_type.len();
416
417 Self {
418 overlap,
419 diff,
420 value,
421 offset,
422 len,
423 }
424 }
425
426 fn len(&self) -> usize {
428 self.len
429 }
430
431 pub fn overlap_len(&self) -> usize {
433 self.overlap
434 }
435
436 pub fn diff_key_range(&self) -> Range<usize> {
438 self.offset + self.len()..self.offset + self.len() + self.diff
439 }
440
441 pub fn value_range(&self) -> Range<usize> {
443 self.offset + self.len() + self.diff..self.offset + self.len() + self.diff + self.value
444 }
445
446 pub fn entry_len(&self) -> usize {
448 self.len() + self.diff + self.value
449 }
450}
451
/// Configuration for [`BlockBuilder`].
pub struct BlockBuilderOptions {
    /// Target block capacity in bytes; `BlockBuilder::new` uses it to pre-size its buffers.
    pub capacity: usize,
    /// Compression applied by `BlockBuilder::build`.
    pub compression_algorithm: CompressionAlgorithm,
    /// A restart point (full key, no prefix sharing) is started every `restart_interval`
    /// entries.
    pub restart_interval: usize,
}
460
461impl Default for BlockBuilderOptions {
462 fn default() -> Self {
463 Self {
464 capacity: DEFAULT_BLOCK_SIZE,
465 compression_algorithm: CompressionAlgorithm::None,
466 restart_interval: DEFAULT_RESTART_INTERVAL,
467 }
468 }
469}
470
/// Incrementally encodes sorted entries of a single table into a block.
pub struct BlockBuilder {
    /// Encoded entries; `build` appends the footer and trailer here.
    buf: BytesMut,
    /// Reused output buffer for compressed blocks.
    compress_buf: BytesMut,
    /// Restart interval: a restart point is started every `restart_count` entries.
    restart_count: usize,
    /// Byte offsets in `buf` of all restart points.
    restart_points: Vec<u32>,
    /// Previously added key, used for prefix compression and ordering checks.
    last_key: Vec<u8>,
    /// Number of entries added so far.
    entry_count: usize,
    /// Compression applied by `build`.
    compression_algorithm: CompressionAlgorithm,

    /// Table id of the entries; set by the first `add`.
    table_id: Option<TableId>,
    /// Restart points at which the key/value length types change.
    restart_points_type_index: Vec<RestartPoint>,
}
493
494impl BlockBuilder {
495 pub fn new(options: BlockBuilderOptions) -> Self {
496 Self {
497 buf: BytesMut::with_capacity(Self::buf_reserve_size(&options)),
499 compress_buf: BytesMut::default(),
500 restart_count: options.restart_interval,
501 restart_points: Vec::with_capacity(
502 options.capacity / DEFAULT_ENTRY_SIZE / options.restart_interval + 1,
503 ),
504 last_key: vec![],
505 entry_count: 0,
506 compression_algorithm: options.compression_algorithm,
507 table_id: None,
508 restart_points_type_index: Vec::default(),
509 }
510 }
511
512 pub fn add(&mut self, table_id: TableId, encoded_key_without_table_id: &[u8], value: &[u8]) {
530 match self.table_id {
531 Some(current_table_id) => assert_eq!(current_table_id, table_id),
532 None => self.table_id = Some(table_id),
533 }
534 #[cfg(debug_assertions)]
535 self.debug_valid();
536
537 self.add_encoded_impl(encoded_key_without_table_id, value);
539 }
540
541 #[cfg(any(test, feature = "test"))]
542 pub fn add_for_test(&mut self, full_key: FullKey<&[u8]>, value: &[u8]) {
543 use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
544 let input_table_id = full_key.user_key.table_id;
545 match self.table_id {
546 Some(current_table_id) => assert_eq!(current_table_id, input_table_id),
547 None => self.table_id = Some(input_table_id),
548 }
549 #[cfg(debug_assertions)]
550 self.debug_valid();
551
552 let mut key: BytesMut = Default::default();
553 full_key.encode_into(&mut key);
554
555 let encoded_key_without_table_id = &key[TABLE_PREFIX_LEN..];
557
558 self.add_encoded_impl(encoded_key_without_table_id, value);
560 }
561
562 fn add_encoded_impl(&mut self, key: &[u8], value: &[u8]) {
565 if self.entry_count > 0 {
566 debug_assert!(!key.is_empty());
567 debug_assert_eq!(
568 KeyComparator::compare_encoded_full_key(&self.last_key[..], key),
569 Ordering::Less,
570 );
571 }
572 let k_type = LenType::new(key.len());
574 let v_type = LenType::new(value.len());
575
576 let type_mismatch = if let Some(RestartPoint {
577 offset: _,
578 key_len_type: last_key_len_type,
579 value_len_type: last_value_len_type,
580 }) = self.restart_points_type_index.last()
581 {
582 k_type != *last_key_len_type || v_type != *last_value_len_type
583 } else {
584 true
585 };
586
587 let diff_key = if self.entry_count.is_multiple_of(self.restart_count) || type_mismatch {
588 let offset = utils::checked_into_u32(self.buf.len()).unwrap_or_else(|_| {
589 panic!(
590 "WARN overflow can't convert buf_len {} into u32 table {:?}",
591 self.buf.len(),
592 self.table_id,
593 )
594 });
595
596 self.restart_points.push(offset);
597
598 if type_mismatch {
599 self.restart_points_type_index.push(RestartPoint {
600 offset,
601 key_len_type: k_type,
602 value_len_type: v_type,
603 });
604 }
605
606 key
607 } else {
608 bytes_diff_below_max_key_length(&self.last_key, key)
609 };
610
611 let prefix = KeyPrefix::new_without_len(
612 key.len() - diff_key.len(),
613 diff_key.len(),
614 value.len(),
615 self.buf.len(),
616 );
617
618 prefix.encode(&mut self.buf, k_type, v_type);
619 self.buf.put_slice(diff_key);
620 self.buf.put_slice(value);
621
622 self.last_key.clear();
623 self.last_key.extend_from_slice(key);
624 self.entry_count += 1;
625 }
626
627 pub fn get_last_key(&self) -> &[u8] {
628 &self.last_key
629 }
630
631 pub fn is_empty(&self) -> bool {
632 self.buf.is_empty()
633 }
634
635 pub fn clear(&mut self) {
636 self.buf.clear();
637 self.restart_points.clear();
638 self.table_id = None;
639 self.restart_points_type_index.clear();
640 self.last_key.clear();
641 self.entry_count = 0;
642 }
643
644 pub fn uncompressed_block_size(&mut self) -> usize {
646 self.buf.len()
647 + (self.restart_points.len() + 1) * std::mem::size_of::<u32>()
648 + (RestartPoint::size_of()) * self.restart_points_type_index.len()
650 + std::mem::size_of::<u32>() + std::mem::size_of::<u32>() }
653
654 pub fn build(&mut self) -> &[u8] {
667 assert!(
668 self.entry_count > 0,
669 "buf_len {} entry_count {} table {:?}",
670 self.buf.len(),
671 self.entry_count,
672 self.table_id
673 );
674
675 for restart_point in &self.restart_points {
676 self.buf.put_u32_le(*restart_point);
677 }
678
679 self.buf.put_u32_le(
680 utils::checked_into_u32(self.restart_points.len()).unwrap_or_else(|_| {
681 panic!(
682 "WARN overflow can't convert restart_points_len {} into u32 table {:?}",
683 self.restart_points.len(),
684 self.table_id,
685 )
686 }),
687 );
688 for RestartPoint {
689 offset,
690 key_len_type,
691 value_len_type,
692 } in &self.restart_points_type_index
693 {
694 self.buf.put_u32_le(*offset);
695
696 let mut value: u8 = 0;
697 value |= *key_len_type as u8;
698 value <<= 4;
699 value |= *value_len_type as u8;
700
701 self.buf.put_u8(value);
702 }
703
704 self.buf.put_u32_le(
705 utils::checked_into_u32(self.restart_points_type_index.len()).unwrap_or_else(|_| {
706 panic!(
707 "WARN overflow can't convert restart_points_type_index_len {} into u32 table {:?}",
708 self.restart_points_type_index.len(),
709 self.table_id,
710 )
711 }),
712 );
713
714 self.buf.put_u32_le(self.table_id.unwrap().as_raw_id());
715 let result_buf = if self.compression_algorithm != CompressionAlgorithm::None {
716 self.compress_buf.clear();
717 self.compress_buf = Self::compress(
718 &self.buf[..],
719 self.compression_algorithm,
720 std::mem::take(&mut self.compress_buf),
721 );
722
723 &mut self.compress_buf
724 } else {
725 &mut self.buf
726 };
727
728 self.compression_algorithm.encode(result_buf);
729 let checksum = xxhash64_checksum(result_buf);
730 result_buf.put_u64_le(checksum);
731 assert!(
732 result_buf.len() < (u32::MAX) as usize,
733 "buf_len {} entry_count {} table {:?}",
734 result_buf.len(),
735 self.entry_count,
736 self.table_id
737 );
738
739 if self.compression_algorithm != CompressionAlgorithm::None {
740 self.compress_buf.as_ref()
741 } else {
742 self.buf.as_ref()
743 }
744 }
745
746 pub fn compress_block(
747 buf: Bytes,
748 target_compression: CompressionAlgorithm,
749 ) -> HummockResult<Bytes> {
750 let checksum = (&buf[buf.len() - 8..]).get_u64_le();
752 xxhash64_verify(&buf[..buf.len() - 8], checksum)?;
753 let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
755 let compressed_data = &buf[..buf.len() - 9];
756 assert_eq!(compression, CompressionAlgorithm::None);
757 let mut compress_writer = Self::compress(
758 compressed_data,
759 target_compression,
760 BytesMut::with_capacity(buf.len()),
761 );
762
763 target_compression.encode(&mut compress_writer);
764 let checksum = xxhash64_checksum(&compress_writer);
765 compress_writer.put_u64_le(checksum);
766 Ok(compress_writer.freeze())
767 }
768
769 pub fn compress(
770 buf: &[u8],
771 compression_algorithm: CompressionAlgorithm,
772 compress_writer: BytesMut,
773 ) -> BytesMut {
774 match compression_algorithm {
775 CompressionAlgorithm::None => unreachable!(),
776 CompressionAlgorithm::Lz4 => {
777 let mut encoder = lz4::EncoderBuilder::new()
778 .level(4)
779 .build(compress_writer.writer())
780 .map_err(HummockError::encode_error)
781 .unwrap();
782 encoder
783 .write_all(buf)
784 .map_err(HummockError::encode_error)
785 .unwrap();
786 let (writer, result) = encoder.finish();
787 result.map_err(HummockError::encode_error).unwrap();
788 writer.into_inner()
789 }
790 CompressionAlgorithm::Zstd => {
791 let mut encoder = zstd::Encoder::new(compress_writer.writer(), 4)
792 .map_err(HummockError::encode_error)
793 .unwrap();
794 encoder
795 .write_all(buf)
796 .map_err(HummockError::encode_error)
797 .unwrap();
798 let writer = encoder
799 .finish()
800 .map_err(HummockError::encode_error)
801 .unwrap();
802 writer.into_inner()
803 }
804 }
805 }
806
807 pub fn approximate_len(&self) -> usize {
809 self.buf.len()
812 + std::mem::size_of::<u32>() * self.restart_points.len() + std::mem::size_of::<u32>() + RestartPoint::size_of() * self.restart_points_type_index.len() + std::mem::size_of::<u32>() + std::mem::size_of::<CompressionAlgorithm>() + std::mem::size_of::<u64>() + std::mem::size_of::<u32>() }
820
821 pub fn debug_valid(&self) {
822 if self.entry_count == 0 {
823 debug_assert!(self.buf.is_empty());
824 debug_assert!(self.restart_points.is_empty());
825 debug_assert!(self.restart_points_type_index.is_empty());
826 debug_assert!(self.last_key.is_empty());
827 }
828 }
829
830 pub fn table_id(&self) -> Option<TableId> {
831 self.table_id
832 }
833
834 fn buf_reserve_size(option: &BlockBuilderOptions) -> usize {
835 option.capacity + 1024 + 256
836 }
837}
838
839pub fn try_shorten_block_smallest_key(
847 prev_block_last: &FullKey<&[u8]>,
848 block_smallest: &FullKey<&[u8]>,
849) -> Option<FullKey<Vec<u8>>> {
850 fn lcp_len(a: &[u8], b: &[u8]) -> usize {
852 let min_len = a.len().min(b.len());
853 for i in 0..min_len {
854 if a[i] != b[i] {
855 return i;
856 }
857 }
858 min_len
859 }
860
861 let prev_table_key = prev_block_last.user_key.table_key.as_ref();
862 let next_table_key = block_smallest.user_key.table_key.as_ref();
863 assert_eq!(
864 prev_block_last.user_key.table_id, block_smallest.user_key.table_id,
865 "table ids must match for shortening block smallest key"
866 );
867
868 let lcp = lcp_len(prev_table_key, next_table_key);
869
870 let shortened_len = lcp + 1;
873 if shortened_len >= next_table_key.len() {
874 return None;
875 }
876
877 let cand = FullKey::new_with_gap_epoch(
879 block_smallest.user_key.table_id,
880 TableKey(&next_table_key[..shortened_len]),
881 block_smallest.epoch_with_gap,
882 );
883
884 assert!(
888 prev_block_last.cmp(&cand).is_lt(),
889 "Invariant violated: prev_block_last >= cand. prev={:?}, cand={:?}",
890 prev_block_last,
891 cand
892 );
893 assert!(
894 cand.cmp(block_smallest).is_le(),
895 "Invariant violated: cand > block_smallest. cand={:?}, block_smallest={:?}",
896 cand,
897 block_smallest
898 );
899
900 Some(cand.copy_into())
901}
902
#[cfg(test)]
mod tests {

    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::MAX_KEY_LEN;

    use super::*;
    use crate::hummock::{BlockHolder, BlockIterator};

    /// Builds the block accumulated in `builder`, checks the size bookkeeping, and decodes it.
    fn build_and_decode(builder: &mut BlockBuilder) -> Box<Block> {
        let capacity = builder.uncompressed_block_size();
        // `approximate_len` also counts the 1-byte compression tag and the 8-byte checksum.
        assert_eq!(capacity, builder.approximate_len() - 9);
        let encoded = builder.build().to_vec();
        Box::new(Block::decode(encoded.into(), capacity).unwrap())
    }

    /// Iterates `block` from the start and checks it yields exactly the `expected`
    /// `(table_key, epoch, value)` entries of table 0, in order.
    fn assert_block_entries(block: Box<Block>, expected: &[(&[u8], u64, &[u8])]) {
        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
        bi.seek_to_first();
        for &(table_key, epoch, value) in expected {
            assert!(bi.is_valid());
            assert_eq!(
                construct_full_key_struct_for_test(0, table_key, epoch),
                bi.key()
            );
            assert_eq!(value, bi.value());
            bi.next();
        }
        assert!(!bi.is_valid());
    }

    /// Round-trips four small entries through a block built with `options`.
    fn check_small_block(options: BlockBuilderOptions) {
        let entries: [(&[u8], u64, &[u8]); 4] = [
            (b"k1", 1, b"v01"),
            (b"k2", 2, b"v02"),
            (b"k3", 3, b"v03"),
            (b"k4", 4, b"v04"),
        ];
        let mut builder = BlockBuilder::new(options);
        for &(table_key, epoch, value) in &entries {
            builder.add_for_test(construct_full_key_struct_for_test(0, table_key, epoch), value);
        }
        assert_block_entries(build_and_decode(&mut builder), &entries);
    }

    #[test]
    fn test_block_enc_dec() {
        check_small_block(BlockBuilderOptions::default());
    }

    #[test]
    fn test_compressed_block_enc_dec() {
        inner_test_compressed(CompressionAlgorithm::Lz4);
        inner_test_compressed(CompressionAlgorithm::Zstd);
    }

    fn inner_test_compressed(algo: CompressionAlgorithm) {
        check_small_block(BlockBuilderOptions {
            compression_algorithm: algo,
            ..Default::default()
        });
    }

    pub fn construct_full_key_struct_for_test(
        table_id: u32,
        table_key: &[u8],
        epoch: u64,
    ) -> FullKey<&[u8]> {
        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(epoch))
    }

    #[test]
    fn test_block_enc_large_key() {
        let medium_key = vec![b'a'; MAX_KEY_LEN - 500];
        let large_key = vec![b'b'; MAX_KEY_LEN];
        let xlarge_key = vec![b'c'; MAX_KEY_LEN + 500];
        let entries: [(&[u8], u64, &[u8]); 3] = [
            (medium_key.as_slice(), 1, b"v1"),
            (large_key.as_slice(), 2, b"v2"),
            (xlarge_key.as_slice(), 3, b"v3"),
        ];

        let mut builder = BlockBuilder::new(BlockBuilderOptions::default());
        for &(table_key, epoch, value) in &entries {
            builder.add_for_test(construct_full_key_struct_for_test(0, table_key, epoch), value);
        }
        assert_block_entries(build_and_decode(&mut builder), &entries);
    }

    /// Entry `index` of the restart-point test. Keys fall into three length classes, so the key
    /// length type changes mid-block, and end with `index` to keep them sorted.
    fn restart_point_test_entry(index: u8) -> (Vec<u8>, u64, &'static [u8]) {
        let (fill, len, epoch, value): (u8, usize, u64, &'static [u8]) = if index < 50 {
            (b'A', MAX_KEY_LEN - 500, 1, &b"v1"[..])
        } else if index < 80 {
            (b'B', MAX_KEY_LEN, 2, &b"v2"[..])
        } else {
            (b'C', MAX_KEY_LEN + 500, 3, &b"v3"[..])
        };
        let mut table_key = vec![fill; len];
        table_key.push(index);
        (table_key, epoch, value)
    }

    #[test]
    fn test_block_restart_point() {
        const KEY_COUNT: u8 = 100;
        const BUILDER_COUNT: u8 = 5;

        let mut builder = BlockBuilder::new(BlockBuilderOptions::default());

        // Reuse one builder several times to exercise `clear`.
        for _ in 0..BUILDER_COUNT {
            for index in 0..KEY_COUNT {
                let (table_key, epoch, value) = restart_point_test_entry(index);
                builder
                    .add_for_test(construct_full_key_struct_for_test(0, &table_key, epoch), value);
            }

            let block = build_and_decode(&mut builder);
            let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
            bi.seek_to_first();
            assert!(bi.is_valid());

            for index in 0..KEY_COUNT {
                let (table_key, epoch, _) = restart_point_test_entry(index);
                assert_eq!(
                    construct_full_key_struct_for_test(0, &table_key, epoch),
                    bi.key()
                );
                bi.next();
            }

            assert!(!bi.is_valid());
            builder.clear();
        }
    }

    #[test]
    fn test_block_serde() {
        fn assert_serde<'de, T: Serialize + Deserialize<'de>>() {}

        assert_serde::<Block>();
        assert_serde::<Box<Block>>();

        let mut builder = BlockBuilder::new(BlockBuilderOptions::default());
        for i in 0..100 {
            builder.add_for_test(
                construct_full_key_struct_for_test(0, format!("k{i:8}").as_bytes(), i),
                format!("v{i:8}").as_bytes(),
            );
        }
        let block = build_and_decode(&mut builder);

        let buffer = bincode::serialize(&block).unwrap();
        let blk: Block = bincode::deserialize(&buffer).unwrap();

        assert_eq!(block.data, blk.data);
        assert_eq!(block.data_len, blk.data_len);
        assert_eq!(block.table_id, blk.table_id);
        assert_eq!(block.restart_points, blk.restart_points);
    }

    #[test]
    fn test_try_shorten_block_smallest_key() {
        use risingwave_hummock_sdk::EpochWithGap;

        fn make_full_key(table_id: u32, table_key: &[u8], epoch: u64) -> FullKey<&[u8]> {
            FullKey::new_with_gap_epoch(
                TableId::new(table_id),
                TableKey(table_key),
                EpochWithGap::new_from_epoch(test_epoch(epoch)),
            )
        }

        /// Checks `prev < shortened <= next`.
        fn assert_invariant(
            prev: &FullKey<&[u8]>,
            shortened: &FullKey<Vec<u8>>,
            next: &FullKey<&[u8]>,
        ) {
            assert!(
                prev.cmp(&shortened.to_ref()).is_lt(),
                "Invariant violated: prev >= shortened. prev={:?}, shortened={:?}",
                prev,
                shortened
            );
            assert!(
                shortened.to_ref().cmp(next).is_le(),
                "Invariant violated: shortened > next. shortened={:?}, next={:?}",
                shortened,
                next
            );
        }

        // (prev table key, prev epoch, next table key, next epoch, expected shortened table key)
        let cases: [(&[u8], u64, &[u8], u64, Option<&[u8]>); 9] = [
            // Keys diverge in the middle: keep one byte past the common prefix.
            (b"abc", 100, b"abdef", 100, Some(&b"abd"[..])),
            // `prev` is a prefix of `next`.
            (b"ab", 100, b"abcd", 100, Some(&b"abc"[..])),
            // The shortened key would be as long as `next`.
            (b"abc", 100, b"abd", 100, None),
            (b"abc", 100, b"abcd", 100, None),
            // Same user key, different epochs.
            (b"abc", 200, b"abc", 100, None),
            // Single-byte keys.
            (b"a", 100, b"b", 100, None),
            (b"hello_world_abc", 100, b"hello_world_xyz123", 100, Some(&b"hello_world_x"[..])),
            // No common prefix at all.
            (b"aaa", 100, b"bbbccc", 100, Some(&b"b"[..])),
            (b"abc", 100, b"abcde", 100, Some(&b"abcd"[..])),
        ];

        for (prev_key, prev_epoch, next_key, next_epoch, expected) in cases {
            let prev = make_full_key(1, prev_key, prev_epoch);
            let next = make_full_key(1, next_key, next_epoch);
            let result = try_shorten_block_smallest_key(&prev, &next);
            match expected {
                Some(expected_key) => {
                    let shortened = result.expect("expected the key to be shortened");
                    let actual: &[u8] = shortened.user_key.table_key.as_ref();
                    assert_eq!(actual, expected_key);
                    assert_invariant(&prev, &shortened, &next);
                }
                None => assert!(result.is_none()),
            }
        }
    }
}