risingwave_storage/hummock/sstable/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
16
17use std::fmt::Display;
18use std::ptr;
19
20use risingwave_hummock_sdk::key::MAX_KEY_LEN;
21use xxhash_rust::xxh64;
22
23use super::{HummockError, HummockResult};
24
25unsafe fn read_u64(ptr: *const u8) -> u64 {
26    unsafe { ptr::read_unaligned(ptr as *const u64) }
27}
28
29unsafe fn read_u32(ptr: *const u8) -> u32 {
30    unsafe { ptr::read_unaligned(ptr as *const u32) }
31}
32
33#[inline]
34pub fn bytes_diff_below_max_key_length<'a>(base: &[u8], target: &'a [u8]) -> &'a [u8] {
35    let end = base.len().min(target.len()).min(MAX_KEY_LEN);
36    let mut i = 0;
37    unsafe {
38        while i + 8 <= end {
39            if read_u64(base.as_ptr().add(i)) != read_u64(target.as_ptr().add(i)) {
40                break;
41            }
42            i += 8;
43        }
44        if i + 4 <= end && read_u32(base.as_ptr().add(i)) == read_u32(target.as_ptr().add(i)) {
45            i += 4;
46        }
47        while i < end {
48            if base.get_unchecked(i) != target.get_unchecked(i) {
49                return target.get_unchecked(i..);
50            }
51            i += 1;
52        }
53        target.get_unchecked(end..)
54    }
55}
56
57/// Calculates the ``XxHash`` of the given data.
58pub fn xxhash64_checksum(data: &[u8]) -> u64 {
59    xxh64::xxh64(data, 0)
60}
61
62/// Verifies the checksum of the data equals the given checksum with xxhash64.
63pub fn xxhash64_verify(data: &[u8], checksum: u64) -> HummockResult<()> {
64    let data_checksum = xxhash64_checksum(data);
65    if data_checksum != checksum {
66        return Err(HummockError::checksum_mismatch(checksum, data_checksum));
67    }
68    Ok(())
69}
70
71use bytes::{Buf, BufMut};
72
73pub fn put_length_prefixed_slice(mut buf: impl BufMut, slice: &[u8]) {
74    let len = checked_into_u32(slice.len())
75        .unwrap_or_else(|_| panic!("WARN overflow can't convert slice {} into u32", slice.len()));
76    buf.put_u32_le(len);
77    buf.put_slice(slice);
78}
79
80pub fn get_length_prefixed_slice(buf: &mut &[u8]) -> Vec<u8> {
81    let len = buf.get_u32_le() as usize;
82    let v = buf[..len].to_vec();
83    buf.advance(len);
84    v
85}
86
87#[derive(Clone, Copy, Debug, PartialEq, Eq)]
88pub enum CompressionAlgorithm {
89    None,
90    Lz4,
91    Zstd,
92}
93
94impl CompressionAlgorithm {
95    pub fn encode(&self, buf: &mut impl BufMut) {
96        let v = match self {
97            Self::None => 0,
98            Self::Lz4 => 1,
99            Self::Zstd => 2,
100        };
101        buf.put_u8(v);
102    }
103
104    pub fn decode(buf: &mut impl Buf) -> HummockResult<Self> {
105        match buf.get_u8() {
106            0 => Ok(Self::None),
107            1 => Ok(Self::Lz4),
108            2 => Ok(Self::Zstd),
109            _ => Err(HummockError::decode_error(
110                "not valid compression algorithm",
111            )),
112        }
113    }
114}
115
116impl From<u32> for CompressionAlgorithm {
117    fn from(ca: u32) -> Self {
118        match ca {
119            0 => CompressionAlgorithm::None,
120            1 => CompressionAlgorithm::Lz4,
121            _ => CompressionAlgorithm::Zstd,
122        }
123    }
124}
125
126impl From<CompressionAlgorithm> for u8 {
127    fn from(ca: CompressionAlgorithm) -> Self {
128        match ca {
129            CompressionAlgorithm::None => 0,
130            CompressionAlgorithm::Lz4 => 1,
131            CompressionAlgorithm::Zstd => 2,
132        }
133    }
134}
135
136impl From<CompressionAlgorithm> for u64 {
137    fn from(ca: CompressionAlgorithm) -> Self {
138        match ca {
139            CompressionAlgorithm::None => 0,
140            CompressionAlgorithm::Lz4 => 1,
141            CompressionAlgorithm::Zstd => 2,
142        }
143    }
144}
145
146impl TryFrom<u8> for CompressionAlgorithm {
147    type Error = HummockError;
148
149    fn try_from(v: u8) -> core::result::Result<Self, Self::Error> {
150        match v {
151            0 => Ok(Self::None),
152            1 => Ok(Self::Lz4),
153            2 => Ok(Self::Zstd),
154            _ => Err(HummockError::decode_error(
155                "not valid compression algorithm",
156            )),
157        }
158    }
159}
160
161pub fn checked_into_u32<T: TryInto<u32> + Copy + Display>(i: T) -> Result<u32, T::Error> {
162    i.try_into()
163}