// risingwave_storage/hummock/sstable/block.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::io::{Read, Write};
18use std::mem::size_of;
19use std::ops::Range;
20
21use bytes::{Buf, BufMut, Bytes, BytesMut};
22use risingwave_common::catalog::TableId;
23use risingwave_hummock_sdk::KeyComparator;
24use risingwave_hummock_sdk::key::FullKey;
25use serde::{Deserialize, Serialize};
26
27use super::utils::{CompressionAlgorithm, bytes_diff_below_max_key_length, xxhash64_verify};
28use crate::hummock::sstable::utils;
29use crate::hummock::sstable::utils::xxhash64_checksum;
30use crate::hummock::{HummockError, HummockResult};
31use crate::monitor::Hitmap;
32
/// Default uncompressed capacity reserved for a block buffer (4 KiB).
pub const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;
/// Default number of entries between two consecutive restart points.
pub const DEFAULT_RESTART_INTERVAL: usize = 16;
/// Rough per-entry size used only to pre-size the restart-point vector.
pub const DEFAULT_ENTRY_SIZE: usize = 24; // table_id(u64) + primary_key(u64) + epoch(u64)
36
/// Width of the length fields inside an encoded block entry.
///
/// The discriminant (1..=3) is what gets persisted: `BlockBuilder::build` packs
/// `(key_len_type << 4) | value_len_type` into a single byte per restart-point
/// type-index entry, and `Block::decode_from_raw` reverses that via `From<u8>`.
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum LenType {
    u8 = 1,
    u16 = 2,
    u32 = 3,
}
44
/// Generates a `LenType` method that writes each given `usize` argument into
/// `buf` at the integer width selected by `self`. Values are narrowed with
/// `as`, so callers must ensure they fit (guaranteed by `LenType::new`).
macro_rules! put_fn {
    ($name:ident, $($value:ident: $type:ty),*) => {
        fn $name<T: BufMut>(&self, buf: &mut T, $($value: $type),*) {
            match *self {
                LenType::u8 => {
                    $(buf.put_u8($value as u8);)*
                },

                LenType::u16 => {
                    $(buf.put_u16($value as u16);)*
                },

                LenType::u32 => {
                    $(buf.put_u32($value as u32);)*
                },
            }
        }
    };
}
64
/// Generates a `LenType` method that reads one value per listed target type
/// from `buf`, each decoded at the width selected by `self` and widened back
/// to the target type. Yields a tuple when more than one type is listed.
macro_rules! get_fn {
    ($name:ident, $($type:ty),*) => {
        #[allow(unused_parens)]
        fn $name<T: Buf>(&self, buf: &mut T) -> ($($type), *) {
            match *self {
                LenType::u8 => {
                    ($(buf.get_u8() as $type),*)
                }
                LenType::u16 => {
                    ($(buf.get_u16() as $type),*)
                }
                LenType::u32 => {
                    ($(buf.get_u32() as $type),*)
                }
            }
        }
    };
}
83
84impl From<u8> for LenType {
85    fn from(value: u8) -> Self {
86        match value {
87            1 => LenType::u8,
88            2 => LenType::u16,
89            3 => LenType::u32,
90            _ => {
91                panic!("unexpected type {}", value)
92            }
93        }
94    }
95}
96
97impl LenType {
98    put_fn!(put, v1: usize);
99
100    put_fn!(put2, v1: usize, v2: usize);
101
102    get_fn!(get, usize);
103
104    get_fn!(get2, usize, usize);
105
106    fn new(len: usize) -> Self {
107        const U8_MAX: usize = u8::MAX as usize + 1;
108        const U16_MAX: usize = u16::MAX as usize + 1;
109        const U32_MAX: usize = u32::MAX as usize + 1;
110
111        match len {
112            0..U8_MAX => LenType::u8,
113            U8_MAX..U16_MAX => LenType::u16,
114            U16_MAX..U32_MAX => LenType::u32,
115            _ => unreachable!("unexpected LenType {}", len),
116        }
117    }
118
119    fn len(&self) -> usize {
120        match *self {
121            Self::u8 => size_of::<u8>(),
122            Self::u16 => size_of::<u16>(),
123            Self::u32 => size_of::<u32>(),
124        }
125    }
126}
127
/// A restart point marks an entry stored with its full key (no prefix
/// compression), together with the length-field widths in effect from that
/// offset onward.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RestartPoint {
    /// Byte offset of the entry within the block's data section.
    pub offset: u32,
    /// Width of the key length fields for entries covered by this restart point.
    pub key_len_type: LenType,
    /// Width of the value length field for entries covered by this restart point.
    pub value_len_type: LenType,
}
134
impl RestartPoint {
    /// On-disk size of one encoded restart-point type-index entry:
    /// a `u32` offset plus one byte that packs both `LenType`s.
    fn size_of() -> usize {
        // store key_len_type and value_len_type in one u8, matching `BlockBuilder::build`:
        // encoding_value = (key_len_type << 4) | value_len_type
        std::mem::size_of::<u32>() + std::mem::size_of::<LenType>()
    }
}
142
/// An in-memory, uncompressed SST block: raw entry data plus the decoded
/// restart-point metadata. Produced by `Block::decode*` and consumed through
/// a block iterator.
pub struct Block {
    /// Uncompressed entry data followed by the encoded restart-point sections.
    data: Bytes,
    /// Length of the entry-data prefix of `data` (excludes the trailer sections).
    data_len: usize,

