risingwave_storage/hummock/sstable/block.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::io::{Read, Write};
18use std::mem::size_of;
19use std::ops::Range;
20
21use bytes::{Buf, BufMut, Bytes, BytesMut};
22use risingwave_common::catalog::TableId;
23use risingwave_hummock_sdk::KeyComparator;
24use risingwave_hummock_sdk::key::{FullKey, TableKey};
25use serde::{Deserialize, Serialize};
26
27use super::utils::{CompressionAlgorithm, bytes_diff_below_max_key_length, xxhash64_verify};
28use crate::hummock::sstable::utils;
29use crate::hummock::sstable::utils::xxhash64_checksum;
30use crate::hummock::{HummockError, HummockResult};
31use crate::monitor::Hitmap;
32
/// Default block capacity in bytes (4 KiB), used by `BlockBuilderOptions::default`.
pub const DEFAULT_BLOCK_SIZE: usize = 4096;
/// Default number of entries between two forced restart points.
pub const DEFAULT_RESTART_INTERVAL: usize = 16;
/// Rough size estimate of one entry: table_id (u64) + primary_key (u64) + epoch (u64).
/// Within this file it only sizes the initial restart point buffer in `BlockBuilder::new`.
pub const DEFAULT_ENTRY_SIZE: usize = 3 * std::mem::size_of::<u64>();
36
/// Width of the integers used to encode key/value lengths in a block entry.
///
/// The discriminant is persisted: `BlockBuilder::build` packs two of them into a single
/// byte (`key_len_type << 4 | value_len_type`), and `RestartPoint::size_of` counts one
/// byte for that pair. `#[repr(u8)]` pins the enum to exactly one byte so the
/// on-disk format cannot silently depend on the compiler's layout choice.
#[allow(non_camel_case_types)] // variants intentionally mirror the primitive type names
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[repr(u8)]
pub enum LenType {
    u8 = 1,
    u16 = 2,
    u32 = 3,
}
44
/// Generates a `LenType` method `$name` that writes every `$value` argument to `buf`,
/// each as an integer whose width is chosen by `self` (via the non-`_le` `BufMut`
/// writers, i.e. big-endian).
///
/// Values are narrowed with `as`, so the caller must have picked a `LenType` wide enough
/// for all of them (see `LenType::new`).
macro_rules! put_fn {
    ($name:ident, $($value:ident: $type:ty),*) => {
        fn $name<T: BufMut>(&self, buf: &mut T, $($value: $type),*) {
            match self {
                Self::u8 => { $( buf.put_u8($value as u8); )* }
                Self::u16 => { $( buf.put_u16($value as u16); )* }
                Self::u32 => { $( buf.put_u32($value as u32); )* }
            }
        }
    };
}
64
/// Generates a `LenType` method `$name` that reads one integer per listed `$type` from
/// `buf`, using the width chosen by `self`, and widens each to its `$type`.
///
/// With a single type the method returns the bare value (a one-element parenthesised
/// list is not a tuple, hence `unused_parens`). With several types it returns a tuple
/// whose elements are read left to right.
macro_rules! get_fn {
    ($name:ident, $($type:ty),*) => {
        #[allow(unused_parens)]
        fn $name<T: Buf>(&self, buf: &mut T) -> ($($type),*) {
            match self {
                Self::u8 => ($(buf.get_u8() as $type),*),
                Self::u16 => ($(buf.get_u16() as $type),*),
                Self::u32 => ($(buf.get_u32() as $type),*),
            }
        }
    };
}
83
84impl From<u8> for LenType {
85    fn from(value: u8) -> Self {
86        match value {
87            1 => LenType::u8,
88            2 => LenType::u16,
89            3 => LenType::u32,
90            _ => {
91                panic!("unexpected type {}", value)
92            }
93        }
94    }
95}
96
97impl LenType {
98    put_fn!(put, v1: usize);
99
100    put_fn!(put2, v1: usize, v2: usize);
101
102    get_fn!(get, usize);
103
104    get_fn!(get2, usize, usize);
105
106    fn new(len: usize) -> Self {
107        const U8_MAX: usize = u8::MAX as usize + 1;
108        const U16_MAX: usize = u16::MAX as usize + 1;
109        const U32_MAX: usize = u32::MAX as usize + 1;
110
111        match len {
112            0..U8_MAX => LenType::u8,
113            U8_MAX..U16_MAX => LenType::u16,
114            U16_MAX..U32_MAX => LenType::u32,
115            _ => unreachable!("unexpected LenType {}", len),
116        }
117    }
118
119    fn len(&self) -> usize {
120        match *self {
121            Self::u8 => size_of::<u8>(),
122            Self::u16 => size_of::<u16>(),
123            Self::u32 => size_of::<u32>(),
124        }
125    }
126}
127
128#[derive(Clone, Copy, Debug, PartialEq, Eq)]
129pub struct RestartPoint {
130    pub offset: u32,
131    pub key_len_type: LenType,
132    pub value_len_type: LenType,
133}
134
135impl RestartPoint {
136    fn size_of() -> usize {
137        // store key_len_type and value_len_type in u8 related to `BlockBuilder::build`
138        // encoding_value = (key_len_type << 4) | value_len_type
139        std::mem::size_of::<u32>() + std::mem::size_of::<LenType>()
140    }
141}
142
/// A decoded, uncompressed block of key-value entries that all belong to one table.
pub struct Block {
    /// Uncompressed entries data, with restart encoded restart points info.
    /// This is the whole uncompressed buffer: the entries followed by the restart points,
    /// the restart point type index and the table id trailer.
    data: Bytes,
    /// Uncompressed entries data length, i.e. the prefix of `data` that holds entries
    /// (everything before the restart point trailer).
    data_len: usize,

    /// Table id of this block.
    table_id: TableId,

    /// Restart points, each already resolved with the key/value `LenType` in effect at its
    /// offset (see `Block::decode_from_raw`).
    restart_points: Vec<RestartPoint>,

