risingwave_storage/hummock/sstable/
utils.rs1use std::fmt::Display;
18use std::ptr;
19
20use risingwave_hummock_sdk::key::MAX_KEY_LEN;
21use xxhash_rust::xxh64;
22
23use super::{HummockError, HummockResult};
24
/// Loads a native-endian `u64` from the eight bytes starting at `ptr`.
///
/// The load is unaligned, so `ptr` does not need any particular alignment.
///
/// # Safety
///
/// `ptr` must be valid for reading at least eight bytes.
unsafe fn read_u64(ptr: *const u8) -> u64 {
    // SAFETY: the caller guarantees eight readable bytes behind `ptr`.
    unsafe { ptr.cast::<u64>().read_unaligned() }
}
28
/// Loads a native-endian `u32` from the four bytes starting at `ptr`.
///
/// The load is unaligned, so `ptr` does not need any particular alignment.
///
/// # Safety
///
/// `ptr` must be valid for reading at least four bytes.
unsafe fn read_u32(ptr: *const u8) -> u32 {
    // SAFETY: the caller guarantees four readable bytes behind `ptr`.
    unsafe { ptr.cast::<u32>().read_unaligned() }
}
32
33#[inline]
34pub fn bytes_diff_below_max_key_length<'a>(base: &[u8], target: &'a [u8]) -> &'a [u8] {
35 let end = base.len().min(target.len()).min(MAX_KEY_LEN);
36 let mut i = 0;
37 unsafe {
38 while i + 8 <= end {
39 if read_u64(base.as_ptr().add(i)) != read_u64(target.as_ptr().add(i)) {
40 break;
41 }
42 i += 8;
43 }
44 if i + 4 <= end && read_u32(base.as_ptr().add(i)) == read_u32(target.as_ptr().add(i)) {
45 i += 4;
46 }
47 while i < end {
48 if base.get_unchecked(i) != target.get_unchecked(i) {
49 return target.get_unchecked(i..);
50 }
51 i += 1;
52 }
53 target.get_unchecked(end..)
54 }
55}
56
57pub fn xxhash64_checksum(data: &[u8]) -> u64 {
59 xxh64::xxh64(data, 0)
60}
61
62pub fn xxhash64_verify(data: &[u8], checksum: u64) -> HummockResult<()> {
64 let data_checksum = xxhash64_checksum(data);
65 if data_checksum != checksum {
66 return Err(HummockError::checksum_mismatch(checksum, data_checksum));
67 }
68 Ok(())
69}
70
71use bytes::{Buf, BufMut};
72
73pub fn put_length_prefixed_slice(mut buf: impl BufMut, slice: &[u8]) {
74 let len = checked_into_u32(slice.len())
75 .unwrap_or_else(|_| panic!("WARN overflow can't convert slice {} into u32", slice.len()));
76 buf.put_u32_le(len);
77 buf.put_slice(slice);
78}
79
80pub fn get_length_prefixed_slice(buf: &mut &[u8]) -> Vec<u8> {
81 let len = buf.get_u32_le() as usize;
82 let v = buf[..len].to_vec();
83 buf.advance(len);
84 v
85}
86
/// Compression algorithm selector.
///
/// The explicit discriminants equal the values the enum had implicitly and mirror the
/// numeric tags used by the conversions in this file.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompressionAlgorithm {
    /// Tag `0`.
    None = 0,
    /// Tag `1`.
    Lz4 = 1,
    /// Tag `2`.
    Zstd = 2,
}
93
94impl CompressionAlgorithm {
95 pub fn encode(&self, buf: &mut impl BufMut) {
96 let v = match self {
97 Self::None => 0,
98 Self::Lz4 => 1,
99 Self::Zstd => 2,
100 };
101 buf.put_u8(v);
102 }
103
104 pub fn decode(buf: &mut impl Buf) -> HummockResult<Self> {
105 match buf.get_u8() {
106 0 => Ok(Self::None),
107 1 => Ok(Self::Lz4),
108 2 => Ok(Self::Zstd),
109 _ => Err(HummockError::decode_error(
110 "not valid compression algorithm",
111 )),
112 }
113 }
114}
115
116impl From<u32> for CompressionAlgorithm {
117 fn from(ca: u32) -> Self {
118 match ca {
119 0 => CompressionAlgorithm::None,
120 1 => CompressionAlgorithm::Lz4,
121 _ => CompressionAlgorithm::Zstd,
122 }
123 }
124}
125
126impl From<CompressionAlgorithm> for u8 {
127 fn from(ca: CompressionAlgorithm) -> Self {
128 match ca {
129 CompressionAlgorithm::None => 0,
130 CompressionAlgorithm::Lz4 => 1,
131 CompressionAlgorithm::Zstd => 2,
132 }
133 }
134}
135
136impl From<CompressionAlgorithm> for u64 {
137 fn from(ca: CompressionAlgorithm) -> Self {
138 match ca {
139 CompressionAlgorithm::None => 0,
140 CompressionAlgorithm::Lz4 => 1,
141 CompressionAlgorithm::Zstd => 2,
142 }
143 }
144}
145
146impl TryFrom<u8> for CompressionAlgorithm {
147 type Error = HummockError;
148
149 fn try_from(v: u8) -> core::result::Result<Self, Self::Error> {
150 match v {
151 0 => Ok(Self::None),
152 1 => Ok(Self::Lz4),
153 2 => Ok(Self::Zstd),
154 _ => Err(HummockError::decode_error(
155 "not valid compression algorithm",
156 )),
157 }
158 }
159}
160
/// Attempts to convert `i` into a `u32`.
///
/// # Errors
///
/// Returns the error of `T`'s `TryInto<u32>` implementation when the conversion fails.
pub fn checked_into_u32<T: TryInto<u32> + Copy + Display>(i: T) -> Result<u32, T::Error> {
    <T as TryInto<u32>>::try_into(i)
}