    /// Table id shared by every entry of this block.
    table_id: TableId,

    /// Restart points, each annotated with the len-type widths in effect there.
    restart_points: Vec<RestartPoint>,

    /// Tracks which regions of the block have been accessed (cache efficiency).
    hitmap: Hitmap<{ Self::HITMAP_ELEMS }>,
}
157
// Manual impl, field-for-field (equivalent to a derive). `data.clone()` is a
// shallow `Bytes` clone; the hitmap is cloned with its own semantics.
impl Clone for Block {
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            data_len: self.data_len,
            table_id: self.table_id,
            restart_points: self.restart_points.clone(),
            hitmap: self.hitmap.clone(),
        }
    }
}
169
impl Debug for Block {
    // Deliberately omits `data` and `hitmap`: raw entry bytes would flood logs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Block")
            .field("data_len", &self.data_len)
            .field("table_id", &self.table_id)
            .field("restart_points", &self.restart_points)
            .finish()
    }
}
179
impl Serialize for Block {
    // Serializes only the raw bytes; restart-point metadata is re-derived by
    // `Deserialize` via `decode_from_raw`, so nothing else needs persisting.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serde_bytes::serialize(&self.data[..], serializer)
    }
}
188
189impl<'de> Deserialize<'de> for Block {
190    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
191    where
192        D: serde::Deserializer<'de>,
193    {
194        let data: Vec<u8> = serde_bytes::deserialize(deserializer)?;
195        let data = Bytes::from(data);
196        Ok(Block::decode_from_raw(data))
197    }
198}
199
impl Block {
    /// Number of segments in the per-block access [`Hitmap`].
    pub const HITMAP_ELEMS: usize = 4;

    /// Peeks the compression algorithm of an encoded block without decoding it.
    ///
    /// Relies on the trailer written by `BlockBuilder::build`:
    /// `| ... | compression method (1B) | xxhash64 checksum (8B) |`.
    pub fn get_algorithm(buf: &Bytes) -> HummockResult<CompressionAlgorithm> {
        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
        Ok(compression)
    }

    /// Verifies the checksum, decompresses if necessary, and decodes the block.
    ///
    /// `uncompressed_capacity` must be the exact uncompressed payload size;
    /// it is asserted after decompression.
    pub fn decode(buf: Bytes, uncompressed_capacity: usize) -> HummockResult<Self> {
        Self::decode_with_copy(buf, uncompressed_capacity, false)
    }

    /// Same as [`Block::decode`], but when `copy` is true an uncompressed
    /// payload is copied out of `buf` instead of sharing the underlying
    /// `Bytes` allocation (decompressed payloads are always fresh buffers).
    pub fn decode_with_copy(
        buf: Bytes,
        uncompressed_capacity: usize,
        copy: bool,
    ) -> HummockResult<Self> {
        // Verify checksum: last 8 bytes are a little-endian xxhash64 over the rest.

        let xxhash64_checksum = (&buf[buf.len() - 8..]).get_u64_le();
        xxhash64_verify(&buf[..buf.len() - 8], xxhash64_checksum)?;

        // Decompress according to the 1-byte method preceding the checksum.
        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
        let compressed_data = &buf[..buf.len() - 9];

        let buf = match compression {
            CompressionAlgorithm::None => {
                if copy {
                    Bytes::copy_from_slice(&buf[0..(buf.len() - 9)])
                } else {
                    buf.slice(0..(buf.len() - 9))
                }
            }
            CompressionAlgorithm::Lz4 => {
                let mut decoder = lz4::Decoder::new(compressed_data.reader())
                    .map_err(HummockError::decode_error)?;
                let mut decoded = Vec::with_capacity(uncompressed_capacity);
                let read_size = decoder
                    .read_to_end(&mut decoded)
                    .map_err(HummockError::decode_error)?;
                // Caller-provided capacity must be the exact uncompressed size.
                assert_eq!(read_size, uncompressed_capacity);
                Bytes::from(decoded)
            }
            CompressionAlgorithm::Zstd => {
                let mut decoder = zstd::Decoder::new(compressed_data.reader())
                    .map_err(HummockError::decode_error)?;
                let mut decoded = Vec::with_capacity(uncompressed_capacity);
                let read_size = decoder
                    .read_to_end(&mut decoded)
                    .map_err(HummockError::decode_error)?;
                // Caller-provided capacity must be the exact uncompressed size.
                assert_eq!(read_size, uncompressed_capacity);
                Bytes::from(decoded)
            }
        };

        Ok(Self::decode_from_raw(buf))
    }