    /// Access hitmap of this block; `Block::efficiency` reports its ratio.
    hitmap: Hitmap<{ Self::HITMAP_ELEMS }>,
}
157
158impl Clone for Block {
159    fn clone(&self) -> Self {
160        Self {
161            data: self.data.clone(),
162            data_len: self.data_len,
163            table_id: self.table_id,
164            restart_points: self.restart_points.clone(),
165            hitmap: self.hitmap.clone(),
166        }
167    }
168}
169
170impl Debug for Block {
171    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172        f.debug_struct("Block")
173            .field("data_len", &self.data_len)
174            .field("table_id", &self.table_id)
175            .field("restart_points", &self.restart_points)
176            .finish()
177    }
178}
179
180impl Serialize for Block {
181    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
182    where
183        S: serde::Serializer,
184    {
185        serde_bytes::serialize(&self.data[..], serializer)
186    }
187}
188
189impl<'de> Deserialize<'de> for Block {
190    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
191    where
192        D: serde::Deserializer<'de>,
193    {
194        let data: Vec<u8> = serde_bytes::deserialize(deserializer)?;
195        let data = Bytes::from(data);
196        Ok(Block::decode_from_raw(data))
197    }
198}
199
200impl Block {
201    pub const HITMAP_ELEMS: usize = 4;
202
203    pub fn get_algorithm(buf: &Bytes) -> HummockResult<CompressionAlgorithm> {
204        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
205        Ok(compression)
206    }
207
208    pub fn decode(buf: Bytes, uncompressed_capacity: usize) -> HummockResult<Self> {
209        Self::decode_with_copy(buf, uncompressed_capacity, false)
210    }
211
212    pub fn decode_with_copy(
213        buf: Bytes,
214        uncompressed_capacity: usize,
215        copy: bool,
216    ) -> HummockResult<Self> {
217        // Verify checksum.
218
219        let xxhash64_checksum = (&buf[buf.len() - 8..]).get_u64_le();
220        xxhash64_verify(&buf[..buf.len() - 8], xxhash64_checksum)?;
221
222        // Decompress.
223        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
224        let compressed_data = &buf[..buf.len() - 9];
225
226        let buf = match compression {
227            CompressionAlgorithm::None => {
228                if copy {
229                    Bytes::copy_from_slice(&buf[0..(buf.len() - 9)])
230                } else {
231                    buf.slice(0..(buf.len() - 9))
232                }
233            }
234            CompressionAlgorithm::Lz4 => {
235                let mut decoder = lz4::Decoder::new(compressed_data.reader())
236                    .map_err(HummockError::decode_error)?;
237                let mut decoded = Vec::with_capacity(uncompressed_capacity);
238                let read_size = decoder
239                    .read_to_end(&mut decoded)
240                    .map_err(HummockError::decode_error)?;
241                assert_eq!(read_size, uncompressed_capacity);
242                Bytes::from(decoded)
243            }
244            CompressionAlgorithm::Zstd => {
245                let mut decoder = zstd::Decoder::new(compressed_data.reader())
246                    .map_err(HummockError::decode_error)?;
247                let mut decoded = Vec::with_capacity(uncompressed_capacity);
248                let read_size = decoder
249                    .read_to_end(&mut decoded)
250                    .map_err(HummockError::decode_error)?;
251                assert_eq!(read_size, uncompressed_capacity);
252                Bytes::from(decoded)
253            }
254        };
255
256        Ok(Self::decode_from_raw(buf))
257    }
258
259    pub fn decode_from_raw(buf: Bytes) -> Self {
260        let table_id = (&buf[buf.len() - 4..]).get_u32_le();
261        // decode restart_points_type_index
262        let n_index = ((&buf[buf.len() - 8..buf.len() - 4]).get_u32_le()) as usize;
263        let index_data_len = size_of::<u32>() + n_index * RestartPoint::size_of();
264        let data_len = buf.len() - 4 - index_data_len;
265        let mut restart_points_type_index_buf = &buf[data_len..buf.len() - 8];
266
267        let mut index_key_vec = Vec::with_capacity(n_index);
268        for _ in 0..n_index {
269            let offset = restart_points_type_index_buf.get_u32_le();
270            let value = restart_points_type_index_buf.get_u8();
271            let key_len_type = LenType::from(value >> 4);
272            let value_len_type = LenType::from(value & 0x0F);
273
274            index_key_vec.push(RestartPoint {
275                offset,
276                key_len_type,
277                value_len_type,
278            });
279        }
280
281        // Decode restart points.
282        let n_restarts = ((&buf[data_len - 4..]).get_u32_le()) as usize;
283        let restart_points_len = size_of::<u32>() + n_restarts * (size_of::<u32>());
284        let restarts_end = data_len - 4;
285        let data_len = data_len - restart_points_len;
286        let mut restart_points = Vec::with_capacity(n_restarts);
287        let mut restart_points_buf = &buf[data_len..restarts_end];
288
289        let mut type_index: usize = 0;
290
291        for _ in 0..n_restarts {
292            let offset = restart_points_buf.get_u32_le();
293            if type_index < index_key_vec.len() - 1
294                && offset >= index_key_vec[type_index + 1].offset
295            {
296                type_index += 1;
297            }
298
299            restart_points.push(RestartPoint {
300                offset,
301                key_len_type: index_key_vec[type_index].key_len_type,
302                value_len_type: index_key_vec[type_index].value_len_type,
303            });
304        }
305
306        Block {
307            data: buf,
308            data_len,
309            restart_points,
310            table_id: TableId::new(table_id),
311            hitmap: Hitmap::default(),
312        }
313    }
314
315    /// Entries data len.
316    #[expect(clippy::len_without_is_empty)]
317    pub fn len(&self) -> usize {
318        assert!(!self.data.is_empty());
319        self.data_len
320    }
321
322    pub fn capacity(&self) -> usize {
323        self.data.len()
324            + self.restart_points.capacity() * std::mem::size_of::<u32>()
325            + std::mem::size_of::<u32>()
326    }
327
328    pub fn estimated_memory_weight(&self) -> usize {
329        size_of::<Self>()
330            + self.data.len()
331            + self.restart_points.capacity() * size_of::<RestartPoint>()
332            + Hitmap::<{ Self::HITMAP_ELEMS }>::bytes()
333    }
334
335    pub fn table_id(&self) -> TableId {
336        self.table_id
337    }
338
339    /// Gets restart point by index.
340    pub fn restart_point(&self, index: usize) -> RestartPoint {
341        self.restart_points[index]
342    }
343
344    /// Gets restart point len.
345    pub fn restart_point_len(&self) -> usize {
346        self.restart_points.len()
347    }
348
349    /// Searches the index of the restart point by partition point.
350    pub fn search_restart_partition_point<P>(&self, pred: P) -> usize
351    where
352        P: FnMut(&RestartPoint) -> bool,
353    {
354        self.restart_points.partition_point(pred)
355    }
356
357    pub fn data(&self) -> &[u8] {
358        &self.data[..self.data_len]
359    }
360
361    pub fn raw(&self) -> &[u8] {
362        &self.data[..]
363    }
364
365    pub fn hitmap(&self) -> &Hitmap<{ Self::HITMAP_ELEMS }> {
366        &self.hitmap
367    }
368
369    pub fn efficiency(&self) -> f64 {
370        self.hitmap.ratio()
371    }
372}
373
/// [`KeyPrefix`] contains info for prefix compression.
///
/// It describes one entry header, `| overlap len | diff len | value len |`, encoded with the
/// block's `LenType`s. The header is followed by `diff` key bytes and `value` value bytes.
#[derive(Debug)]
pub struct KeyPrefix {
    /// Length of the key prefix omitted from this entry (0 at restart points).
    /// Presumably the prefix shared with the previous key.
    overlap: usize,
    /// Number of key bytes stored in this entry after the header.
    diff: usize,
    /// Number of value bytes stored after the diff key.
    value: usize,
    /// Used for calculating range, won't be encoded.
    /// Byte offset of the entry's header within the block data.
    offset: usize,

