use std::cmp::Ordering;
use std::fmt::Debug;
use std::io::{Read, Write};
use std::mem::size_of;
use std::ops::Range;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::KeyComparator;
use risingwave_hummock_sdk::key::FullKey;
use serde::{Deserialize, Serialize};

use super::utils::{CompressionAlgorithm, bytes_diff_below_max_key_length, xxhash64_verify};
use crate::hummock::sstable::utils;
use crate::hummock::sstable::utils::xxhash64_checksum;
use crate::hummock::{HummockError, HummockResult};
use crate::monitor::Hitmap;

pub const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;
pub const DEFAULT_RESTART_INTERVAL: usize = 16;
pub const DEFAULT_ENTRY_SIZE: usize = 24;

/// Integer width used to encode the key and value length fields of a block entry.
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum LenType {
    u8 = 1,
    u16 = 2,
    u32 = 3,
}
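
// `put_fn!` generates a writer method that emits each argument with the integer width
// selected by this `LenType`; `get_fn!` below generates the matching reader used when
// decoding entry headers.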
macro_rules! put_fn {
    ($name:ident, $($value:ident: $type:ty),*) => {
        fn $name<T: BufMut>(&self, buf: &mut T, $($value: $type),*) {
            match *self {
                LenType::u8 => {
                    $(buf.put_u8($value as u8);)*
                },
                LenType::u16 => {
                    $(buf.put_u16($value as u16);)*
                },
                LenType::u32 => {
                    $(buf.put_u32($value as u32);)*
                },
            }
        }
    };
}

macro_rules! get_fn {
    ($name:ident, $($type:ty),*) => {
        #[allow(unused_parens)]
        fn $name<T: Buf>(&self, buf: &mut T) -> ($($type), *) {
            match *self {
                LenType::u8 => {
                    ($(buf.get_u8() as $type),*)
                }
                LenType::u16 => {
                    ($(buf.get_u16() as $type),*)
                }
                LenType::u32 => {
                    ($(buf.get_u32() as $type),*)
                }
            }
        }
    };
}

impl From<u8> for LenType {
    fn from(value: u8) -> Self {
        match value {
            1 => LenType::u8,
            2 => LenType::u16,
            3 => LenType::u32,
            _ => {
                panic!("unexpected type {}", value)
            }
        }
    }
}

impl LenType {
    put_fn!(put, v1: usize);

    put_fn!(put2, v1: usize, v2: usize);

    get_fn!(get, usize);

    get_fn!(get2, usize, usize);
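
    /// Picks the smallest width that can represent `len`.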
    fn new(len: usize) -> Self {
        const U8_MAX: usize = u8::MAX as usize + 1;
        const U16_MAX: usize = u16::MAX as usize + 1;
        const U32_MAX: usize = u32::MAX as usize + 1;

        match len {
            0..U8_MAX => LenType::u8,
            U8_MAX..U16_MAX => LenType::u16,
            U16_MAX..U32_MAX => LenType::u32,
            _ => unreachable!("unexpected LenType {}", len),
        }
    }

    fn len(&self) -> usize {
        match *self {
            Self::u8 => size_of::<u8>(),
            Self::u16 => size_of::<u16>(),
            Self::u32 => size_of::<u32>(),
        }
    }
}
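
/// A restart point marks an entry whose key is stored in full (no prefix compression),
/// together with the key/value length widths in effect from that offset onward.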
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RestartPoint {
    pub offset: u32,
    pub key_len_type: LenType,
    pub value_len_type: LenType,
}

impl RestartPoint {
    fn size_of() -> usize {
        // Encoded as a `u32` offset plus one byte that packs both `LenType`s
        // (key type in the high nibble, value type in the low nibble).
        std::mem::size_of::<u32>() + std::mem::size_of::<LenType>()
    }
}
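
/// An immutable, decoded SSTable block.
///
/// `data` holds the uncompressed block payload: prefix-compressed entries followed by the
/// restart-point list, the restart-point type index, and the table id, as laid out by
/// `BlockBuilder::build`.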
pub struct Block {
    /// Uncompressed entry data plus the block footer.
    data: Bytes,
    /// Length of the entry (key-value) portion of `data`.
    data_len: usize,

    /// Table id of all entries in this block.
    table_id: TableId,

    /// Restart points, each carrying the length widths in effect at its offset.
    restart_points: Vec<RestartPoint>,

    hitmap: Hitmap<{ Self::HITMAP_ELEMS }>,
}

impl Clone for Block {
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            data_len: self.data_len,
            table_id: self.table_id,
            restart_points: self.restart_points.clone(),
            hitmap: self.hitmap.clone(),
        }
    }
}

impl Debug for Block {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Block")
            .field("data_len", &self.data_len)
            .field("table_id", &self.table_id)
            .field("restart_points", &self.restart_points)
            .finish()
    }
}

impl Serialize for Block {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serde_bytes::serialize(&self.data[..], serializer)
    }
}

impl<'de> Deserialize<'de> for Block {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let data: Vec<u8> = serde_bytes::deserialize(deserializer)?;
        let data = Bytes::from(data);
        Ok(Block::decode_from_raw(data))
    }
}
impl Block {
    pub const HITMAP_ELEMS: usize = 4;

    pub fn get_algorithm(buf: &Bytes) -> HummockResult<CompressionAlgorithm> {
        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
        Ok(compression)
    }

    pub fn decode(buf: Bytes, uncompressed_capacity: usize) -> HummockResult<Self> {
        Self::decode_with_copy(buf, uncompressed_capacity, false)
    }

    pub fn decode_with_copy(
        buf: Bytes,
        uncompressed_capacity: usize,
        copy: bool,
    ) -> HummockResult<Self> {
        // Verify the xxhash64 checksum stored in the last 8 bytes of the block.
        let xxhash64_checksum = (&buf[buf.len() - 8..]).get_u64_le();
        xxhash64_verify(&buf[..buf.len() - 8], xxhash64_checksum)?;

        // The byte just before the checksum records the compression algorithm.
        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
        let compressed_data = &buf[..buf.len() - 9];

        let buf = match compression {
            CompressionAlgorithm::None => {
                if copy {
                    Bytes::copy_from_slice(&buf[0..(buf.len() - 9)])
                } else {
                    buf.slice(0..(buf.len() - 9))
                }
            }
            CompressionAlgorithm::Lz4 => {
                let mut decoder = lz4::Decoder::new(compressed_data.reader())
                    .map_err(HummockError::decode_error)?;
                let mut decoded = Vec::with_capacity(uncompressed_capacity);
                let read_size = decoder
                    .read_to_end(&mut decoded)
                    .map_err(HummockError::decode_error)?;
                assert_eq!(read_size, uncompressed_capacity);
                Bytes::from(decoded)
            }
            CompressionAlgorithm::Zstd => {
                let mut decoder = zstd::Decoder::new(compressed_data.reader())
                    .map_err(HummockError::decode_error)?;
                let mut decoded = Vec::with_capacity(uncompressed_capacity);
                let read_size = decoder
                    .read_to_end(&mut decoded)
                    .map_err(HummockError::decode_error)?;
                assert_eq!(read_size, uncompressed_capacity);
                Bytes::from(decoded)
            }
        };

        Ok(Self::decode_from_raw(buf))
    }
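
    /// Decodes a block from its uncompressed representation. The tail of `buf` is parsed as:
    /// restart-point offsets, restart-point count (`u32`), restart-point type-index entries,
    /// type-index count (`u32`), and finally the table id (`u32`).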
    pub fn decode_from_raw(buf: Bytes) -> Self {
        // Table id is stored in the last 4 bytes, preceded by the type-index count.
        let table_id = (&buf[buf.len() - 4..]).get_u32_le();
        let n_index = ((&buf[buf.len() - 8..buf.len() - 4]).get_u32_le()) as usize;
        let index_data_len = size_of::<u32>() + n_index * RestartPoint::size_of();
        let data_len = buf.len() - 4 - index_data_len;
        let mut restart_points_type_index_buf = &buf[data_len..buf.len() - 8];

        let mut index_key_vec = Vec::with_capacity(n_index);
        for _ in 0..n_index {
            let offset = restart_points_type_index_buf.get_u32_le();
            // Key length type is packed in the high nibble, value length type in the low nibble.
            let value = restart_points_type_index_buf.get_u8();
            let key_len_type = LenType::from(value >> 4);
            let value_len_type = LenType::from(value & 0x0F);

            index_key_vec.push(RestartPoint {
                offset,
                key_len_type,
                value_len_type,
            });
        }

        let n_restarts = ((&buf[data_len - 4..]).get_u32_le()) as usize;
        let restart_points_len = size_of::<u32>() + n_restarts * (size_of::<u32>());
        let restarts_end = data_len - 4;
        let data_len = data_len - restart_points_len;
        let mut restart_points = Vec::with_capacity(n_restarts);
        let mut restart_points_buf = &buf[data_len..restarts_end];

        let mut type_index: usize = 0;

        // Attach to each restart offset the length types that are in effect at that offset.
        for _ in 0..n_restarts {
            let offset = restart_points_buf.get_u32_le();
            if type_index < index_key_vec.len() - 1
                && offset >= index_key_vec[type_index + 1].offset
            {
                type_index += 1;
            }

            restart_points.push(RestartPoint {
                offset,
                key_len_type: index_key_vec[type_index].key_len_type,
                value_len_type: index_key_vec[type_index].value_len_type,
            });
        }

        Block {
            data: buf,
            data_len,
            restart_points,
            table_id: TableId::new(table_id),
            hitmap: Hitmap::default(),
        }
    }
    #[expect(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        assert!(!self.data.is_empty());
        self.data_len
    }

    pub fn capacity(&self) -> usize {
        self.data.len()
            + self.restart_points.capacity() * std::mem::size_of::<u32>()
            + std::mem::size_of::<u32>()
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn restart_point(&self, index: usize) -> RestartPoint {
        self.restart_points[index]
    }

    pub fn restart_point_len(&self) -> usize {
        self.restart_points.len()
    }

    pub fn search_restart_partition_point<P>(&self, pred: P) -> usize
    where
        P: FnMut(&RestartPoint) -> bool,
    {
        self.restart_points.partition_point(pred)
    }

    pub fn data(&self) -> &[u8] {
        &self.data[..self.data_len]
    }

    pub fn raw(&self) -> &[u8] {
        &self.data[..]
    }

    pub fn hitmap(&self) -> &Hitmap<{ Self::HITMAP_ELEMS }> {
        &self.hitmap
    }

    pub fn efficiency(&self) -> f64 {
        self.hitmap.ratio()
    }
}
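
/// Header of a single block entry: how many leading bytes the key shares with the previous
/// entry's key (`overlap`), the length of the non-shared key suffix (`diff`), and the value
/// length, plus the entry's offset within the block.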
#[derive(Debug)]
pub struct KeyPrefix {
    overlap: usize,
    diff: usize,
    value: usize,
    /// Offset of this entry within the block data.
    offset: usize,

    /// Encoded length of this prefix header itself.
    len: usize,
}

impl KeyPrefix {
    pub fn new_without_len(overlap: usize, diff: usize, value: usize, offset: usize) -> Self {
        KeyPrefix {
            overlap,
            diff,
            value,
            offset,
            len: 0, // `len` is only needed on the decoding path.
        }
    }
}
impl KeyPrefix {
    pub fn encode(&self, buf: &mut impl BufMut, key_len_type: LenType, value_len_type: LenType) {
        key_len_type.put2(buf, self.overlap, self.diff);
        value_len_type.put(buf, self.value);
    }

    pub fn decode(
        buf: &mut impl Buf,
        offset: usize,
        key_len_type: LenType,
        value_len_type: LenType,
    ) -> Self {
        let (overlap, diff) = key_len_type.get2(buf);
        let value = value_len_type.get(buf);

        let len = key_len_type.len() * 2 + value_len_type.len();

        Self {
            overlap,
            diff,
            value,
            offset,
            len,
        }
    }

    /// Encoded length of this prefix header.
    fn len(&self) -> usize {
        self.len
    }

    /// Number of bytes shared with the previous key.
    pub fn overlap_len(&self) -> usize {
        self.overlap
    }

    /// Byte range of the non-shared key suffix within the block data.
    pub fn diff_key_range(&self) -> Range<usize> {
        self.offset + self.len()..self.offset + self.len() + self.diff
    }

    /// Byte range of the value within the block data.
    pub fn value_range(&self) -> Range<usize> {
        self.offset + self.len() + self.diff..self.offset + self.len() + self.diff + self.value
    }

    /// Total encoded length of the entry: header + key suffix + value.
    pub fn entry_len(&self) -> usize {
        self.len() + self.diff + self.value
    }
}
pub struct BlockBuilderOptions {
    /// Approximate target capacity of the block in bytes.
    pub capacity: usize,
    /// Compression algorithm applied when the block is built.
    pub compression_algorithm: CompressionAlgorithm,
    /// Number of entries between two restart points.
    pub restart_interval: usize,
}

impl Default for BlockBuilderOptions {
    fn default() -> Self {
        Self {
            capacity: DEFAULT_BLOCK_SIZE,
            compression_algorithm: CompressionAlgorithm::None,
            restart_interval: DEFAULT_RESTART_INTERVAL,
        }
    }
}
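
/// Builds one SSTable block from key-value pairs added in ascending full-key order.
///
/// A minimal usage sketch (the key and value below are illustrative placeholders):
///
/// ```ignore
/// let mut builder = BlockBuilder::new(BlockBuilderOptions::default());
/// builder.add(full_key, b"value");
/// let uncompressed_capacity = builder.uncompressed_block_size();
/// let raw = builder.build().to_vec();
/// let block = Block::decode(Bytes::from(raw), uncompressed_capacity)?;
/// ```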
pub struct BlockBuilder {
    /// Buffer for encoded entries and the block footer.
    buf: BytesMut,
    /// Reused output buffer for compression.
    compress_buf: BytesMut,
    /// Entry interval between restart points.
    restart_count: usize,
    /// Offsets of restart points collected so far.
    restart_points: Vec<u32>,
    /// Last full key added, used for prefix compression and ordering checks.
    last_key: Vec<u8>,
    /// Number of entries added so far.
    entry_count: usize,
    /// Compression algorithm applied in `build`.
    compression_algorithm: CompressionAlgorithm,

    table_id: Option<u32>,
    /// Restart points at which the key/value length widths change.
    restart_points_type_index: Vec<RestartPoint>,
}
impl BlockBuilder {
    pub fn new(options: BlockBuilderOptions) -> Self {
        Self {
            buf: BytesMut::with_capacity(Self::buf_reserve_size(&options)),
            compress_buf: BytesMut::default(),
            restart_count: options.restart_interval,
            restart_points: Vec::with_capacity(
                options.capacity / DEFAULT_ENTRY_SIZE / options.restart_interval + 1,
            ),
            last_key: vec![],
            entry_count: 0,
            compression_algorithm: options.compression_algorithm,
            table_id: None,
            restart_points_type_index: Vec::default(),
        }
    }

    pub fn add_for_test(&mut self, full_key: FullKey<&[u8]>, value: &[u8]) {
        self.add(full_key, value);
    }
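
    /// Appends a key-value entry. Keys must be added in ascending order and share one table id.
    /// At every `restart_count`-th entry, or whenever the key/value length widths change, the
    /// full key is written and a new restart point is recorded; otherwise only the suffix that
    /// differs from the previous key is stored.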
    pub fn add(&mut self, full_key: FullKey<&[u8]>, value: &[u8]) {
        let input_table_id = full_key.user_key.table_id.table_id();
        match self.table_id {
            Some(current_table_id) => assert_eq!(current_table_id, input_table_id),
            None => self.table_id = Some(input_table_id),
        }
        #[cfg(debug_assertions)]
        self.debug_valid();

        let mut key: BytesMut = Default::default();
        full_key.encode_into_without_table_id(&mut key);
        if self.entry_count > 0 {
            debug_assert!(!key.is_empty());
            debug_assert_eq!(
                KeyComparator::compare_encoded_full_key(&self.last_key[..], &key[..]),
                Ordering::Less,
                "epoch: {}, table key: {}",
                full_key.epoch_with_gap.pure_epoch(),
                u64::from_be_bytes(
                    full_key.user_key.table_key.as_ref()[0..8]
                        .try_into()
                        .unwrap()
                ),
            );
        }
        let k_type = LenType::new(key.len());
        let v_type = LenType::new(value.len());

        // A new type-index entry is needed if the length widths differ from the previous entry.
        let type_mismatch = if let Some(RestartPoint {
            offset: _,
            key_len_type: last_key_len_type,
            value_len_type: last_value_len_type,
        }) = self.restart_points_type_index.last()
        {
            k_type != *last_key_len_type || v_type != *last_value_len_type
        } else {
            true
        };

        // At a restart point (or on a width change) store the full key; otherwise store only
        // the suffix that differs from the previous key.
        let diff_key = if self.entry_count % self.restart_count == 0 || type_mismatch {
            let offset = utils::checked_into_u32(self.buf.len()).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert buf_len {} into u32 table {:?}",
                    self.buf.len(),
                    self.table_id,
                )
            });

            self.restart_points.push(offset);

            if type_mismatch {
                self.restart_points_type_index.push(RestartPoint {
                    offset,
                    key_len_type: k_type,
                    value_len_type: v_type,
                });
            }

            key.as_ref()
        } else {
            bytes_diff_below_max_key_length(&self.last_key, &key[..])
        };

        let prefix = KeyPrefix::new_without_len(
            key.len() - diff_key.len(),
            diff_key.len(),
            value.len(),
            self.buf.len(),
        );

        prefix.encode(&mut self.buf, k_type, v_type);
        self.buf.put_slice(diff_key);
        self.buf.put_slice(value);

        self.last_key.clear();
        self.last_key.extend_from_slice(&key);
        self.entry_count += 1;
    }
    pub fn get_last_key(&self) -> &[u8] {
        &self.last_key
    }

    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    pub fn clear(&mut self) {
        self.buf.clear();
        self.restart_points.clear();
        self.table_id = None;
        self.restart_points_type_index.clear();
        self.last_key.clear();
        self.entry_count = 0;
    }

    /// Size of the uncompressed block: entry data plus the footer written by `build`,
    /// excluding the compression tag and checksum.
    pub fn uncompressed_block_size(&mut self) -> usize {
        self.buf.len()
            // restart point offsets and their count
            + (self.restart_points.len() + 1) * std::mem::size_of::<u32>()
            // restart point type-index entries
            + (RestartPoint::size_of()) * self.restart_points_type_index.len()
            // type-index count
            + std::mem::size_of::<u32>()
            // table id
            + std::mem::size_of::<u32>()
    }
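
    /// Finishes the block: appends the restart-point offsets and their count, the type-index
    /// entries and their count, and the table id, then applies compression (if configured) and
    /// appends the compression tag and xxhash64 checksum. Returns the encoded block.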
    pub fn build(&mut self) -> &[u8] {
        assert!(
            self.entry_count > 0,
            "buf_len {} entry_count {} table {:?}",
            self.buf.len(),
            self.entry_count,
            self.table_id
        );

        for restart_point in &self.restart_points {
            self.buf.put_u32_le(*restart_point);
        }

        self.buf.put_u32_le(
            utils::checked_into_u32(self.restart_points.len()).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert restart_points_len {} into u32 table {:?}",
                    self.restart_points.len(),
                    self.table_id,
                )
            }),
        );
        for RestartPoint {
            offset,
            key_len_type,
            value_len_type,
        } in &self.restart_points_type_index
        {
            self.buf.put_u32_le(*offset);

            // Pack the key length type into the high nibble and the value length type into
            // the low nibble of a single byte.
            let mut value: u8 = 0;
            value |= *key_len_type as u8;
            value <<= 4;
            value |= *value_len_type as u8;

            self.buf.put_u8(value);
        }

        self.buf.put_u32_le(
            utils::checked_into_u32(self.restart_points_type_index.len()).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert restart_points_type_index_len {} into u32 table {:?}",
                    self.restart_points_type_index.len(),
                    self.table_id,
                )
            }),
        );

        self.buf.put_u32_le(self.table_id.unwrap());
        let result_buf = if self.compression_algorithm != CompressionAlgorithm::None {
            self.compress_buf.clear();
            self.compress_buf = Self::compress(
                &self.buf[..],
                self.compression_algorithm,
                std::mem::take(&mut self.compress_buf),
            );

            &mut self.compress_buf
        } else {
            &mut self.buf
        };

        self.compression_algorithm.encode(result_buf);
        let checksum = xxhash64_checksum(result_buf);
        result_buf.put_u64_le(checksum);
        assert!(
            result_buf.len() < (u32::MAX) as usize,
            "buf_len {} entry_count {} table {:?}",
            result_buf.len(),
            self.entry_count,
            self.table_id
        );

        if self.compression_algorithm != CompressionAlgorithm::None {
            self.compress_buf.as_ref()
        } else {
            self.buf.as_ref()
        }
    }
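
    /// Re-compresses an already encoded, uncompressed block with `target_compression`,
    /// re-appending the compression tag and a fresh xxhash64 checksum.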
    pub fn compress_block(
        buf: Bytes,
        target_compression: CompressionAlgorithm,
    ) -> HummockResult<Bytes> {
        let checksum = (&buf[buf.len() - 8..]).get_u64_le();
        xxhash64_verify(&buf[..buf.len() - 8], checksum)?;
        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
        let compressed_data = &buf[..buf.len() - 9];
        assert_eq!(compression, CompressionAlgorithm::None);
        let mut compress_writer = Self::compress(
            compressed_data,
            target_compression,
            BytesMut::with_capacity(buf.len()),
        );

        target_compression.encode(&mut compress_writer);
        let checksum = xxhash64_checksum(&compress_writer);
        compress_writer.put_u64_le(checksum);
        Ok(compress_writer.freeze())
    }

    pub fn compress(
        buf: &[u8],
        compression_algorithm: CompressionAlgorithm,
        compress_writer: BytesMut,
    ) -> BytesMut {
        match compression_algorithm {
            CompressionAlgorithm::None => unreachable!(),
            CompressionAlgorithm::Lz4 => {
                let mut encoder = lz4::EncoderBuilder::new()
                    .level(4)
                    .build(compress_writer.writer())
                    .map_err(HummockError::encode_error)
                    .unwrap();
                encoder
                    .write_all(buf)
                    .map_err(HummockError::encode_error)
                    .unwrap();
                let (writer, result) = encoder.finish();
                result.map_err(HummockError::encode_error).unwrap();
                writer.into_inner()
            }
            CompressionAlgorithm::Zstd => {
                let mut encoder = zstd::Encoder::new(compress_writer.writer(), 4)
                    .map_err(HummockError::encode_error)
                    .unwrap();
                encoder
                    .write_all(buf)
                    .map_err(HummockError::encode_error)
                    .unwrap();
                let writer = encoder
                    .finish()
                    .map_err(HummockError::encode_error)
                    .unwrap();
                writer.into_inner()
            }
        }
    }
    /// Approximate size of the built block, including the compression tag and checksum.
    pub fn approximate_len(&self) -> usize {
        self.buf.len()
            // restart point offsets
            + std::mem::size_of::<u32>() * self.restart_points.len()
            // restart point count
            + std::mem::size_of::<u32>()
            // restart point type-index entries
            + RestartPoint::size_of() * self.restart_points_type_index.len()
            // type-index count
            + std::mem::size_of::<u32>()
            // compression algorithm tag
            + std::mem::size_of::<CompressionAlgorithm>()
            // xxhash64 checksum
            + std::mem::size_of::<u64>()
            // table id
            + std::mem::size_of::<u32>()
    }
    pub fn debug_valid(&self) {
        if self.entry_count == 0 {
            debug_assert!(self.buf.is_empty());
            debug_assert!(self.restart_points.is_empty());
            debug_assert!(self.restart_points_type_index.is_empty());
            debug_assert!(self.last_key.is_empty());
        }
    }

    pub fn table_id(&self) -> Option<u32> {
        self.table_id
    }

    fn buf_reserve_size(option: &BlockBuilderOptions) -> usize {
        option.capacity + 1024 + 256
    }
}
#[cfg(test)]
mod tests {

    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::MAX_KEY_LEN;

    use super::*;
    use crate::hummock::{BlockHolder, BlockIterator};

    #[test]
    fn test_block_enc_dec() {
        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k1", 1), b"v01");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k2", 2), b"v02");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k3", 3), b"v03");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k4", 4), b"v04");
        let capacity = builder.uncompressed_block_size();
        assert_eq!(capacity, builder.approximate_len() - 9);
        let buf = builder.build().to_vec();
        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));

        bi.seek_to_first();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k1", 1), bi.key());
        assert_eq!(b"v01", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k2", 2), bi.key());
        assert_eq!(b"v02", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k3", 3), bi.key());
        assert_eq!(b"v03", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k4", 4), bi.key());
        assert_eq!(b"v04", bi.value());

        bi.next();
        assert!(!bi.is_valid());
    }
    #[test]
    fn test_compressed_block_enc_dec() {
        inner_test_compressed(CompressionAlgorithm::Lz4);
        inner_test_compressed(CompressionAlgorithm::Zstd);
    }

    fn inner_test_compressed(algo: CompressionAlgorithm) {
        let options = BlockBuilderOptions {
            compression_algorithm: algo,
            ..Default::default()
        };
        let mut builder = BlockBuilder::new(options);
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k1", 1), b"v01");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k2", 2), b"v02");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k3", 3), b"v03");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k4", 4), b"v04");
        let capacity = builder.uncompressed_block_size();
        assert_eq!(capacity, builder.approximate_len() - 9);
        let buf = builder.build().to_vec();
        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));

        bi.seek_to_first();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k1", 1), bi.key());
        assert_eq!(b"v01", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k2", 2), bi.key());
        assert_eq!(b"v02", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k3", 3), bi.key());
        assert_eq!(b"v03", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k4", 4), bi.key());
        assert_eq!(b"v04", bi.value());

        bi.next();
        assert!(!bi.is_valid());
    }
    pub fn construct_full_key_struct_for_test(
        table_id: u32,
        table_key: &[u8],
        epoch: u64,
    ) -> FullKey<&[u8]> {
        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(epoch))
    }
    #[test]
    fn test_block_enc_large_key() {
        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);
        let medium_key = vec![b'a'; MAX_KEY_LEN - 500];
        let large_key = vec![b'b'; MAX_KEY_LEN];
        let xlarge_key = vec![b'c'; MAX_KEY_LEN + 500];

        builder.add_for_test(construct_full_key_struct_for_test(0, &medium_key, 1), b"v1");
        builder.add_for_test(construct_full_key_struct_for_test(0, &large_key, 2), b"v2");
        builder.add_for_test(construct_full_key_struct_for_test(0, &xlarge_key, 3), b"v3");
        let capacity = builder.uncompressed_block_size();
        assert_eq!(capacity, builder.approximate_len() - 9);
        let buf = builder.build().to_vec();
        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));

        bi.seek_to_first();
        assert!(bi.is_valid());
        assert_eq!(
            construct_full_key_struct_for_test(0, &medium_key, 1),
            bi.key()
        );
        assert_eq!(b"v1", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(
            construct_full_key_struct_for_test(0, &large_key, 2),
            bi.key()
        );
        assert_eq!(b"v2", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(
            construct_full_key_struct_for_test(0, &xlarge_key, 3),
            bi.key()
        );
        assert_eq!(b"v3", bi.value());

        bi.next();
        assert!(!bi.is_valid());
    }
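
    // Covers restart points generated both by the restart interval and by key-size changes
    // around `MAX_KEY_LEN`, which should exercise the restart-point type index.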
    #[test]
    fn test_block_restart_point() {
        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);

        const KEY_COUNT: u8 = 100;
        const BUILDER_COUNT: u8 = 5;

        for _ in 0..BUILDER_COUNT {
            for index in 0..KEY_COUNT {
                if index < 50 {
                    let mut medium_key = vec![b'A'; MAX_KEY_LEN - 500];
                    medium_key.push(index);
                    builder
                        .add_for_test(construct_full_key_struct_for_test(0, &medium_key, 1), b"v1");
                } else if index < 80 {
                    let mut large_key = vec![b'B'; MAX_KEY_LEN];
                    large_key.push(index);
                    builder
                        .add_for_test(construct_full_key_struct_for_test(0, &large_key, 2), b"v2");
                } else {
                    let mut xlarge_key = vec![b'C'; MAX_KEY_LEN + 500];
                    xlarge_key.push(index);
                    builder
                        .add_for_test(construct_full_key_struct_for_test(0, &xlarge_key, 3), b"v3");
                }
            }

            let capacity = builder.uncompressed_block_size();
            assert_eq!(capacity, builder.approximate_len() - 9);
            let buf = builder.build().to_vec();
            let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
            let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
            bi.seek_to_first();
            assert!(bi.is_valid());

            for index in 0..KEY_COUNT {
                if index < 50 {
                    let mut medium_key = vec![b'A'; MAX_KEY_LEN - 500];
                    medium_key.push(index);
                    assert_eq!(
                        construct_full_key_struct_for_test(0, &medium_key, 1),
                        bi.key()
                    );
                } else if index < 80 {
                    let mut large_key = vec![b'B'; MAX_KEY_LEN];
                    large_key.push(index);
                    assert_eq!(
                        construct_full_key_struct_for_test(0, &large_key, 2),
                        bi.key()
                    );
                } else {
                    let mut xlarge_key = vec![b'C'; MAX_KEY_LEN + 500];
                    xlarge_key.push(index);
                    assert_eq!(
                        construct_full_key_struct_for_test(0, &xlarge_key, 3),
                        bi.key()
                    );
                }
                bi.next();
            }

            assert!(!bi.is_valid());
            builder.clear();
        }
    }
    #[test]
    fn test_block_serde() {
        fn assert_serde<'de, T: Serialize + Deserialize<'de>>() {}

        assert_serde::<Block>();
        assert_serde::<Box<Block>>();

        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);
        for i in 0..100 {
            builder.add_for_test(
                construct_full_key_struct_for_test(0, format!("k{i:8}").as_bytes(), i),
                format!("v{i:8}").as_bytes(),
            );
        }

        let capacity = builder.uncompressed_block_size();
        assert_eq!(capacity, builder.approximate_len() - 9);
        let buf = builder.build().to_vec();

        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());

        let buffer = bincode::serialize(&block).unwrap();
        let blk: Block = bincode::deserialize(&buffer).unwrap();

        assert_eq!(block.data, blk.data);
        assert_eq!(block.data_len, blk.data_len);
        assert_eq!(block.table_id, blk.table_id);
        assert_eq!(block.restart_points, blk.restart_points);
    }
}
1057}