    /// Decodes a raw (uncompressed, checksum-stripped) block payload.
    ///
    /// Layout, front to back (mirrors `BlockBuilder::build`):
    /// `| entries | restart offsets (M * 4B) | M (4B) | type index (N * 5B) | N (4B) | table id (4B) |`
    pub fn decode_from_raw(buf: Bytes) -> Self {
        // Table id occupies the trailing 4 bytes.
        let table_id = (&buf[buf.len() - 4..]).get_u32_le();
        // decode restart_points_type_index: N sits just before the table id.
        let n_index = ((&buf[buf.len() - 8..buf.len() - 4]).get_u32_le()) as usize;
        let index_data_len = size_of::<u32>() + n_index * RestartPoint::size_of();
        // `data_len` here marks where the type-index section begins.
        let data_len = buf.len() - 4 - index_data_len;
        let mut restart_points_type_index_buf = &buf[data_len..buf.len() - 8];

        let mut index_key_vec = Vec::with_capacity(n_index);
        for _ in 0..n_index {
            let offset = restart_points_type_index_buf.get_u32_le();
            let value = restart_points_type_index_buf.get_u8();
            // One byte packs both widths: high nibble = key, low nibble = value.
            let key_len_type = LenType::from(value >> 4);
            let value_len_type = LenType::from(value & 0x0F);

            index_key_vec.push(RestartPoint {
                offset,
                key_len_type,
                value_len_type,
            });
        }

        // Decode restart points: M sits right before the type-index section.
        let n_restarts = ((&buf[data_len - 4..]).get_u32_le()) as usize;
        let restart_points_len = size_of::<u32>() + n_restarts * (size_of::<u32>());
        let restarts_end = data_len - 4;
        // Shadowed: from here on, the length of the entry-data section only.
        let data_len = data_len - restart_points_len;
        let mut restart_points = Vec::with_capacity(n_restarts);
        let mut restart_points_buf = &buf[data_len..restarts_end];

        let mut type_index: usize = 0;

        // Tag each restart offset with the len types of the type-index entry
        // covering it; both offset sequences are ascending, so one pass suffices.
        for _ in 0..n_restarts {
            let offset = restart_points_buf.get_u32_le();
            if type_index < index_key_vec.len() - 1
                && offset >= index_key_vec[type_index + 1].offset
            {
                type_index += 1;
            }

            restart_points.push(RestartPoint {
                offset,
                key_len_type: index_key_vec[type_index].key_len_type,
                value_len_type: index_key_vec[type_index].value_len_type,
            });
        }

        Block {
            data: buf,
            data_len,
            restart_points,
            table_id: TableId::new(table_id),
            hitmap: Hitmap::default(),
        }
    }

    /// Entries data len (excludes the restart/type-index/table-id trailer).
    #[expect(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        assert!(!self.data.is_empty());
        self.data_len
    }

    /// Approximate in-memory footprint of this block.
    // NOTE(review): charges size_of::<u32>() per restart point, but
    // `restart_points` holds `RestartPoint` (u32 + two LenTypes), which is
    // larger than a u32 — this likely undercounts; confirm intended.
    pub fn capacity(&self) -> usize {
        self.data.len()
            + self.restart_points.capacity() * std::mem::size_of::<u32>()
            + std::mem::size_of::<u32>()
    }

    /// Table id shared by every entry of this block.
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Gets restart point by index.
    pub fn restart_point(&self, index: usize) -> RestartPoint {
        self.restart_points[index]
    }

    /// Gets restart point len.
    pub fn restart_point_len(&self) -> usize {
        self.restart_points.len()
    }

    /// Searches the index of the restart point by partition point.
    pub fn search_restart_partition_point<P>(&self, pred: P) -> usize
    where
        P: FnMut(&RestartPoint) -> bool,
    {
        self.restart_points.partition_point(pred)
    }

    /// Entry-data section only (no trailer).
    pub fn data(&self) -> &[u8] {
        &self.data[..self.data_len]
    }

    /// Full raw payload including the encoded restart-point trailer.
    pub fn raw(&self) -> &[u8] {
        &self.data[..]
    }

    /// Access hitmap for cache-efficiency accounting.
    pub fn hitmap(&self) -> &Hitmap<{ Self::HITMAP_ELEMS }> {
        &self.hitmap
    }

    /// Fraction of the block that has been accessed, per the hitmap.
    pub fn efficiency(&self) -> f64 {
        self.hitmap.ratio()
    }
}
366
/// [`KeyPrefix`] contains info for prefix compression: the per-entry header
/// `| overlap len | diff len | value len |` plus bookkeeping for slicing.
#[derive(Debug)]
pub struct KeyPrefix {
    /// Bytes of key shared with the previous restart key.
    overlap: usize,
    /// Bytes of key stored verbatim after the shared prefix.
    diff: usize,
    /// Value length in bytes.
    value: usize,
    /// Entry offset inside the block; used for calculating ranges, won't be encoded.
    offset: usize,