    /// Encoded header length in bytes. Only `decode` sets it; it is 0 for prefixes built
    /// with `new_without_len`.
    len: usize,
}
385
386impl KeyPrefix {
387    // This function is used in BlockBuilder::add to provide a wrapper for encode since the
388    // KeyPrefix len field is only useful in the decode phase
389    pub fn new_without_len(overlap: usize, diff: usize, value: usize, offset: usize) -> Self {
390        KeyPrefix {
391            overlap,
392            diff,
393            value,
394            offset,
395            len: 0, // not used when encode
396        }
397    }
398}
399
400impl KeyPrefix {
401    pub fn encode(&self, buf: &mut impl BufMut, key_len_type: LenType, value_len_type: LenType) {
402        key_len_type.put2(buf, self.overlap, self.diff);
403        value_len_type.put(buf, self.value);
404    }
405
406    pub fn decode(
407        buf: &mut impl Buf,
408        offset: usize,
409        key_len_type: LenType,
410        value_len_type: LenType,
411    ) -> Self {
412        let (overlap, diff) = key_len_type.get2(buf);
413        let value = value_len_type.get(buf);
414
415        let len = key_len_type.len() * 2 + value_len_type.len();
416
417        Self {
418            overlap,
419            diff,
420            value,
421            offset,
422            len,
423        }
424    }
425
426    /// Encoded length.
427    fn len(&self) -> usize {
428        self.len
429    }
430
431    /// Gets overlap len.
432    pub fn overlap_len(&self) -> usize {
433        self.overlap
434    }
435
436    /// Gets diff key range.
437    pub fn diff_key_range(&self) -> Range<usize> {
438        self.offset + self.len()..self.offset + self.len() + self.diff
439    }
440
441    /// Gets value range.
442    pub fn value_range(&self) -> Range<usize> {
443        self.offset + self.len() + self.diff..self.offset + self.len() + self.diff + self.value
444    }
445
446    /// Gets entry len.
447    pub fn entry_len(&self) -> usize {
448        self.len() + self.diff + self.value
449    }
450}
451
/// Options controlling how a [`BlockBuilder`] buffers, compresses and indexes a block.
pub struct BlockBuilderOptions {
    /// Reserved bytes size when creating buffer to avoid frequent allocating.
    pub capacity: usize,
    /// Compression algorithm.
    pub compression_algorithm: CompressionAlgorithm,
    /// Restart point interval: a restart point is forced every `restart_interval` entries.
    /// Must be non-zero (it is used as a divisor in `BlockBuilder::new`).
    pub restart_interval: usize,
}
460
461impl Default for BlockBuilderOptions {
462    fn default() -> Self {
463        Self {
464            capacity: DEFAULT_BLOCK_SIZE,
465            compression_algorithm: CompressionAlgorithm::None,
466            restart_interval: DEFAULT_RESTART_INTERVAL,
467        }
468    }
469}
470
/// [`BlockBuilder`] encodes and appends block to a buffer.
pub struct BlockBuilder {
    /// Write buffer: encoded entries, then (after `build`) the restart point trailer.
    buf: BytesMut,
    /// Compress buffer, reused across `build` calls when compression is enabled.
    compress_buf: BytesMut,
    /// Entry interval between restart points (copied from `BlockBuilderOptions::restart_interval`).
    restart_count: usize,
    /// Restart points: byte offsets into `buf` of entries whose key is stored in full.
    restart_points: Vec<u32>,
    /// Last key (encoded, without the table id) that was added.
    last_key: Vec<u8>,
    /// Count of entries in current block.
    entry_count: usize,
    /// Compression algorithm.
    compression_algorithm: CompressionAlgorithm,

