risingwave_storage/hummock/sstable/block.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::fmt::Debug;
use std::io::{Read, Write};
use std::mem::size_of;
use std::ops::Range;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::KeyComparator;
use risingwave_hummock_sdk::key::{FullKey, TableKey};
use serde::{Deserialize, Serialize};

use super::utils::{CompressionAlgorithm, bytes_diff_below_max_key_length, xxhash64_verify};
use crate::hummock::sstable::utils;
use crate::hummock::sstable::utils::xxhash64_checksum;
use crate::hummock::{HummockError, HummockResult};
use crate::monitor::Hitmap;

pub const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;
pub const DEFAULT_RESTART_INTERVAL: usize = 16;
pub const DEFAULT_ENTRY_SIZE: usize = 24; // table_id(u64) + primary_key(u64) + epoch(u64)

#[allow(non_camel_case_types)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum LenType {
    u8 = 1,
    u16 = 2,
    u32 = 3,
}

macro_rules! put_fn {
    ($name:ident, $($value:ident: $type:ty),*) => {
        fn $name<T: BufMut>(&self, buf: &mut T, $($value: $type),*) {
            match *self {
                LenType::u8 => {
                    $(buf.put_u8($value as u8);)*
                },

                LenType::u16 => {
                    $(buf.put_u16($value as u16);)*
                },

                LenType::u32 => {
                    $(buf.put_u32($value as u32);)*
                },
            }
        }
    };
}

macro_rules! get_fn {
    ($name:ident, $($type:ty),*) => {
        #[allow(unused_parens)]
        fn $name<T: Buf>(&self, buf: &mut T) -> ($($type), *) {
            match *self {
                LenType::u8 => {
                    ($(buf.get_u8() as $type),*)
                }
                LenType::u16 => {
                    ($(buf.get_u16() as $type),*)
                }
                LenType::u32 => {
                    ($(buf.get_u32() as $type),*)
                }
            }
        }
    };
}

impl From<u8> for LenType {
    fn from(value: u8) -> Self {
        match value {
            1 => LenType::u8,
            2 => LenType::u16,
            3 => LenType::u32,
            _ => {
                panic!("unexpected type {}", value)
            }
        }
    }
}

impl LenType {
    put_fn!(put, v1: usize);

    put_fn!(put2, v1: usize, v2: usize);

    get_fn!(get, usize);

    get_fn!(get2, usize, usize);

    fn new(len: usize) -> Self {
        const U8_MAX: usize = u8::MAX as usize + 1;
        const U16_MAX: usize = u16::MAX as usize + 1;
        const U32_MAX: usize = u32::MAX as usize + 1;

        match len {
            0..U8_MAX => LenType::u8,
            U8_MAX..U16_MAX => LenType::u16,
            U16_MAX..U32_MAX => LenType::u32,
            _ => unreachable!("unexpected LenType {}", len),
        }
    }

    fn len(&self) -> usize {
        match *self {
            Self::u8 => size_of::<u8>(),
            Self::u16 => size_of::<u16>(),
            Self::u32 => size_of::<u32>(),
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RestartPoint {
    pub offset: u32,
    pub key_len_type: LenType,
    pub value_len_type: LenType,
}

impl RestartPoint {
    fn size_of() -> usize {
        // `key_len_type` and `value_len_type` are packed into a single u8; see
        // `BlockBuilder::build`: encoded_value = (key_len_type << 4) | value_len_type
        std::mem::size_of::<u32>() + std::mem::size_of::<LenType>()
    }
}
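
// Illustrative note (added; not in the original source): each restart point
// index entry therefore occupies `RestartPoint::size_of()` == 5 bytes on disk.
// For example, a restart point at offset 1024 whose keys need u16 lengths and
// whose values need u8 lengths is encoded as:
//
//     offset (4B LE) : 00 04 00 00
//     packed types   : (LenType::u16 << 4) | LenType::u8 = (2 << 4) | 1 = 0x21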

pub struct Block {
    /// Uncompressed entries data, with the encoded restart points info appended.
    data: Bytes,
    /// Length of the entries portion of `data` (excluding restart points info).
    data_len: usize,

    /// Table id of this block.
    table_id: TableId,

    /// Restart points.
    restart_points: Vec<RestartPoint>,

    hitmap: Hitmap<{ Self::HITMAP_ELEMS }>,
}

impl Clone for Block {
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            data_len: self.data_len,
            table_id: self.table_id,
            restart_points: self.restart_points.clone(),
            hitmap: self.hitmap.clone(),
        }
    }
}

impl Debug for Block {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Block")
            .field("data_len", &self.data_len)
            .field("table_id", &self.table_id)
            .field("restart_points", &self.restart_points)
            .finish()
    }
}

impl Serialize for Block {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serde_bytes::serialize(&self.data[..], serializer)
    }
}

impl<'de> Deserialize<'de> for Block {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let data: Vec<u8> = serde_bytes::deserialize(deserializer)?;
        let data = Bytes::from(data);
        Ok(Block::decode_from_raw(data))
    }
}

impl Block {
    pub const HITMAP_ELEMS: usize = 4;

    pub fn get_algorithm(buf: &Bytes) -> HummockResult<CompressionAlgorithm> {
        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
        Ok(compression)
    }

    pub fn decode(buf: Bytes, uncompressed_capacity: usize) -> HummockResult<Self> {
        Self::decode_with_copy(buf, uncompressed_capacity, false)
    }

    pub fn decode_with_copy(
        buf: Bytes,
        uncompressed_capacity: usize,
        copy: bool,
    ) -> HummockResult<Self> {
        // Verify checksum.
        let xxhash64_checksum = (&buf[buf.len() - 8..]).get_u64_le();
        xxhash64_verify(&buf[..buf.len() - 8], xxhash64_checksum)?;

        // Decompress.
        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
        let compressed_data = &buf[..buf.len() - 9];

        let buf = match compression {
            CompressionAlgorithm::None => {
                if copy {
                    Bytes::copy_from_slice(&buf[0..(buf.len() - 9)])
                } else {
                    buf.slice(0..(buf.len() - 9))
                }
            }
            CompressionAlgorithm::Lz4 => {
                let mut decoder = lz4::Decoder::new(compressed_data.reader())
                    .map_err(HummockError::decode_error)?;
                let mut decoded = Vec::with_capacity(uncompressed_capacity);
                let read_size = decoder
                    .read_to_end(&mut decoded)
                    .map_err(HummockError::decode_error)?;
                assert_eq!(read_size, uncompressed_capacity);
                Bytes::from(decoded)
            }
            CompressionAlgorithm::Zstd => {
                let mut decoder = zstd::Decoder::new(compressed_data.reader())
                    .map_err(HummockError::decode_error)?;
                let mut decoded = Vec::with_capacity(uncompressed_capacity);
                let read_size = decoder
                    .read_to_end(&mut decoded)
                    .map_err(HummockError::decode_error)?;
                assert_eq!(read_size, uncompressed_capacity);
                Bytes::from(decoded)
            }
        };

        Ok(Self::decode_from_raw(buf))
    }

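    /// Decodes a block from an uncompressed raw buffer.
    ///
    /// Added doc note: the tail layout consumed here is the one written by
    /// [`BlockBuilder::build`] before the compression-method byte and checksum
    /// are appended (all integers little-endian):
    ///
    /// ```plain
    /// | entries | restart offsets (4B each) | N (4B) | restart type index entries (5B each) | M (4B) | table id (4B) |
    /// ```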
    pub fn decode_from_raw(buf: Bytes) -> Self {
        let table_id = (&buf[buf.len() - 4..]).get_u32_le();
        // Decode restart_points_type_index.
        let n_index = ((&buf[buf.len() - 8..buf.len() - 4]).get_u32_le()) as usize;
        let index_data_len = size_of::<u32>() + n_index * RestartPoint::size_of();
        let data_len = buf.len() - 4 - index_data_len;
        let mut restart_points_type_index_buf = &buf[data_len..buf.len() - 8];

        let mut index_key_vec = Vec::with_capacity(n_index);
        for _ in 0..n_index {
            let offset = restart_points_type_index_buf.get_u32_le();
            let value = restart_points_type_index_buf.get_u8();
            let key_len_type = LenType::from(value >> 4);
            let value_len_type = LenType::from(value & 0x0F);

            index_key_vec.push(RestartPoint {
                offset,
                key_len_type,
                value_len_type,
            });
        }

        // Decode restart points.
        let n_restarts = ((&buf[data_len - 4..]).get_u32_le()) as usize;
        let restart_points_len = size_of::<u32>() + n_restarts * (size_of::<u32>());
        let restarts_end = data_len - 4;
        let data_len = data_len - restart_points_len;
        let mut restart_points = Vec::with_capacity(n_restarts);
        let mut restart_points_buf = &buf[data_len..restarts_end];

        let mut type_index: usize = 0;

        for _ in 0..n_restarts {
            let offset = restart_points_buf.get_u32_le();
            if type_index < index_key_vec.len() - 1
                && offset >= index_key_vec[type_index + 1].offset
            {
                type_index += 1;
            }

            restart_points.push(RestartPoint {
                offset,
                key_len_type: index_key_vec[type_index].key_len_type,
                value_len_type: index_key_vec[type_index].value_len_type,
            });
        }

        Block {
            data: buf,
            data_len,
            restart_points,
            table_id: TableId::new(table_id),
            hitmap: Hitmap::default(),
        }
    }

    /// Entries data len.
    #[expect(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        assert!(!self.data.is_empty());
        self.data_len
    }

    pub fn capacity(&self) -> usize {
        self.data.len()
            + self.restart_points.capacity() * std::mem::size_of::<u32>()
            + std::mem::size_of::<u32>()
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Gets restart point by index.
    pub fn restart_point(&self, index: usize) -> RestartPoint {
        self.restart_points[index]
    }

    /// Gets restart point len.
    pub fn restart_point_len(&self) -> usize {
        self.restart_points.len()
    }

    /// Searches the index of the restart point by partition point.
    pub fn search_restart_partition_point<P>(&self, pred: P) -> usize
    where
        P: FnMut(&RestartPoint) -> bool,
    {
        self.restart_points.partition_point(pred)
    }
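
    // Illustrative usage sketch (added; `target_offset` is a hypothetical
    // variable, not from the original source): to find the restart point
    // governing a given entry offset, partition on the offset, e.g.
    //
    //     let idx = block
    //         .search_restart_partition_point(|rp| rp.offset as usize <= target_offset)
    //         .saturating_sub(1);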

    pub fn data(&self) -> &[u8] {
        &self.data[..self.data_len]
    }

    pub fn raw(&self) -> &[u8] {
        &self.data[..]
    }

    pub fn hitmap(&self) -> &Hitmap<{ Self::HITMAP_ELEMS }> {
        &self.hitmap
    }

    pub fn efficiency(&self) -> f64 {
        self.hitmap.ratio()
    }
}

/// [`KeyPrefix`] contains info for prefix compression.
#[derive(Debug)]
pub struct KeyPrefix {
    overlap: usize,
    diff: usize,
    value: usize,
    /// Used for calculating range, won't be encoded.
    offset: usize,

    len: usize,
}

impl KeyPrefix {
    // This function is used in `BlockBuilder::add` to provide a wrapper for `encode`,
    // since the `KeyPrefix` `len` field is only useful in the decode phase.
    pub fn new_without_len(overlap: usize, diff: usize, value: usize, offset: usize) -> Self {
        KeyPrefix {
            overlap,
            diff,
            value,
            offset,
            len: 0, // not used when encoding
        }
    }
}

impl KeyPrefix {
    pub fn encode(&self, buf: &mut impl BufMut, key_len_type: LenType, value_len_type: LenType) {
        key_len_type.put2(buf, self.overlap, self.diff);
        value_len_type.put(buf, self.value);
    }

    pub fn decode(
        buf: &mut impl Buf,
        offset: usize,
        key_len_type: LenType,
        value_len_type: LenType,
    ) -> Self {
        let (overlap, diff) = key_len_type.get2(buf);
        let value = value_len_type.get(buf);

        let len = key_len_type.len() * 2 + value_len_type.len();

        Self {
            overlap,
            diff,
            value,
            offset,
            len,
        }
    }

    /// Encoded length.
    fn len(&self) -> usize {
        self.len
    }

    /// Gets overlap len.
    pub fn overlap_len(&self) -> usize {
        self.overlap
    }

    /// Gets diff key range.
    pub fn diff_key_range(&self) -> Range<usize> {
        self.offset + self.len()..self.offset + self.len() + self.diff
    }

    /// Gets value range.
    pub fn value_range(&self) -> Range<usize> {
        self.offset + self.len() + self.diff..self.offset + self.len() + self.diff + self.value
    }

    /// Gets entry len.
    pub fn entry_len(&self) -> usize {
        self.len() + self.diff + self.value
    }
}

pub struct BlockBuilderOptions {
    /// Reserved bytes size when creating buffer to avoid frequent allocating.
    pub capacity: usize,
    /// Compression algorithm.
    pub compression_algorithm: CompressionAlgorithm,
    /// Restart point interval.
    pub restart_interval: usize,
}

impl Default for BlockBuilderOptions {
    fn default() -> Self {
        Self {
            capacity: DEFAULT_BLOCK_SIZE,
            compression_algorithm: CompressionAlgorithm::None,
            restart_interval: DEFAULT_RESTART_INTERVAL,
        }
    }
}

/// [`BlockBuilder`] encodes and appends block to a buffer.
pub struct BlockBuilder {
    /// Write buffer.
    buf: BytesMut,
    /// Compression buffer.
    compress_buf: BytesMut,
    /// Entry interval between restart points.
    restart_count: usize,
    /// Restart points.
    restart_points: Vec<u32>,
    /// Last key.
    last_key: Vec<u8>,
    /// Count of entries in current block.
    entry_count: usize,
    /// Compression algorithm.
    compression_algorithm: CompressionAlgorithm,

    table_id: Option<TableId>,
    // `restart_points_type_index` stores only the restart point corresponding to each
    // type change, as an index, in order to reduce space usage.
    restart_points_type_index: Vec<RestartPoint>,
}

impl BlockBuilder {
    pub fn new(options: BlockBuilderOptions) -> Self {
        Self {
            // Reserve extra space to avoid reallocation (for restart_points and
            // restart_points_type_index).
            buf: BytesMut::with_capacity(Self::buf_reserve_size(&options)),
            compress_buf: BytesMut::default(),
            restart_count: options.restart_interval,
            restart_points: Vec::with_capacity(
                options.capacity / DEFAULT_ENTRY_SIZE / options.restart_interval + 1,
            ),
            last_key: vec![],
            entry_count: 0,
            compression_algorithm: options.compression_algorithm,
            table_id: None,
            restart_points_type_index: Vec::default(),
        }
    }

    /// Appends a kv pair to the block using a pre-encoded key.
    ///
    /// This method accepts a pre-encoded key (`table_key` + `epoch`, without `table_id`)
    /// to avoid redundant encoding operations. This is useful when the caller
    /// has already encoded the full key and wants to reuse it.
    ///
    /// NOTE: Keys must be added in ascending order.
    ///
    /// # Format
    ///
    /// ```plain
    /// entry (kv pair): | overlap len (len_type) | diff len (len_type) | value len (len_type) | diff key | value |
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if keys are not added in ascending order or if `table_id` mismatches.
    pub fn add(&mut self, table_id: TableId, encoded_key_without_table_id: &[u8], value: &[u8]) {
        match self.table_id {
            Some(current_table_id) => assert_eq!(current_table_id, table_id),
            None => self.table_id = Some(table_id),
        }
        #[cfg(debug_assertions)]
        self.debug_valid();

        // Call the core implementation with the pre-encoded key.
        self.add_encoded_impl(encoded_key_without_table_id, value);
    }

    #[cfg(any(test, feature = "test"))]
    pub fn add_for_test(&mut self, full_key: FullKey<&[u8]>, value: &[u8]) {
        use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
        let input_table_id = full_key.user_key.table_id;
        match self.table_id {
            Some(current_table_id) => assert_eq!(current_table_id, input_table_id),
            None => self.table_id = Some(input_table_id),
        }
        #[cfg(debug_assertions)]
        self.debug_valid();

        let mut key: BytesMut = Default::default();
        full_key.encode_into(&mut key);

        // Slice off the table_id prefix to get the key without table_id.
        let encoded_key_without_table_id = &key[TABLE_PREFIX_LEN..];

        // Call the core implementation with the encoded key.
        self.add_encoded_impl(encoded_key_without_table_id, value);
    }

    /// Core implementation that accepts a pre-encoded key.
    /// This is used by both `add` and `add_for_test` to avoid code duplication.
    fn add_encoded_impl(&mut self, key: &[u8], value: &[u8]) {
        if self.entry_count > 0 {
            debug_assert!(!key.is_empty());
            debug_assert_eq!(
                KeyComparator::compare_encoded_full_key(&self.last_key[..], key),
                Ordering::Less,
            );
        }
        // Update restart point if needed and calculate diff key.
        let k_type = LenType::new(key.len());
        let v_type = LenType::new(value.len());

        let type_mismatch = if let Some(RestartPoint {
            offset: _,
            key_len_type: last_key_len_type,
            value_len_type: last_value_len_type,
        }) = self.restart_points_type_index.last()
        {
            k_type != *last_key_len_type || v_type != *last_value_len_type
        } else {
            true
        };

        let diff_key = if self.entry_count.is_multiple_of(self.restart_count) || type_mismatch {
            let offset = utils::checked_into_u32(self.buf.len()).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert buf_len {} into u32 table {:?}",
                    self.buf.len(),
                    self.table_id,
                )
            });

            self.restart_points.push(offset);

            if type_mismatch {
                self.restart_points_type_index.push(RestartPoint {
                    offset,
                    key_len_type: k_type,
                    value_len_type: v_type,
                });
            }

            key
        } else {
            bytes_diff_below_max_key_length(&self.last_key, key)
        };

        let prefix = KeyPrefix::new_without_len(
            key.len() - diff_key.len(),
            diff_key.len(),
            value.len(),
            self.buf.len(),
        );

        prefix.encode(&mut self.buf, k_type, v_type);
        self.buf.put_slice(diff_key);
        self.buf.put_slice(value);

        self.last_key.clear();
        self.last_key.extend_from_slice(key);
        self.entry_count += 1;
    }

    pub fn get_last_key(&self) -> &[u8] {
        &self.last_key
    }

    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    pub fn clear(&mut self) {
        self.buf.clear();
        self.restart_points.clear();
        self.table_id = None;
        self.restart_points_type_index.clear();
        self.last_key.clear();
        self.entry_count = 0;
    }

    /// Calculate block size without compression.
    pub fn uncompressed_block_size(&mut self) -> usize {
        self.buf.len()
            + (self.restart_points.len() + 1) * std::mem::size_of::<u32>()
            + (RestartPoint::size_of()) // (offset + len_type(u8)) * len
                * self.restart_points_type_index.len()
            + std::mem::size_of::<u32>() // restart_points_type_index len
            + std::mem::size_of::<u32>() // table_id len
    }

    /// Finishes building the block.
    ///
    /// # Format
    ///
    /// ```plain
    /// compressed part: | entries | restart point 0 (4B) | ... | restart point N-1 (4B) | N (4B) | restart point index 0 (5B) | ... | restart point index M-1 (5B) | M (4B) | table id (4B) |
    /// uncompressed suffix: | compression method (1B) | xxhash64 checksum (8B) |
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if there is a compression error.
    pub fn build(&mut self) -> &[u8] {
        assert!(
            self.entry_count > 0,
            "buf_len {} entry_count {} table {:?}",
            self.buf.len(),
            self.entry_count,
            self.table_id
        );

        for restart_point in &self.restart_points {
            self.buf.put_u32_le(*restart_point);
        }

        self.buf.put_u32_le(
            utils::checked_into_u32(self.restart_points.len()).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert restart_points_len {} into u32 table {:?}",
                    self.restart_points.len(),
                    self.table_id,
                )
            }),
        );
        for RestartPoint {
            offset,
            key_len_type,
            value_len_type,
        } in &self.restart_points_type_index
        {
            self.buf.put_u32_le(*offset);

            let mut value: u8 = 0;
            value |= *key_len_type as u8;
            value <<= 4;
            value |= *value_len_type as u8;

            self.buf.put_u8(value);
        }

        self.buf.put_u32_le(
            utils::checked_into_u32(self.restart_points_type_index.len()).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert restart_points_type_index_len {} into u32 table {:?}",
                    self.restart_points_type_index.len(),
                    self.table_id,
                )
            }),
        );

        self.buf.put_u32_le(self.table_id.unwrap().as_raw_id());
        let result_buf = if self.compression_algorithm != CompressionAlgorithm::None {
            self.compress_buf.clear();
            self.compress_buf = Self::compress(
                &self.buf[..],
                self.compression_algorithm,
                std::mem::take(&mut self.compress_buf),
            );

            &mut self.compress_buf
        } else {
            &mut self.buf
        };

        self.compression_algorithm.encode(result_buf);
        let checksum = xxhash64_checksum(result_buf);
        result_buf.put_u64_le(checksum);
        assert!(
            result_buf.len() < (u32::MAX) as usize,
            "buf_len {} entry_count {} table {:?}",
            result_buf.len(),
            self.entry_count,
            self.table_id
        );

        if self.compression_algorithm != CompressionAlgorithm::None {
            self.compress_buf.as_ref()
        } else {
            self.buf.as_ref()
        }
    }

    pub fn compress_block(
        buf: Bytes,
        target_compression: CompressionAlgorithm,
    ) -> HummockResult<Bytes> {
        // Verify checksum.
        let checksum = (&buf[buf.len() - 8..]).get_u64_le();
        xxhash64_verify(&buf[..buf.len() - 8], checksum)?;
        // Decompress.
        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
        let compressed_data = &buf[..buf.len() - 9];
        assert_eq!(compression, CompressionAlgorithm::None);
        let mut compress_writer = Self::compress(
            compressed_data,
            target_compression,
            BytesMut::with_capacity(buf.len()),
        );

        target_compression.encode(&mut compress_writer);
        let checksum = xxhash64_checksum(&compress_writer);
        compress_writer.put_u64_le(checksum);
        Ok(compress_writer.freeze())
    }

    pub fn compress(
        buf: &[u8],
        compression_algorithm: CompressionAlgorithm,
        compress_writer: BytesMut,
    ) -> BytesMut {
        match compression_algorithm {
            CompressionAlgorithm::None => unreachable!(),
            CompressionAlgorithm::Lz4 => {
                let mut encoder = lz4::EncoderBuilder::new()
                    .level(4)
                    .build(compress_writer.writer())
                    .map_err(HummockError::encode_error)
                    .unwrap();
                encoder
                    .write_all(buf)
                    .map_err(HummockError::encode_error)
                    .unwrap();
                let (writer, result) = encoder.finish();
                result.map_err(HummockError::encode_error).unwrap();
                writer.into_inner()
            }
            CompressionAlgorithm::Zstd => {
                let mut encoder = zstd::Encoder::new(compress_writer.writer(), 4)
                    .map_err(HummockError::encode_error)
                    .unwrap();
                encoder
                    .write_all(buf)
                    .map_err(HummockError::encode_error)
                    .unwrap();
                let writer = encoder
                    .finish()
                    .map_err(HummockError::encode_error)
                    .unwrap();
                writer.into_inner()
            }
        }
    }

    /// Approximate block len (uncompressed).
    pub fn approximate_len(&self) -> usize {
        // block + restart_points + restart_points.len + restart_points_type_indices +
        // restart_points_type_indices.len + compression_algorithm + checksum + table_id
        self.buf.len()
            + std::mem::size_of::<u32>() * self.restart_points.len() // restart_points
            + std::mem::size_of::<u32>() // restart_points.len
            + RestartPoint::size_of() * self.restart_points_type_index.len() // restart_points_type_indices
            + std::mem::size_of::<u32>() // restart_points_type_indices.len
            + std::mem::size_of::<CompressionAlgorithm>() // compression_algorithm
            + std::mem::size_of::<u64>() // checksum
            + std::mem::size_of::<u32>() // table_id
    }

    pub fn debug_valid(&self) {
        if self.entry_count == 0 {
            debug_assert!(self.buf.is_empty());
            debug_assert!(self.restart_points.is_empty());
            debug_assert!(self.restart_points_type_index.is_empty());
            debug_assert!(self.last_key.is_empty());
        }
    }

    pub fn table_id(&self) -> Option<TableId> {
        self.table_id
    }

    fn buf_reserve_size(option: &BlockBuilderOptions) -> usize {
        option.capacity + 1024 + 256
    }
}

/// Attempts to shorten `block_smallest` key while preserving block boundary correctness.
///
/// Returns a shortened key if `prev_block_last < shortened <= block_smallest` can be satisfied
/// with fewer bytes. This reduces block metadata size by exploiting common prefixes between
/// adjacent blocks.
///
/// Returns `None` if the key cannot be shortened (e.g., keys differ only in the last byte).
pub fn try_shorten_block_smallest_key(
    prev_block_last: &FullKey<&[u8]>,
    block_smallest: &FullKey<&[u8]>,
) -> Option<FullKey<Vec<u8>>> {
    /// Returns the length of the longest common prefix between two byte slices.
    fn lcp_len(a: &[u8], b: &[u8]) -> usize {
        let min_len = a.len().min(b.len());
        for i in 0..min_len {
            if a[i] != b[i] {
                return i;
            }
        }
        min_len
    }

    let prev_table_key = prev_block_last.user_key.table_key.as_ref();
    let next_table_key = block_smallest.user_key.table_key.as_ref();
    assert_eq!(
        prev_block_last.user_key.table_id, block_smallest.user_key.table_id,
        "table ids must match for shortening block smallest key"
    );

    let lcp = lcp_len(prev_table_key, next_table_key);

    // Shortened key = LCP prefix + first differing byte.
    // Only shorten if the result is strictly shorter than next_table_key.
    let shortened_len = lcp + 1;
    if shortened_len >= next_table_key.len() {
        return None;
    }

    // Build candidate key: take LCP + 1 bytes from block_smallest.
    let cand = FullKey::new_with_gap_epoch(
        block_smallest.user_key.table_id,
        TableKey(&next_table_key[..shortened_len]),
        block_smallest.epoch_with_gap,
    );

    // Invariant: prev_block_last < cand <= block_smallest.
    // This must hold by construction. If violated, it indicates a bug in the shortening logic.
    // Use assert! (not debug_assert!) to fail fast and prevent writing incorrect keys to storage.
    assert!(
        prev_block_last.cmp(&cand).is_lt(),
        "Invariant violated: prev_block_last >= cand. prev={:?}, cand={:?}",
        prev_block_last,
        cand
    );
    assert!(
        cand.cmp(block_smallest).is_le(),
        "Invariant violated: cand > block_smallest. cand={:?}, block_smallest={:?}",
        cand,
        block_smallest
    );

    Some(cand.copy_into())
}

#[cfg(test)]
mod tests {

    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::MAX_KEY_LEN;

    use super::*;
    use crate::hummock::{BlockHolder, BlockIterator};

    #[test]
    fn test_block_enc_dec() {
        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k1", 1), b"v01");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k2", 2), b"v02");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k3", 3), b"v03");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k4", 4), b"v04");
        let capacity = builder.uncompressed_block_size();
        assert_eq!(capacity, builder.approximate_len() - 9);
        let buf = builder.build().to_vec();
        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));

        bi.seek_to_first();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k1", 1), bi.key());
        assert_eq!(b"v01", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k2", 2), bi.key());
        assert_eq!(b"v02", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k3", 3), bi.key());
        assert_eq!(b"v03", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k4", 4), bi.key());
        assert_eq!(b"v04", bi.value());

        bi.next();
        assert!(!bi.is_valid());
    }

    #[test]
    fn test_compressed_block_enc_dec() {
        inner_test_compressed(CompressionAlgorithm::Lz4);
        inner_test_compressed(CompressionAlgorithm::Zstd);
    }

    fn inner_test_compressed(algo: CompressionAlgorithm) {
        let options = BlockBuilderOptions {
            compression_algorithm: algo,
            ..Default::default()
        };
        let mut builder = BlockBuilder::new(options);
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k1", 1), b"v01");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k2", 2), b"v02");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k3", 3), b"v03");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k4", 4), b"v04");
        let capacity = builder.uncompressed_block_size();
        assert_eq!(capacity, builder.approximate_len() - 9);
        let buf = builder.build().to_vec();
        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));

        bi.seek_to_first();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k1", 1), bi.key());
        assert_eq!(b"v01", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k2", 2), bi.key());
        assert_eq!(b"v02", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k3", 3), bi.key());
        assert_eq!(b"v03", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k4", 4), bi.key());
        assert_eq!(b"v04", bi.value());

        bi.next();
        assert!(!bi.is_valid());
    }

    pub fn construct_full_key_struct_for_test(
        table_id: u32,
        table_key: &[u8],
        epoch: u64,
    ) -> FullKey<&[u8]> {
        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(epoch))
    }

    #[test]
    fn test_block_enc_large_key() {
        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);
        let medium_key = vec![b'a'; MAX_KEY_LEN - 500];
        let large_key = vec![b'b'; MAX_KEY_LEN];
        let xlarge_key = vec![b'c'; MAX_KEY_LEN + 500];

        builder.add_for_test(construct_full_key_struct_for_test(0, &medium_key, 1), b"v1");
        builder.add_for_test(construct_full_key_struct_for_test(0, &large_key, 2), b"v2");
        builder.add_for_test(construct_full_key_struct_for_test(0, &xlarge_key, 3), b"v3");
        let capacity = builder.uncompressed_block_size();
        assert_eq!(capacity, builder.approximate_len() - 9);
        let buf = builder.build().to_vec();
        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));

        bi.seek_to_first();
        assert!(bi.is_valid());
        assert_eq!(
            construct_full_key_struct_for_test(0, &medium_key, 1),
            bi.key()
        );
        assert_eq!(b"v1", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(
            construct_full_key_struct_for_test(0, &large_key, 2),
            bi.key()
        );
        assert_eq!(b"v2", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(
            construct_full_key_struct_for_test(0, &xlarge_key, 3),
            bi.key()
        );
        assert_eq!(b"v3", bi.value());

        bi.next();
        assert!(!bi.is_valid());
    }

    #[test]
    fn test_block_restart_point() {
        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);

        const KEY_COUNT: u8 = 100;
        const BUILDER_COUNT: u8 = 5;

        for _ in 0..BUILDER_COUNT {
            for index in 0..KEY_COUNT {
                if index < 50 {
                    let mut medium_key = vec![b'A'; MAX_KEY_LEN - 500];
                    medium_key.push(index);
                    builder
                        .add_for_test(construct_full_key_struct_for_test(0, &medium_key, 1), b"v1");
                } else if index < 80 {
                    let mut large_key = vec![b'B'; MAX_KEY_LEN];
                    large_key.push(index);
                    builder
                        .add_for_test(construct_full_key_struct_for_test(0, &large_key, 2), b"v2");
                } else {
                    let mut xlarge_key = vec![b'C'; MAX_KEY_LEN + 500];
                    xlarge_key.push(index);
                    builder
                        .add_for_test(construct_full_key_struct_for_test(0, &xlarge_key, 3), b"v3");
                }
            }

            let capacity = builder.uncompressed_block_size();
            assert_eq!(capacity, builder.approximate_len() - 9);
            let buf = builder.build().to_vec();
            let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
            let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
            bi.seek_to_first();
            assert!(bi.is_valid());

            for index in 0..KEY_COUNT {
                if index < 50 {
                    let mut medium_key = vec![b'A'; MAX_KEY_LEN - 500];
                    medium_key.push(index);
                    assert_eq!(
                        construct_full_key_struct_for_test(0, &medium_key, 1),
                        bi.key()
                    );
                } else if index < 80 {
                    let mut large_key = vec![b'B'; MAX_KEY_LEN];
                    large_key.push(index);
                    assert_eq!(
                        construct_full_key_struct_for_test(0, &large_key, 2),
                        bi.key()
                    );
                } else {
                    let mut xlarge_key = vec![b'C'; MAX_KEY_LEN + 500];
                    xlarge_key.push(index);
                    assert_eq!(
                        construct_full_key_struct_for_test(0, &xlarge_key, 3),
                        bi.key()
                    );
                }
                bi.next();
            }

            assert!(!bi.is_valid());
            builder.clear();
        }
    }

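    #[test]
    fn test_len_type_roundtrip() {
        // Added illustrative test (a minimal sketch, not from the original
        // source): `LenType::new` picks the narrowest width that fits a
        // length, and `put`/`get` round-trip values through that width.
        assert_eq!(LenType::new(0), LenType::u8);
        assert_eq!(LenType::new(u8::MAX as usize + 1), LenType::u16);
        assert_eq!(LenType::new(u16::MAX as usize + 1), LenType::u32);

        let t = LenType::new(300);
        assert_eq!(t, LenType::u16);
        let mut buf = BytesMut::new();
        t.put(&mut buf, 300);
        assert_eq!(buf.len(), t.len()); // u16 occupies 2 bytes
        assert_eq!(t.get(&mut &buf[..]), 300);
    }
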
    #[test]
    fn test_block_serde() {
        fn assert_serde<'de, T: Serialize + Deserialize<'de>>() {}

        assert_serde::<Block>();
        assert_serde::<Box<Block>>();

        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);
        for i in 0..100 {
            builder.add_for_test(
                construct_full_key_struct_for_test(0, format!("k{i:8}").as_bytes(), i),
                format!("v{i:8}").as_bytes(),
            );
        }

        let capacity = builder.uncompressed_block_size();
        assert_eq!(capacity, builder.approximate_len() - 9);
        let buf = builder.build().to_vec();

        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());

        let buffer = bincode::serialize(&block).unwrap();
        let blk: Block = bincode::deserialize(&buffer).unwrap();

        assert_eq!(block.data, blk.data);
        assert_eq!(block.data_len, blk.data_len);
        assert_eq!(block.table_id, blk.table_id);
        assert_eq!(block.restart_points, blk.restart_points);
    }

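    #[test]
    fn test_key_prefix_enc_dec() {
        // Added illustrative test (a minimal sketch, not from the original
        // source): encode a `KeyPrefix` and decode it back, then check the
        // derived entry ranges against the 3-byte encoded prefix.
        let prefix = KeyPrefix::new_without_len(2, 3, 4, 0);
        let mut buf = BytesMut::new();
        prefix.encode(&mut buf, LenType::u8, LenType::u8);
        // overlap + diff (key_len_type) + value (value_len_type), 1 byte each.
        assert_eq!(buf.len(), 3);

        let decoded = KeyPrefix::decode(&mut &buf[..], 0, LenType::u8, LenType::u8);
        assert_eq!(decoded.overlap_len(), 2);
        assert_eq!(decoded.diff_key_range(), 3..6); // starts after the 3-byte prefix
        assert_eq!(decoded.value_range(), 6..10);
        assert_eq!(decoded.entry_len(), 10); // prefix (3) + diff key (3) + value (4)
    }
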
    #[test]
    fn test_try_shorten_block_smallest_key() {
        use risingwave_hummock_sdk::EpochWithGap;

        fn make_full_key(table_id: u32, table_key: &[u8], epoch: u64) -> FullKey<&[u8]> {
            FullKey::new_with_gap_epoch(
                TableId::new(table_id),
                TableKey(table_key),
                EpochWithGap::new_from_epoch(test_epoch(epoch)),
            )
        }

        /// Verifies the invariant: prev < shortened <= next using `FullKey::cmp`.
        fn assert_invariant(
            prev: &FullKey<&[u8]>,
            shortened: &FullKey<Vec<u8>>,
            next: &FullKey<&[u8]>,
        ) {
            assert!(
                prev.cmp(&shortened.to_ref()).is_lt(),
                "Invariant violated: prev >= shortened. prev={:?}, shortened={:?}",
                prev,
                shortened
            );
            assert!(
                shortened.to_ref().cmp(next).is_le(),
                "Invariant violated: shortened > next. shortened={:?}, next={:?}",
                shortened,
                next
            );
        }

        // Case 1: Basic shortening - prev="abc", next="abdef" -> cand="abd"
        {
            let prev = make_full_key(1, b"abc", 100);
            let next = make_full_key(1, b"abdef", 100);
            let result = try_shorten_block_smallest_key(&prev, &next);
            assert!(result.is_some());
            let shortened = result.unwrap();
            assert_eq!(shortened.user_key.table_key.as_ref(), b"abd");
            assert_invariant(&prev, &shortened, &next);
        }

        // Case 2: Prefix case - prev="ab", next="abcd" -> cand="abc"
        {
            let prev = make_full_key(1, b"ab", 100);
            let next = make_full_key(1, b"abcd", 100);
            let result = try_shorten_block_smallest_key(&prev, &next);
            assert!(result.is_some());
            let shortened = result.unwrap();
            assert_eq!(shortened.user_key.table_key.as_ref(), b"abc");
            assert_invariant(&prev, &shortened, &next);
        }

        // Case 3: Cannot shorten - only 1 char difference at end
        {
            let prev = make_full_key(1, b"abc", 100);
            let next = make_full_key(1, b"abd", 100);
            assert!(try_shorten_block_smallest_key(&prev, &next).is_none());
        }

        // Case 4: Cannot shorten - next is only 1 char longer than LCP
        {
            let prev = make_full_key(1, b"abc", 100);
            let next = make_full_key(1, b"abcd", 100);
            assert!(try_shorten_block_smallest_key(&prev, &next).is_none());
        }

        // Case 5: Same table_key, different epoch - cannot shorten
        {
            let prev = make_full_key(1, b"abc", 200);
            let next = make_full_key(1, b"abc", 100);
            assert!(try_shorten_block_smallest_key(&prev, &next).is_none());
        }

        // Case 7: Single char keys - cannot shorten
        {
            let prev = make_full_key(1, b"a", 100);
            let next = make_full_key(1, b"b", 100);
            assert!(try_shorten_block_smallest_key(&prev, &next).is_none());
        }

        // Case 8: Long common prefix with divergence
        {
            let prev = make_full_key(1, b"hello_world_abc", 100);
            let next = make_full_key(1, b"hello_world_xyz123", 100);
            let result = try_shorten_block_smallest_key(&prev, &next);
            assert!(result.is_some());
            let shortened = result.unwrap();
            assert_eq!(shortened.user_key.table_key.as_ref(), b"hello_world_x");
            assert_invariant(&prev, &shortened, &next);
        }

        // Case 9: Empty common prefix (completely different keys)
        {
            let prev = make_full_key(1, b"aaa", 100);
            let next = make_full_key(1, b"bbbccc", 100);
            let result = try_shorten_block_smallest_key(&prev, &next);
            assert!(result.is_some());
            let shortened = result.unwrap();
            assert_eq!(shortened.user_key.table_key.as_ref(), b"b");
            assert_invariant(&prev, &shortened, &next);
        }

        // Case 10: Minimal shortening - next is exactly 2 chars longer than LCP
        {
            let prev = make_full_key(1, b"abc", 100);
            let next = make_full_key(1, b"abcde", 100);
            let result = try_shorten_block_smallest_key(&prev, &next);
            assert!(result.is_some());
            let shortened = result.unwrap();
            assert_eq!(shortened.user_key.table_key.as_ref(), b"abcd");
            assert_invariant(&prev, &shortened, &next);
        }
    }
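
    #[test]
    fn test_compress_block_roundtrip() {
        // Added illustrative test (a minimal sketch, not from the original
        // source): build an uncompressed block, recompress it with
        // `BlockBuilder::compress_block`, and check that it still decodes to
        // the same entries.
        let options = BlockBuilderOptions::default();
        let mut builder = BlockBuilder::new(options);
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k1", 1), b"v01");
        builder.add_for_test(construct_full_key_struct_for_test(0, b"k2", 2), b"v02");
        let capacity = builder.uncompressed_block_size();
        let buf = Bytes::from(builder.build().to_vec());

        let compressed = BlockBuilder::compress_block(buf, CompressionAlgorithm::Lz4).unwrap();
        let block = Box::new(Block::decode(compressed, capacity).unwrap());
        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));

        bi.seek_to_first();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k1", 1), bi.key());
        assert_eq!(b"v01", bi.value());

        bi.next();
        assert!(bi.is_valid());
        assert_eq!(construct_full_key_struct_for_test(0, b"k2", 2), bi.key());
        assert_eq!(b"v02", bi.value());

        bi.next();
        assert!(!bi.is_valid());
    }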
}