    /// Encoded header length in bytes; only populated on decode.
    len: usize,
}
378
impl KeyPrefix {
    // This function is used in BlockBuilder::add to provide a wrapper for encode since the
    // KeyPrefix len field is only useful in the decode phase
    pub fn new_without_len(overlap: usize, diff: usize, value: usize, offset: usize) -> Self {
        KeyPrefix {
            overlap,
            diff,
            value,
            offset,
            len: 0, // not used when encoding; `decode` computes the real header length
        }
    }
}
392
393impl KeyPrefix {
394    pub fn encode(&self, buf: &mut impl BufMut, key_len_type: LenType, value_len_type: LenType) {
395        key_len_type.put2(buf, self.overlap, self.diff);
396        value_len_type.put(buf, self.value);
397    }
398
399    pub fn decode(
400        buf: &mut impl Buf,
401        offset: usize,
402        key_len_type: LenType,
403        value_len_type: LenType,
404    ) -> Self {
405        let (overlap, diff) = key_len_type.get2(buf);
406        let value = value_len_type.get(buf);
407
408        let len = key_len_type.len() * 2 + value_len_type.len();
409
410        Self {
411            overlap,
412            diff,
413            value,
414            offset,
415            len,
416        }
417    }
418
419    /// Encoded length.
420    fn len(&self) -> usize {
421        self.len
422    }
423
424    /// Gets overlap len.
425    pub fn overlap_len(&self) -> usize {
426        self.overlap
427    }
428
429    /// Gets diff key range.
430    pub fn diff_key_range(&self) -> Range<usize> {
431        self.offset + self.len()..self.offset + self.len() + self.diff
432    }
433
434    /// Gets value range.
435    pub fn value_range(&self) -> Range<usize> {
436        self.offset + self.len() + self.diff..self.offset + self.len() + self.diff + self.value
437    }
438
439    /// Gets entry len.
440    pub fn entry_len(&self) -> usize {
441        self.len() + self.diff + self.value
442    }
443}
444
/// Tuning knobs for [`BlockBuilder`].
pub struct BlockBuilderOptions {
    /// Reserved bytes size when creating buffer to avoid frequent allocating.
    pub capacity: usize,
    /// Compression algorithm.
    pub compression_algorithm: CompressionAlgorithm,
    /// Restart point interval (entries between two full-key restart points).
    pub restart_interval: usize,
}
453
impl Default for BlockBuilderOptions {
    // 4 KiB uncompressed blocks, no compression, restart every 16 entries.
    fn default() -> Self {
        Self {
            capacity: DEFAULT_BLOCK_SIZE,
            compression_algorithm: CompressionAlgorithm::None,
            restart_interval: DEFAULT_RESTART_INTERVAL,
        }
    }
}
463
/// [`BlockBuilder`] encodes and appends block to a buffer.
pub struct BlockBuilder {
    /// Write buffer.
    buf: BytesMut,
    /// Compress buffer, reused across `build` calls when compression is enabled.
    compress_buf: BytesMut,
    /// Entry interval between restart points.
    restart_count: usize,
    /// Restart points (entry offsets within `buf`).
    restart_points: Vec<u32>,
    /// Last key added, encoded without table id; used for prefix compression
    /// and for asserting ascending insertion order.
    last_key: Vec<u8>,
    /// Count of entries in current block.
    entry_count: usize,
    /// Compression algorithm.
    compression_algorithm: CompressionAlgorithm,

    /// Table id shared by all entries of this block; set by the first `add`.
    table_id: Option<u32>,
    // restart_points_type_index stores only the restart_point corresponding to each type change,
    // as an index, in order to reduce space usage
    restart_points_type_index: Vec<RestartPoint>,
}
486
impl BlockBuilder {
    /// Creates an empty builder; buffers are pre-sized from `options` to avoid
    /// reallocation while entries are appended.
    pub fn new(options: BlockBuilderOptions) -> Self {
        Self {
            // add more space to avoid re-allocating (also covers restart_points and restart_points_type_index)
            buf: BytesMut::with_capacity(Self::buf_reserve_size(&options)),
            compress_buf: BytesMut::default(),
            restart_count: options.restart_interval,
            restart_points: Vec::with_capacity(
                options.capacity / DEFAULT_ENTRY_SIZE / options.restart_interval + 1,
            ),
            last_key: vec![],
            entry_count: 0,
            compression_algorithm: options.compression_algorithm,
            table_id: None,
            restart_points_type_index: Vec::default(),
        }
    }

    /// Test-only alias for [`BlockBuilder::add`].
    pub fn add_for_test(&mut self, full_key: FullKey<&[u8]>, value: &[u8]) {
        self.add(full_key, value);
    }