    /// Table id shared by all entries; set by the first `add` and reset by `clear`.
    table_id: Option<TableId>,
    // restart_points_type_index stores only the restart_point corresponding to each type change,
    // as an index, in order to reduce space usage
    restart_points_type_index: Vec<RestartPoint>,
}
493
494impl BlockBuilder {
495    pub fn new(options: BlockBuilderOptions) -> Self {
496        Self {
497            // add more space to avoid re-allocate space. (for restart_points and restart_points_type_index)
498            buf: BytesMut::with_capacity(Self::buf_reserve_size(&options)),
499            compress_buf: BytesMut::default(),
500            restart_count: options.restart_interval,
501            restart_points: Vec::with_capacity(
502                options.capacity / DEFAULT_ENTRY_SIZE / options.restart_interval + 1,
503            ),
504            last_key: vec![],
505            entry_count: 0,
506            compression_algorithm: options.compression_algorithm,
507            table_id: None,
508            restart_points_type_index: Vec::default(),
509        }
510    }
511
512    /// Appends a kv pair to the block using pre-encoded key.
513    ///
514    /// This method accepts a pre-encoded key (`table_key` + `epoch`, without `table_id`)
515    /// to avoid redundant encoding operations. This is useful when the caller
516    /// has already encoded the full key and wants to reuse it.
517    ///
518    /// NOTE: Key must be added in ASCEND order.
519    ///
520    /// # Format
521    ///
522    /// ```plain
523    /// entry (kv pair): | overlap len (len_type) | diff len (len_type) | value len(len_type) | diff key | value |
524    /// ```
525    ///
526    /// # Panics
527    ///
528    /// Panic if key is not added in ASCEND order or `table_id` mismatch.
529    pub fn add(&mut self, table_id: TableId, encoded_key_without_table_id: &[u8], value: &[u8]) {
530        match self.table_id {
531            Some(current_table_id) => assert_eq!(current_table_id, table_id),
532            None => self.table_id = Some(table_id),
533        }
534        #[cfg(debug_assertions)]
535        self.debug_valid();
536
537        // Call the core implementation with the pre-encoded key
538        self.add_encoded_impl(encoded_key_without_table_id, value);
539    }
540
541    #[cfg(any(test, feature = "test"))]
542    pub fn add_for_test(&mut self, full_key: FullKey<&[u8]>, value: &[u8]) {
543        use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
544        let input_table_id = full_key.user_key.table_id;
545        match self.table_id {
546            Some(current_table_id) => assert_eq!(current_table_id, input_table_id),
547            None => self.table_id = Some(input_table_id),
548        }
549        #[cfg(debug_assertions)]
550        self.debug_valid();
551
552        let mut key: BytesMut = Default::default();
553        full_key.encode_into(&mut key);
554
555        // Slice off the table_id prefix to get the key without table_id
556        let encoded_key_without_table_id = &key[TABLE_PREFIX_LEN..];
557
558        // Call the core implementation with encoded key
559        self.add_encoded_impl(encoded_key_without_table_id, value);
560    }
561
562    /// Core implementation that accepts pre-encoded key.
563    /// This is used by both `add` and `add_for_test` to avoid code duplication.
564    fn add_encoded_impl(&mut self, key: &[u8], value: &[u8]) {
565        if self.entry_count > 0 {
566            debug_assert!(!key.is_empty());
567            debug_assert_eq!(
568                KeyComparator::compare_encoded_full_key(&self.last_key[..], key),
569                Ordering::Less,
570            );
571        }
572        // Update restart point if needed and calculate diff key.
573        let k_type = LenType::new(key.len());
574        let v_type = LenType::new(value.len());
575
576        let type_mismatch = if let Some(RestartPoint {
577            offset: _,
578            key_len_type: last_key_len_type,
579            value_len_type: last_value_len_type,
580        }) = self.restart_points_type_index.last()
581        {
582            k_type != *last_key_len_type || v_type != *last_value_len_type
583        } else {
584            true
585        };
586
587        let diff_key = if self.entry_count.is_multiple_of(self.restart_count) || type_mismatch {
588            let offset = utils::checked_into_u32(self.buf.len()).unwrap_or_else(|_| {
589                panic!(
590                    "WARN overflow can't convert buf_len {} into u32 table {:?}",
591                    self.buf.len(),
592                    self.table_id,
593                )
594            });
595
596            self.restart_points.push(offset);
597
598            if type_mismatch {
599                self.restart_points_type_index.push(RestartPoint {
600                    offset,
601                    key_len_type: k_type,
602                    value_len_type: v_type,
603                });
604            }
605
606            key
607        } else {
608            bytes_diff_below_max_key_length(&self.last_key, key)
609        };
610
611        let prefix = KeyPrefix::new_without_len(
612            key.len() - diff_key.len(),
613            diff_key.len(),
614            value.len(),
615            self.buf.len(),
616        );
617
618        prefix.encode(&mut self.buf, k_type, v_type);
619        self.buf.put_slice(diff_key);
620        self.buf.put_slice(value);
621
622        self.last_key.clear();
623        self.last_key.extend_from_slice(key);
624        self.entry_count += 1;
625    }
626
627    pub fn get_last_key(&self) -> &[u8] {
628        &self.last_key
629    }
630
631    pub fn is_empty(&self) -> bool {
632        self.buf.is_empty()
633    }
634
635    pub fn clear(&mut self) {
636        self.buf.clear();
637        self.restart_points.clear();
638        self.table_id = None;
639        self.restart_points_type_index.clear();
640        self.last_key.clear();
641        self.entry_count = 0;
642    }
643
644    /// Calculate block size without compression.
645    pub fn uncompressed_block_size(&mut self) -> usize {
646        self.buf.len()
647            + (self.restart_points.len() + 1) * std::mem::size_of::<u32>()
648            + (RestartPoint::size_of()) // (offset + len_type(u8)) * len
649                * self.restart_points_type_index.len()
650            + std::mem::size_of::<u32>() // restart_points_type_index len
651            + std::mem::size_of::<u32>() // table_id len
652    }
653
654    /// Finishes building block.
655    ///
656    /// # Format
657    ///
658    /// ```plain
659    /// compressed: | entries | restart point 0 (4B) | ... | restart point N-1 (4B) | N (4B) | restart point index 0 (5B)| ... | restart point index N-1 (5B) | N (4B) | table id (4B)
660    /// uncompressed: | compression method (1B) | xxhash64 checksum (8B) |
661    /// ```
662    ///
663    /// # Panics
664    ///
665    /// Panic if there is compression error.
666    pub fn build(&mut self) -> &[u8] {
667        assert!(
668            self.entry_count > 0,
669            "buf_len {} entry_count {} table {:?}",
670            self.buf.len(),
671            self.entry_count,
672            self.table_id
673        );
674
675        for restart_point in &self.restart_points {
676            self.buf.put_u32_le(*restart_point);
677        }
678
679        self.buf.put_u32_le(
680            utils::checked_into_u32(self.restart_points.len()).unwrap_or_else(|_| {
681                panic!(
682                    "WARN overflow can't convert restart_points_len {} into u32 table {:?}",
683                    self.restart_points.len(),
684                    self.table_id,
685                )
686            }),
687        );
688        for RestartPoint {
689            offset,
690            key_len_type,
691            value_len_type,
692        } in &self.restart_points_type_index
693        {
694            self.buf.put_u32_le(*offset);
695
696            let mut value: u8 = 0;
697            value |= *key_len_type as u8;
698            value <<= 4;
699            value |= *value_len_type as u8;
700
701            self.buf.put_u8(value);
702        }
703
704        self.buf.put_u32_le(
705            utils::checked_into_u32(self.restart_points_type_index.len()).unwrap_or_else(|_| {
706                panic!(
707                    "WARN overflow can't convert restart_points_type_index_len {} into u32 table {:?}",
708                    self.restart_points_type_index.len(),
709                    self.table_id,
710                )
711            }),
712        );
713
714        self.buf.put_u32_le(self.table_id.unwrap().as_raw_id());
715        let result_buf = if self.compression_algorithm != CompressionAlgorithm::None {
716            self.compress_buf.clear();
717            self.compress_buf = Self::compress(
718                &self.buf[..],
719                self.compression_algorithm,
720                std::mem::take(&mut self.compress_buf),
721            );
722
723            &mut self.compress_buf
724        } else {
725            &mut self.buf
726        };
727
728        self.compression_algorithm.encode(result_buf);
729        let checksum = xxhash64_checksum(result_buf);
730        result_buf.put_u64_le(checksum);
731        assert!(
732            result_buf.len() < (u32::MAX) as usize,
733            "buf_len {} entry_count {} table {:?}",
734            result_buf.len(),
735            self.entry_count,
736            self.table_id
737        );
738
739        if self.compression_algorithm != CompressionAlgorithm::None {
740            self.compress_buf.as_ref()
741        } else {
742            self.buf.as_ref()
743        }
744    }
745
746    pub fn compress_block(
747        buf: Bytes,
748        target_compression: CompressionAlgorithm,
749    ) -> HummockResult<Bytes> {
750        // Verify checksum.
751        let checksum = (&buf[buf.len() - 8..]).get_u64_le();
752        xxhash64_verify(&buf[..buf.len() - 8], checksum)?;
753        // Decompress.
754        let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
755        let compressed_data = &buf[..buf.len() - 9];
756        assert_eq!(compression, CompressionAlgorithm::None);
757        let mut compress_writer = Self::compress(
758            compressed_data,
759            target_compression,
760            BytesMut::with_capacity(buf.len()),
761        );
762
763        target_compression.encode(&mut compress_writer);
764        let checksum = xxhash64_checksum(&compress_writer);
765        compress_writer.put_u64_le(checksum);
766        Ok(compress_writer.freeze())
767    }
768
769    pub fn compress(
770        buf: &[u8],
771        compression_algorithm: CompressionAlgorithm,
772        compress_writer: BytesMut,
773    ) -> BytesMut {
774        match compression_algorithm {
775            CompressionAlgorithm::None => unreachable!(),
776            CompressionAlgorithm::Lz4 => {
777                let mut encoder = lz4::EncoderBuilder::new()
778                    .level(4)
779                    .build(compress_writer.writer())
780                    .map_err(HummockError::encode_error)
781                    .unwrap();
782                encoder
783                    .write_all(buf)
784                    .map_err(HummockError::encode_error)
785                    .unwrap();
786                let (writer, result) = encoder.finish();
787                result.map_err(HummockError::encode_error).unwrap();
788                writer.into_inner()
789            }
790            CompressionAlgorithm::Zstd => {
791                let mut encoder = zstd::Encoder::new(compress_writer.writer(), 4)
792                    .map_err(HummockError::encode_error)
793                    .unwrap();
794                encoder
795                    .write_all(buf)
796                    .map_err(HummockError::encode_error)
797                    .unwrap();
798                let writer = encoder
799                    .finish()
800                    .map_err(HummockError::encode_error)
801                    .unwrap();
802                writer.into_inner()
803            }
804        }
805    }
806
807    /// Approximate block len (uncompressed).
808    pub fn approximate_len(&self) -> usize {
809        // block + restart_points + restart_points.len + restart_points_type_indices +
810        // restart_points_type_indics.len compression_algorithm + checksum
811        self.buf.len()
812            + std::mem::size_of::<u32>() * self.restart_points.len() // restart_points
813            + std::mem::size_of::<u32>() // restart_points.len
814            + RestartPoint::size_of() * self.restart_points_type_index.len() // restart_points_type_indics
815            + std::mem::size_of::<u32>() // restart_points_type_indics.len
816            + std::mem::size_of::<CompressionAlgorithm>() // compression_algorithm
817            + std::mem::size_of::<u64>() // checksum
818            + std::mem::size_of::<u32>() // table_id
819    }
820
821    pub fn debug_valid(&self) {
822        if self.entry_count == 0 {
823            debug_assert!(self.buf.is_empty());
824            debug_assert!(self.restart_points.is_empty());
825            debug_assert!(self.restart_points_type_index.is_empty());
826            debug_assert!(self.last_key.is_empty());
827        }
828    }
829
830    pub fn table_id(&self) -> Option<TableId> {
831        self.table_id
832    }
833
834    fn buf_reserve_size(option: &BlockBuilderOptions) -> usize {
835        option.capacity + 1024 + 256
836    }
837}
838
839/// Attempts to shorten `block_smallest` key while preserving block boundary correctness.
840///
841/// Returns a shortened key if `prev_block_last < shortened <= block_smallest` can be satisfied
842/// with fewer bytes. This reduces block metadata size by exploiting common prefixes between
843/// adjacent blocks.
844///
845/// Returns `None` if the key cannot be shortened (e.g., keys differ only in the last byte).
846pub fn try_shorten_block_smallest_key(
847    prev_block_last: &FullKey<&[u8]>,
848    block_smallest: &FullKey<&[u8]>,
849) -> Option<FullKey<Vec<u8>>> {
850    /// Returns the length of the longest common prefix between two byte slices.
851    fn lcp_len(a: &[u8], b: &[u8]) -> usize {
852        let min_len = a.len().min(b.len());
853        for i in 0..min_len {
854            if a[i] != b[i] {
855                return i;
856            }
857        }
858        min_len
859    }
860
861    let prev_table_key = prev_block_last.user_key.table_key.as_ref();
862    let next_table_key = block_smallest.user_key.table_key.as_ref();
863    assert_eq!(
864        prev_block_last.user_key.table_id, block_smallest.user_key.table_id,
865        "table ids must match for shortening block smallest key"
866    );
867
868    let lcp = lcp_len(prev_table_key, next_table_key);
869
870    // Shortened key = LCP prefix + first differing byte.
871    // Only shorten if the result is strictly shorter than next_table_key.
872    let shortened_len = lcp + 1;
873    if shortened_len >= next_table_key.len() {
874        return None;
875    }
876
877    // Build candidate key: take LCP + 1 bytes from block_smallest
878    let cand = FullKey::new_with_gap_epoch(
879        block_smallest.user_key.table_id,
880        TableKey(&next_table_key[..shortened_len]),
881        block_smallest.epoch_with_gap,
882    );
883
884    // Invariant: prev_block_last < cand <= block_smallest
885    // This must hold by construction. If violated, it indicates a bug in the shortening logic.
886    // Use assert! (not debug_assert!) to fail fast and prevent writing incorrect keys to storage.
887    assert!(
888        prev_block_last.cmp(&cand).is_lt(),
889        "Invariant violated: prev_block_last >= cand. prev={:?}, cand={:?}",
890        prev_block_last,
891        cand
892    );
893    assert!(
894        cand.cmp(block_smallest).is_le(),
895        "Invariant violated: cand > block_smallest. cand={:?}, block_smallest={:?}",
896        cand,
897        block_smallest
898    );
899
900    Some(cand.copy_into())
901}
902
903#[cfg(test)]
904mod tests {
905
906    use risingwave_common::util::epoch::test_epoch;
907    use risingwave_hummock_sdk::key::MAX_KEY_LEN;
908
909    use super::*;
910    use crate::hummock::{BlockHolder, BlockIterator};
911
912    #[test]
913    fn test_block_enc_dec() {
914        let options = BlockBuilderOptions::default();
915        let mut builder = BlockBuilder::new(options);
916        builder.add_for_test(construct_full_key_struct_for_test(0, b"k1", 1), b"v01");
917        builder.add_for_test(construct_full_key_struct_for_test(0, b"k2", 2), b"v02");
918        builder.add_for_test(construct_full_key_struct_for_test(0, b"k3", 3), b"v03");
919        builder.add_for_test(construct_full_key_struct_for_test(0, b"k4", 4), b"v04");
920        let capacity = builder.uncompressed_block_size();
921        assert_eq!(capacity, builder.approximate_len() - 9);
922        let buf = builder.build().to_vec();
923        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
924        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
925
926        bi.seek_to_first();
927        assert!(bi.is_valid());
928        assert_eq!(construct_full_key_struct_for_test(0, b"k1", 1), bi.key());
929        assert_eq!(b"v01", bi.value());
930
931        bi.next();
932        assert!(bi.is_valid());
933        assert_eq!(construct_full_key_struct_for_test(0, b"k2", 2), bi.key());
934        assert_eq!(b"v02", bi.value());
935
936        bi.next();
937        assert!(bi.is_valid());
938        assert_eq!(construct_full_key_struct_for_test(0, b"k3", 3), bi.key());
939        assert_eq!(b"v03", bi.value());
940
941        bi.next();
942        assert!(bi.is_valid());
943        assert_eq!(construct_full_key_struct_for_test(0, b"k4", 4), bi.key());
944        assert_eq!(b"v04", bi.value());
945
946        bi.next();
947        assert!(!bi.is_valid());
948    }
949
950    #[test]
951    fn test_compressed_block_enc_dec() {
952        inner_test_compressed(CompressionAlgorithm::Lz4);
953        inner_test_compressed(CompressionAlgorithm::Zstd);
954    }
955
956    fn inner_test_compressed(algo: CompressionAlgorithm) {
957        let options = BlockBuilderOptions {
958            compression_algorithm: algo,
959            ..Default::default()
960        };
961        let mut builder = BlockBuilder::new(options);
962        builder.add_for_test(construct_full_key_struct_for_test(0, b"k1", 1), b"v01");
963        builder.add_for_test(construct_full_key_struct_for_test(0, b"k2", 2), b"v02");
964        builder.add_for_test(construct_full_key_struct_for_test(0, b"k3", 3), b"v03");
965        builder.add_for_test(construct_full_key_struct_for_test(0, b"k4", 4), b"v04");
966        let capacity = builder.uncompressed_block_size();
967        assert_eq!(capacity, builder.approximate_len() - 9);
968        let buf = builder.build().to_vec();
969        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
970        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
971
972        bi.seek_to_first();
973        assert!(bi.is_valid());
974        assert_eq!(construct_full_key_struct_for_test(0, b"k1", 1), bi.key());
975        assert_eq!(b"v01", bi.value());
976
977        bi.next();
978        assert!(bi.is_valid());
979        assert_eq!(construct_full_key_struct_for_test(0, b"k2", 2), bi.key());
980        assert_eq!(b"v02", bi.value());
981
982        bi.next();
983        assert!(bi.is_valid());
984        assert_eq!(construct_full_key_struct_for_test(0, b"k3", 3), bi.key());
985        assert_eq!(b"v03", bi.value());
986
987        bi.next();
988        assert!(bi.is_valid());
989        assert_eq!(construct_full_key_struct_for_test(0, b"k4", 4), bi.key());
990        assert_eq!(b"v04", bi.value());
991
992        bi.next();
993        assert!(!bi.is_valid());
994    }
995
996    pub fn construct_full_key_struct_for_test(
997        table_id: u32,
998        table_key: &[u8],
999        epoch: u64,
1000    ) -> FullKey<&[u8]> {
1001        FullKey::for_test(TableId::new(table_id), table_key, test_epoch(epoch))
1002    }
1003
1004    #[test]
1005    fn test_block_enc_large_key() {
1006        let options = BlockBuilderOptions::default();
1007        let mut builder = BlockBuilder::new(options);
1008        let medium_key = vec![b'a'; MAX_KEY_LEN - 500];
1009        let large_key = vec![b'b'; MAX_KEY_LEN];
1010        let xlarge_key = vec![b'c'; MAX_KEY_LEN + 500];
1011
1012        builder.add_for_test(construct_full_key_struct_for_test(0, &medium_key, 1), b"v1");
1013        builder.add_for_test(construct_full_key_struct_for_test(0, &large_key, 2), b"v2");
1014        builder.add_for_test(construct_full_key_struct_for_test(0, &xlarge_key, 3), b"v3");
1015        let capacity = builder.uncompressed_block_size();
1016        assert_eq!(capacity, builder.approximate_len() - 9);
1017        let buf = builder.build().to_vec();
1018        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
1019        let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
1020
1021        bi.seek_to_first();
1022        assert!(bi.is_valid());
1023        assert_eq!(
1024            construct_full_key_struct_for_test(0, &medium_key, 1),
1025            bi.key()
1026        );
1027        assert_eq!(b"v1", bi.value());
1028
1029        bi.next();
1030        assert!(bi.is_valid());
1031        assert_eq!(
1032            construct_full_key_struct_for_test(0, &large_key, 2),
1033            bi.key()
1034        );
1035        assert_eq!(b"v2", bi.value());
1036
1037        bi.next();
1038        assert!(bi.is_valid());
1039        assert_eq!(
1040            construct_full_key_struct_for_test(0, &xlarge_key, 3),
1041            bi.key()
1042        );
1043        assert_eq!(b"v3", bi.value());
1044
1045        bi.next();
1046        assert!(!bi.is_valid());
1047    }
1048
1049    #[test]
1050    fn test_block_restart_point() {
1051        let options = BlockBuilderOptions::default();
1052        let mut builder = BlockBuilder::new(options);
1053
1054        const KEY_COUNT: u8 = 100;
1055        const BUILDER_COUNT: u8 = 5;
1056
1057        for _ in 0..BUILDER_COUNT {
1058            for index in 0..KEY_COUNT {
1059                if index < 50 {
1060                    let mut medium_key = vec![b'A'; MAX_KEY_LEN - 500];
1061                    medium_key.push(index);
1062                    builder
1063                        .add_for_test(construct_full_key_struct_for_test(0, &medium_key, 1), b"v1");
1064                } else if index < 80 {
1065                    let mut large_key = vec![b'B'; MAX_KEY_LEN];
1066                    large_key.push(index);
1067                    builder
1068                        .add_for_test(construct_full_key_struct_for_test(0, &large_key, 2), b"v2");
1069                } else {
1070                    let mut xlarge_key = vec![b'C'; MAX_KEY_LEN + 500];
1071                    xlarge_key.push(index);
1072                    builder
1073                        .add_for_test(construct_full_key_struct_for_test(0, &xlarge_key, 3), b"v3");
1074                }
1075            }
1076
1077            let capacity = builder.uncompressed_block_size();
1078            assert_eq!(capacity, builder.approximate_len() - 9);
1079            let buf = builder.build().to_vec();
1080            let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
1081            let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));
1082            bi.seek_to_first();
1083            assert!(bi.is_valid());
1084
1085            for index in 0..KEY_COUNT {
1086                if index < 50 {
1087                    let mut medium_key = vec![b'A'; MAX_KEY_LEN - 500];
1088                    medium_key.push(index);
1089                    assert_eq!(
1090                        construct_full_key_struct_for_test(0, &medium_key, 1),
1091                        bi.key()
1092                    );
1093                } else if index < 80 {
1094                    let mut large_key = vec![b'B'; MAX_KEY_LEN];
1095                    large_key.push(index);
1096                    assert_eq!(
1097                        construct_full_key_struct_for_test(0, &large_key, 2),
1098                        bi.key()
1099                    );
1100                } else {
1101                    let mut xlarge_key = vec![b'C'; MAX_KEY_LEN + 500];
1102                    xlarge_key.push(index);
1103                    assert_eq!(
1104                        construct_full_key_struct_for_test(0, &xlarge_key, 3),
1105                        bi.key()
1106                    );
1107                }
1108                bi.next();
1109            }
1110
1111            assert!(!bi.is_valid());
1112            builder.clear();
1113        }
1114    }
1115
1116    #[test]
1117    fn test_block_serde() {
1118        fn assmut_serde<'de, T: Serialize + Deserialize<'de>>() {}
1119
1120        assmut_serde::<Block>();
1121        assmut_serde::<Box<Block>>();
1122
1123        let options = BlockBuilderOptions::default();
1124        let mut builder = BlockBuilder::new(options);
1125        for i in 0..100 {
1126            builder.add_for_test(
1127                construct_full_key_struct_for_test(0, format!("k{i:8}").as_bytes(), i),
1128                format!("v{i:8}").as_bytes(),
1129            );
1130        }
1131
1132        let capacity = builder.uncompressed_block_size();
1133        assert_eq!(capacity, builder.approximate_len() - 9);
1134        let buf = builder.build().to_vec();
1135
1136        let block = Box::new(Block::decode(buf.into(), capacity).unwrap());
1137
1138        let buffer = bincode::serialize(&block).unwrap();
1139        let blk: Block = bincode::deserialize(&buffer).unwrap();
1140
1141        assert_eq!(block.data, blk.data);
1142        assert_eq!(block.data_len, blk.data_len);
1143        assert_eq!(block.table_id, blk.table_id,);
1144        assert_eq!(block.restart_points, blk.restart_points);
1145    }
1146
1147    #[test]
1148    fn test_try_shorten_block_smallest_key() {
1149        use risingwave_hummock_sdk::EpochWithGap;
1150
1151        fn make_full_key(table_id: u32, table_key: &[u8], epoch: u64) -> FullKey<&[u8]> {
1152            FullKey::new_with_gap_epoch(
1153                TableId::new(table_id),
1154                TableKey(table_key),
1155                EpochWithGap::new_from_epoch(test_epoch(epoch)),
1156            )
1157        }
1158
1159        /// Verifies the invariant: prev < shortened <= next using `FullKey::cmp`
1160        fn assert_invariant(
1161            prev: &FullKey<&[u8]>,
1162            shortened: &FullKey<Vec<u8>>,
1163            next: &FullKey<&[u8]>,
1164        ) {
1165            assert!(
1166                prev.cmp(&shortened.to_ref()).is_lt(),
1167                "Invariant violated: prev >= shortened. prev={:?}, shortened={:?}",
1168                prev,
1169                shortened
1170            );
1171            assert!(
1172                shortened.to_ref().cmp(next).is_le(),
1173                "Invariant violated: shortened > next. shortened={:?}, next={:?}",
1174                shortened,
1175                next
1176            );
1177        }
1178
1179        // Case 1: Basic shortening - prev="abc", next="abdef" -> cand="abd"
1180        {
1181            let prev = make_full_key(1, b"abc", 100);
1182            let next = make_full_key(1, b"abdef", 100);
1183            let result = try_shorten_block_smallest_key(&prev, &next);
1184            assert!(result.is_some());
1185            let shortened = result.unwrap();
1186            assert_eq!(shortened.user_key.table_key.as_ref(), b"abd");
1187            assert_invariant(&prev, &shortened, &next);
1188        }
1189
1190        // Case 2: Prefix case - prev="ab", next="abcd" -> cand="abc"
1191        {
1192            let prev = make_full_key(1, b"ab", 100);
1193            let next = make_full_key(1, b"abcd", 100);
1194            let result = try_shorten_block_smallest_key(&prev, &next);
1195            assert!(result.is_some());
1196            let shortened = result.unwrap();
1197            assert_eq!(shortened.user_key.table_key.as_ref(), b"abc");
1198            assert_invariant(&prev, &shortened, &next);
1199        }
1200
1201        // Case 3: Cannot shorten - only 1 char difference at end
1202        {
1203            let prev = make_full_key(1, b"abc", 100);
1204            let next = make_full_key(1, b"abd", 100);
1205            assert!(try_shorten_block_smallest_key(&prev, &next).is_none());
1206        }
1207
1208        // Case 4: Cannot shorten - next is only 1 char longer than LCP
1209        {
1210            let prev = make_full_key(1, b"abc", 100);
1211            let next = make_full_key(1, b"abcd", 100);
1212            assert!(try_shorten_block_smallest_key(&prev, &next).is_none());
1213        }
1214
1215        // Case 5: Same table_key, different epoch - cannot shorten
1216        {
1217            let prev = make_full_key(1, b"abc", 200);
1218            let next = make_full_key(1, b"abc", 100);
1219            assert!(try_shorten_block_smallest_key(&prev, &next).is_none());
1220        }
1221
1222        // Case 7: Single char keys - cannot shorten
1223        {
1224            let prev = make_full_key(1, b"a", 100);
1225            let next = make_full_key(1, b"b", 100);
1226            assert!(try_shorten_block_smallest_key(&prev, &next).is_none());
1227        }
1228
1229        // Case 8: Long common prefix with divergence
1230        {
1231            let prev = make_full_key(1, b"hello_world_abc", 100);
1232            let next = make_full_key(1, b"hello_world_xyz123", 100);
1233            let result = try_shorten_block_smallest_key(&prev, &next);
1234            assert!(result.is_some());
1235            let shortened = result.unwrap();
1236            assert_eq!(shortened.user_key.table_key.as_ref(), b"hello_world_x");
1237            assert_invariant(&prev, &shortened, &next);
1238        }
1239
1240        // Case 9: Empty common prefix (completely different keys)
1241        {
1242            let prev = make_full_key(1, b"aaa", 100);
1243            let next = make_full_key(1, b"bbbccc", 100);
1244            let result = try_shorten_block_smallest_key(&prev, &next);
1245            assert!(result.is_some());
1246            let shortened = result.unwrap();
1247            assert_eq!(shortened.user_key.table_key.as_ref(), b"b");
1248            assert_invariant(&prev, &shortened, &next);
1249        }
1250
1251        // Case 10: Minimal shortening - next is exactly 2 chars longer than LCP
1252        {
1253            let prev = make_full_key(1, b"abc", 100);
1254            let next = make_full_key(1, b"abcde", 100);
1255            let result = try_shorten_block_smallest_key(&prev, &next);
1256            assert!(result.is_some());
1257            let shortened = result.unwrap();
1258            assert_eq!(shortened.user_key.table_key.as_ref(), b"abcd");
1259            assert_invariant(&prev, &shortened, &next);
1260        }
1261    }
1262}