    /// Appends a kv pair to the block.
    ///
    /// NOTE: Key must be added in ASCEND order.
    ///
    /// # Format
    ///
    /// ```plain
    /// entry (kv pair): | overlap len (len_type) | diff len (len_type) | value len(len_type) | diff key | value |
    /// ```
    ///
    /// # Panics
    ///
    /// Panic if key is not added in ASCEND order, or if its table id differs
    /// from previously added keys.
    pub fn add(&mut self, full_key: FullKey<&[u8]>, value: &[u8]) {
        // All entries of a block must belong to one table.
        let input_table_id = full_key.user_key.table_id.table_id();
        match self.table_id {
            Some(current_table_id) => assert_eq!(current_table_id, input_table_id),
            None => self.table_id = Some(input_table_id),
        }
        #[cfg(debug_assertions)]
        self.debug_valid();

        // Keys are stored without the table id (it lives in the block trailer).
        let mut key: BytesMut = Default::default();
        full_key.encode_into_without_table_id(&mut key);
        if self.entry_count > 0 {
            debug_assert!(!key.is_empty());
            debug_assert_eq!(
                KeyComparator::compare_encoded_full_key(&self.last_key[..], &key[..]),
                Ordering::Less,
                "epoch: {}, table key: {}",
                full_key.epoch_with_gap.pure_epoch(),
                u64::from_be_bytes(
                    full_key.user_key.table_key.as_ref()[0..8]
                        .try_into()
                        .unwrap()
                ),
            );
        }
        // Update restart point if needed and calculate diff key.
        let k_type = LenType::new(key.len());
        let v_type = LenType::new(value.len());

        // A new type-index entry is needed whenever the len-type widths change
        // (and always for the very first entry).
        let type_mismatch = if let Some(RestartPoint {
            offset: _,
            key_len_type: last_key_len_type,
            value_len_type: last_value_len_type,
        }) = self.restart_points_type_index.last()
        {
            k_type != *last_key_len_type || v_type != *last_value_len_type
        } else {
            true
        };

        // At a restart point (interval boundary or width change) the full key
        // is stored; otherwise only the suffix that differs from `last_key`.
        let diff_key = if self.entry_count % self.restart_count == 0 || type_mismatch {
            let offset = utils::checked_into_u32(self.buf.len()).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert buf_len {} into u32 table {:?}",
                    self.buf.len(),
                    self.table_id,
                )
            });

            self.restart_points.push(offset);

            if type_mismatch {
                self.restart_points_type_index.push(RestartPoint {
                    offset,
                    key_len_type: k_type,
                    value_len_type: v_type,
                });
            }

            key.as_ref()
        } else {
            bytes_diff_below_max_key_length(&self.last_key, &key[..])
        };

        let prefix = KeyPrefix::new_without_len(
            key.len() - diff_key.len(),
            diff_key.len(),
            value.len(),
            self.buf.len(),
        );

        prefix.encode(&mut self.buf, k_type, v_type);
        self.buf.put_slice(diff_key);
        self.buf.put_slice(value);

        self.last_key.clear();
        self.last_key.extend_from_slice(&key);
        self.entry_count += 1;
    }

    /// Last key added, encoded without table id (empty before the first `add`).
    pub fn get_last_key(&self) -> &[u8] {
        &self.last_key
    }

    /// True iff no entry has been added since creation or the last `clear`.
    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    /// Resets the builder for reuse, keeping allocated buffer capacity.
    pub fn clear(&mut self) {
        self.buf.clear();
        self.restart_points.clear();
        self.table_id = None;
        self.restart_points_type_index.clear();
        self.last_key.clear();
        self.entry_count = 0;
    }

    /// Calculate block size without compression (everything `build` emits
    /// except the 1-byte compression method and the 8-byte checksum).
    // NOTE(review): takes `&mut self` but never mutates; `&self` would
    // suffice — confirm no external reason for the mutable receiver.
    pub fn uncompressed_block_size(&mut self) -> usize {
        self.buf.len()
            + (self.restart_points.len() + 1) * std::mem::size_of::<u32>()
            + (RestartPoint::size_of()) // (offset + packed len types (u8)) per entry
                * self.restart_points_type_index.len()
            + std::mem::size_of::<u32>() // restart_points_type_index len
            + std::mem::size_of::<u32>() // table_id len
    }

    /// Finishes building block.
    ///
    /// # Format
    ///
    /// ```plain
    /// compressed: | entries | restart point 0 (4B) | ... | restart point N-1 (4B) | N (4B) | restart point index 0 (5B)| ... | restart point index N-1 (5B) | N (4B) | table id (4B)
    /// uncompressed: | compression method (1B) | xxhash64 checksum (8B) |
    /// ```
    ///
    /// # Panics
    ///
    /// Panic if there is compression error, or if the builder is empty.
    pub fn build(&mut self) -> &[u8] {
        assert!(
            self.entry_count > 0,
            "buf_len {} entry_count {} table {:?}",
            self.buf.len(),
            self.entry_count,
            self.table_id
        );

        // Append restart offsets followed by their count.
        for restart_point in &self.restart_points {
            self.buf.put_u32_le(*restart_point);
        }

        self.buf.put_u32_le(
            utils::checked_into_u32(self.restart_points.len()).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert restart_points_len {} into u32 table {:?}",
                    self.restart_points.len(),
                    self.table_id,
                )
            }),
        );
        // Append the type index: offset + one byte packing both len types
        // (high nibble = key, low nibble = value; decoded in `decode_from_raw`).
        for RestartPoint {
            offset,
            key_len_type,
            value_len_type,
        } in &self.restart_points_type_index
        {
            self.buf.put_u32_le(*offset);

            let mut value: u8 = 0;
            value |= *key_len_type as u8;
            value <<= 4;
            value |= *value_len_type as u8;

            self.buf.put_u8(value);
        }

        self.buf.put_u32_le(
            utils::checked_into_u32(self.restart_points_type_index.len()).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert restart_points_type_index_len {} into u32 table {:?}",
                    self.restart_points_type_index.len(),
                    self.table_id,
                )
            }),
        );

        self.buf.put_u32_le(self.table_id.unwrap());
        // Optionally compress, then append compression method + checksum to
        // whichever buffer holds the final payload.
        let result_buf = if self.compression_algorithm != CompressionAlgorithm::None {
            self.compress_buf.clear();
            self.compress_buf = Self::compress(
                &self.buf[..],
                self.compression_algorithm,
                std::mem::take(&mut self.compress_buf),
            );

            &mut self.compress_buf
        } else {
            &mut self.buf
        };

        self.compression_algorithm.encode(result_buf);
        let checksum = xxhash64_checksum(result_buf);
        result_buf.put_u64_le(checksum);
        // Offsets are stored as u32, so the block must stay below 4 GiB.
        assert!(
            result_buf.len() < (u32::MAX) as usize,
            "buf_len {} entry_count {} table {:?}",
            result_buf.len(),
            self.entry_count,
            self.table_id
        );

        if self.compression_algorithm != CompressionAlgorithm::None {
            self.compress_buf.as_ref()
        } else {
            self.buf.as_ref()
        }
    }

    /// Re-compresses an already-encoded, uncompressed block with
    /// `target_compression`, re-appending method byte and checksum.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is not encoded with `CompressionAlgorithm::None`.
    pub fn compress_block(
        buf: Bytes,
        target_compression: CompressionAlgorithm,
    ) -> HummockResult<Bytes> {
        // Verify checksum.
        let checksum = (&buf[buf.len() - 8..]).get_u64_le();
        xxhash64_verify(&buf[..buf.len() - 8], checksum)?;
        // Strip the old method byte; the payload must be uncompressed.
        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
        let compressed_data = &buf[..buf.len() - 9];
        assert_eq!(compression, CompressionAlgorithm::None);
        let mut compress_writer = Self::compress(
            compressed_data,
            target_compression,
            BytesMut::with_capacity(buf.len()),
        );

        target_compression.encode(&mut compress_writer);
        let checksum = xxhash64_checksum(&compress_writer);
        compress_writer.put_u64_le(checksum);
        Ok(compress_writer.freeze())
    }

    /// Compresses `buf` into `compress_writer` with the given algorithm.
    ///
    /// # Panics
    ///
    /// Panics on encoder errors, and if called with `CompressionAlgorithm::None`.
    pub fn compress(
        buf: &[u8],
        compression_algorithm: CompressionAlgorithm,
        compress_writer: BytesMut,
    ) -> BytesMut {
        match compression_algorithm {
            CompressionAlgorithm::None => unreachable!(),
            CompressionAlgorithm::Lz4 => {
                let mut encoder = lz4::EncoderBuilder::new()
                    .level(4)
                    .build(compress_writer.writer())
                    .map_err(HummockError::encode_error)
                    .unwrap();
                encoder
                    .write_all(buf)
                    .map_err(HummockError::encode_error)
                    .unwrap();
                // `finish` returns the writer plus a deferred result that must
                // be checked for flush errors.
                let (writer, result) = encoder.finish();
                result.map_err(HummockError::encode_error).unwrap();
                writer.into_inner()
            }
            CompressionAlgorithm::Zstd => {
                let mut encoder = zstd::Encoder::new(compress_writer.writer(), 4)
                    .map_err(HummockError::encode_error)
                    .unwrap();
                encoder
                    .write_all(buf)
                    .map_err(HummockError::encode_error)
                    .unwrap();
                let writer = encoder
                    .finish()
                    .map_err(HummockError::encode_error)
                    .unwrap();
                writer.into_inner()
            }
        }
    }

    /// Approximate block len (uncompressed).
    pub fn approximate_len(&self) -> usize {
        // block + restart_points + restart_points.len + restart_points_type_indices +
        // restart_points_type_indices.len + compression_algorithm + checksum + table_id
        self.buf.len()
            + std::mem::size_of::<u32>() * self.restart_points.len() // restart_points
            + std::mem::size_of::<u32>() // restart_points.len
            + RestartPoint::size_of() * self.restart_points_type_index.len() // restart_points_type_indices
            + std::mem::size_of::<u32>() // restart_points_type_indices.len
            + std::mem::size_of::<CompressionAlgorithm>() // compression_algorithm
            + std::mem::size_of::<u64>() // checksum
            + std::mem::size_of::<u32>() // table_id
    }

    /// Debug-only invariant check: an empty builder has no residual state.
    pub fn debug_valid(&self) {
        if self.entry_count == 0 {
            debug_assert!(self.buf.is_empty());
            debug_assert!(self.restart_points.is_empty());
            debug_assert!(self.restart_points_type_index.is_empty());
            debug_assert!(self.last_key.is_empty());
        }
    }

    /// Table id of entries added so far (`None` before the first `add`).
    pub fn table_id(&self) -> Option<u32> {
        self.table_id
    }

    /// Initial buffer capacity: entry capacity plus headroom for the trailer.
    fn buf_reserve_size(option: &BlockBuilderOptions) -> usize {
        option.capacity + 1024 + 256
    }
}
813
814#[cfg(test)]
815mod tests {
816
817    use risingwave_common::util::epoch::test_epoch;
818    use risingwave_hummock_sdk::key::MAX_KEY_LEN;
819
820    use super::*;
821    use crate::hummock::{BlockHolder, BlockIterator};
822
823    #[test]
824    fn test_block_enc_dec() {
825        let options = BlockBuilderOptions::default();
826        let mut builder = BlockBuilder::new(options);
827        builder.add_for_test(construct_full_key_struct_for_test(0, b"k1", 1), b"v01");
828        builder.add_for_test(construct_full_key_struct_for_test(0, b"k2", 2), b"v02");
829        builder.add_for_test(construct_full_key_struct_for_test(0, b"k3", 3), b"v03");
830        builder.add_for_test(construct_full_key_struct_for_test(0, b"k4", 4), b"v04");
831        let capacity = builder.uncompressed_block_size();
832        assert_eq!(capacity, builder.approximate_len() - 9);
833        let buf = builder.build().to_vec();
834        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
835        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
836
837        bi.seek_to_first();
838        assert!(bi.is_valid());
839        assert_eq!(construct_full_key_struct_for_test(0, b"k1", 1), bi.key());
840        assert_eq!(b"v01", bi.value());
841
842        bi.next();
843        assert!(bi.is_valid());
844        assert_eq!(construct_full_key_struct_for_test(0, b"k2", 2), bi.key());
845        assert_eq!(b"v02", bi.value());
846
847        bi.next();
848        assert!(bi.is_valid());
849        assert_eq!(construct_full_key_struct_for_test(0, b"k3", 3), bi.key());
850        assert_eq!(b"v03", bi.value());
851
852        bi.next();
853        assert!(bi.is_valid());
854        assert_eq!(construct_full_key_struct_for_test(0, b"k4", 4), bi.key());
855        assert_eq!(b"v04", bi.value());
856
857        bi.next();
858        assert!(!bi.is_valid());
859    }
860
861    #[test]
862    fn test_compressed_block_enc_dec() {
863        inner_test_compressed(CompressionAlgorithm::Lz4);
864        inner_test_compressed(CompressionAlgorithm::Zstd);
865    }
866
867    fn inner_test_compressed(algo: CompressionAlgorithm) {
868        let options = BlockBuilderOptions {
869            compression_algorithm: algo,
870            ..Default::default()
871        };
872        let mut builder = BlockBuilder::new(options);
873        builder.add_for_test(construct_full_key_struct_for_test(0, b"k1", 1), b"v01");
874        builder.add_for_test(construct_full_key_struct_for_test(0, b"k2", 2), b"v02");
875        builder.add_for_test(construct_full_key_struct_for_test(0, b"k3", 3), b"v03");
876        builder.add_for_test(construct_full_key_struct_for_test(0, b"k4", 4), b"v04");
877        let capacity = builder.uncompressed_block_size();
878        assert_eq!(capacity, builder.approximate_len() - 9);
879        let buf = builder.build().to_vec();
880        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
881        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
882
883        bi.seek_to_first();
884        assert!(bi.is_valid());
885        assert_eq!(construct_full_key_struct_for_test(0, b"k1", 1), bi.key());
886        assert_eq!(b"v01", bi.value());
887
888        bi.next();
889        assert!(bi.is_valid());
890        assert_eq!(construct_full_key_struct_for_test(0, b"k2", 2), bi.key());
891        assert_eq!(b"v02", bi.value());
892
893        bi.next();
894        assert!(bi.is_valid());
895        assert_eq!(construct_full_key_struct_for_test(0, b"k3", 3), bi.key());
896        assert_eq!(b"v03", bi.value());
897
898        bi.next();
899        assert!(bi.is_valid());
900        assert_eq!(construct_full_key_struct_for_test(0, b"k4", 4), bi.key());
901        assert_eq!(b"v04", bi.value());
902
903        bi.next();
904        assert!(!bi.is_valid());
905    }
906
907    pub fn construct_full_key_struct_for_test(
908        table_id: u32,
909        table_key: &[u8],
910        epoch: u64,
911    ) -> FullKey<&[u8]> {
912        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(epoch))
913    }
914
915    #[test]
916    fn test_block_enc_large_key() {
917        let options = BlockBuilderOptions::default();
918        let mut builder = BlockBuilder::new(options);
919        let medium_key = vec![b'a'; MAX_KEY_LEN - 500];
920        let large_key = vec![b'b'; MAX_KEY_LEN];
921        let xlarge_key = vec![b'c'; MAX_KEY_LEN + 500];
922
923        builder.add_for_test(construct_full_key_struct_for_test(0, &medium_key, 1), b"v1");
924        builder.add_for_test(construct_full_key_struct_for_test(0, &large_key, 2), b"v2");
925        builder.add_for_test(construct_full_key_struct_for_test(0, &xlarge_key, 3), b"v3");
926        let capacity = builder.uncompressed_block_size();
927        assert_eq!(capacity, builder.approximate_len() - 9);
928        let buf = builder.build().to_vec();
929        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
930        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
931
932        bi.seek_to_first();
933        assert!(bi.is_valid());
934        assert_eq!(
935            construct_full_key_struct_for_test(0, &medium_key, 1),
936            bi.key()
937        );
938        assert_eq!(b"v1", bi.value());
939
940        bi.next();
941        assert!(bi.is_valid());
942        assert_eq!(
943            construct_full_key_struct_for_test(0, &large_key, 2),
944            bi.key()
945        );
946        assert_eq!(b"v2", bi.value());
947
948        bi.next();
949        assert!(bi.is_valid());
950        assert_eq!(
951            construct_full_key_struct_for_test(0, &xlarge_key, 3),
952            bi.key()
953        );
954        assert_eq!(b"v3", bi.value());
955
956        bi.next();
957        assert!(!bi.is_valid());
958    }
959
960    #[test]
961    fn test_block_restart_point() {
962        let options = BlockBuilderOptions::default();
963        let mut builder = BlockBuilder::new(options);
964
965        const KEY_COUNT: u8 = 100;
966        const BUILDER_COUNT: u8 = 5;
967
968        for _ in 0..BUILDER_COUNT {
969            for index in 0..KEY_COUNT {
970                if index < 50 {
971                    let mut medium_key = vec![b'A'; MAX_KEY_LEN - 500];
972                    medium_key.push(index);
973                    builder
974                        .add_for_test(construct_full_key_struct_for_test(0, &medium_key, 1), b"v1");
975                } else if index < 80 {
976                    let mut large_key = vec![b'B'; MAX_KEY_LEN];
977                    large_key.push(index);
978                    builder
979                        .add_for_test(construct_full_key_struct_for_test(0, &large_key, 2), b"v2");
980                } else {
981                    let mut xlarge_key = vec![b'C'; MAX_KEY_LEN + 500];
982                    xlarge_key.push(index);
983                    builder
984                        .add_for_test(construct_full_key_struct_for_test(0, &xlarge_key, 3), b"v3");
985                }
986            }
987
988            let capacity = builder.uncompressed_block_size();
989            assert_eq!(capacity, builder.approximate_len() - 9);
990            let buf = builder.build().to_vec();
991            let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
992            let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
993            bi.seek_to_first();
994            assert!(bi.is_valid());
995
996            for index in 0..KEY_COUNT {
997                if index < 50 {
998                    let mut medium_key = vec![b'A'; MAX_KEY_LEN - 500];
999                    medium_key.push(index);
1000                    assert_eq!(
1001                        construct_full_key_struct_for_test(0, &medium_key, 1),
1002                        bi.key()
1003                    );
1004                } else if index < 80 {
1005                    let mut large_key = vec![b'B'; MAX_KEY_LEN];
1006                    large_key.push(index);
1007                    assert_eq!(
1008                        construct_full_key_struct_for_test(0, &large_key, 2),
1009                        bi.key()
1010                    );
1011                } else {
1012                    let mut xlarge_key = vec![b'C'; MAX_KEY_LEN + 500];
1013                    xlarge_key.push(index);
1014                    assert_eq!(
1015                        construct_full_key_struct_for_test(0, &xlarge_key, 3),
1016                        bi.key()
1017                    );
1018                }
1019                bi.next();
1020            }
1021
1022            assert!(!bi.is_valid());
1023            builder.clear();
1024        }
1025    }
1026
1027    #[test]
1028    fn test_block_serde() {
1029        fn assmut_serde<'de, T: Serialize + Deserialize<'de>>() {}
1030
1031        assmut_serde::<Block>();
1032        assmut_serde::<Box<Block>>();
1033
1034        let options = BlockBuilderOptions::default();
1035        let mut builder = BlockBuilder::new(options);
1036        for i in 0..100 {
1037            builder.add_for_test(
1038                construct_full_key_struct_for_test(0, format!("k{i:8}").as_bytes(), i),
1039                format!("v{i:8}").as_bytes(),
1040            );
1041        }
1042
1043        let capacity = builder.uncompressed_block_size();
1044        assert_eq!(capacity, builder.approximate_len() - 9);
1045        let buf = builder.build().to_vec();
1046
1047        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
1048
1049        let buffer = bincode::serialize(&block).unwrap();
1050        let blk: Block = bincode::deserialize(&buffer).unwrap();
1051
1052        assert_eq!(block.data, blk.data);
1053        assert_eq!(block.data_len, blk.data_len);
1054        assert_eq!(block.table_id, blk.table_id,);
1055        assert_eq!(block.restart_points, blk.restart_points);
1056    }
1057}