risingwave_hummock_sdk/
lib.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(type_alias_impl_trait)]
#![feature(impl_trait_in_assoc_type)]
#![feature(let_chains)]
#![feature(btree_cursors)]
#![feature(strict_overflow_ops)]

mod key_cmp;

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::ops::{Add, Sub};

pub use key_cmp::*;
use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sstable_info::SstableInfo;
use tracing::warn;

use crate::key_range::KeyRangeCommon;
use crate::table_stats::TableStatsMap;

pub mod change_log;
pub mod compact;
pub mod compact_task;
pub mod compaction_group;
pub mod key;
pub mod key_range;
pub mod level;
pub mod prost_key_range;
pub mod sstable_info;
pub mod state_table_info;
pub mod table_stats;
pub mod table_watermark;
pub mod time_travel;
pub mod version;
pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
mod frontend_version;

pub use compact::*;
use risingwave_common::catalog::TableId;

use crate::table_watermark::TableWatermarks;

pub type HummockSstableObjectId = u64;
pub type HummockSstableId = u64;
pub type HummockRefCount = u64;
pub type HummockContextId = u32;
pub type HummockEpoch = u64;
pub type HummockCompactionTaskId = u64;
pub type CompactionGroupId = u64;

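/// Id of a hummock version. Versions are totally ordered by id, and `MAX` is
/// capped at `i64::MAX` rather than `u64::MAX`.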
#[derive(Debug, Clone, PartialEq, Copy, Ord, PartialOrd, Eq, Hash)]
pub struct HummockVersionId(u64);

impl Display for HummockVersionId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl Serialize for HummockVersionId {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_u64(self.0)
    }
}

impl<'de> Deserialize<'de> for HummockVersionId {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        Ok(Self(<u64 as Deserialize>::deserialize(deserializer)?))
    }
}

impl HummockVersionId {
    pub const MAX: Self = Self(i64::MAX as _);

    pub const fn new(id: u64) -> Self {
        Self(id)
    }

    pub fn next(&self) -> Self {
        Self(self.0 + 1)
    }

    pub fn to_u64(self) -> u64 {
        self.0
    }
}

impl Add<u64> for HummockVersionId {
    type Output = Self;

    fn add(self, rhs: u64) -> Self::Output {
        Self(self.0 + rhs)
    }
}

impl Sub for HummockVersionId {
    type Output = u64;

    fn sub(self, rhs: Self) -> Self::Output {
        self.0 - rhs.0
    }
}

pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0);
pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1);
pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
pub const OBJECT_SUFFIX: &str = "data";
pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;

#[macro_export]
/// A wrapper around the `info` log.
///
/// In CI tests we frequently create and drop tables and checkpoint on every barrier, which
/// generates many of these events. They are expected to be rare in production, where logging
/// each of them at `info` level is desirable, but in CI they would flood the logs. This macro
/// therefore logs at `info` level when `debug_assertions` is disabled and at `debug` level
/// when it is enabled (e.g. in CI builds).
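///
/// Illustrative usage (the call site is the same as for `tracing::info!`):
///
/// ```ignore
/// info_in_release!(epoch, "new epoch committed");
/// ```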
macro_rules! info_in_release {
    ($($arg:tt)*) => {
        {
            #[cfg(debug_assertions)]
            {
                use tracing::debug;
                debug!($($arg)*);
            }
            #[cfg(not(debug_assertions))]
            {
                use tracing::info;
                info!($($arg)*);
            }
        }
    }
}

#[derive(Default, Debug)]
pub struct SyncResult {
    /// The size of all synced shared buffers.
    pub sync_size: usize,
    /// The uncommitted SSTs produced by the sync.
    pub uncommitted_ssts: Vec<LocalSstableInfo>,
    /// The collected table watermarks written by state tables.
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    /// SSTs that hold the uncommitted old values.
    pub old_value_ssts: Vec<LocalSstableInfo>,
}

#[derive(Debug, Clone)]
pub struct LocalSstableInfo {
    pub sst_info: SstableInfo,
    pub table_stats: TableStatsMap,
    pub created_at: u64,
}

impl LocalSstableInfo {
    pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
        Self {
            sst_info,
            table_stats,
            created_at,
        }
    }

    pub fn for_test(sst_info: SstableInfo) -> Self {
        Self {
            sst_info,
            table_stats: Default::default(),
            created_at: u64::MAX,
        }
    }

    pub fn file_size(&self) -> u64 {
        assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
        self.sst_info.file_size
    }
}

impl PartialEq for LocalSstableInfo {
    fn eq(&self, other: &Self) -> bool {
        self.sst_info == other.sst_info
    }
}

/// Packages a read epoch of hummock; it is used for `wait_epoch`.
#[derive(Debug, Clone, Copy)]
pub enum HummockReadEpoch {
    /// We need to wait for the `committed_epoch` of the read table.
    Committed(HummockEpoch),
    /// We need to wait for the `committed_epoch` of the read table, and for the hummock
    /// version to catch up to the given version id.
    BatchQueryCommitted(HummockEpoch, HummockVersionId),
    /// No need to wait for an epoch; usually used for stream reads.
    NoWait(HummockEpoch),
    /// No need to wait for an epoch.
    Backup(HummockEpoch),
    TimeTravel(HummockEpoch),
}

impl From<BatchQueryEpoch> for HummockReadEpoch {
    fn from(e: BatchQueryEpoch) -> Self {
        match e.epoch.unwrap() {
            batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted(
                epoch.epoch,
                HummockVersionId::new(epoch.hummock_version_id),
            ),
            batch_query_epoch::Epoch::Current(epoch) => {
                if epoch != HummockEpoch::MAX {
                    warn!(
                        epoch,
                        "ignore specified current epoch and set it to u64::MAX"
                    );
                }
                HummockReadEpoch::NoWait(HummockEpoch::MAX)
            }
            batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
            batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
        }
    }
}

pub fn test_batch_query_epoch() -> BatchQueryEpoch {
    BatchQueryEpoch {
        epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
    }
}

impl HummockReadEpoch {
    pub fn get_epoch(&self) -> HummockEpoch {
        *match self {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::NoWait(epoch)
            | HummockReadEpoch::Backup(epoch)
            | HummockReadEpoch::TimeTravel(epoch) => epoch,
        }
    }

    pub fn is_read_committed(&self) -> bool {
        match self {
            HummockReadEpoch::Committed(_)
            | HummockReadEpoch::TimeTravel(_)
            | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
            HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
        }
    }
}

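/// A pre-allocated, half-open range `[start_id, end_id)` of SST object ids.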
pub struct SstObjectIdRange {
    // inclusive
    pub start_id: HummockSstableObjectId,
    // exclusive
    pub end_id: HummockSstableObjectId,
}

impl SstObjectIdRange {
    pub fn new(start_id: HummockSstableObjectId, end_id: HummockSstableObjectId) -> Self {
        Self { start_id, end_id }
    }

    pub fn peek_next_sst_object_id(&self) -> Option<HummockSstableObjectId> {
        if self.start_id < self.end_id {
            return Some(self.start_id);
        }
        None
    }

    /// Pops and returns the next SST object id.
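    ///
    /// A minimal sketch of the pop semantics:
    ///
    /// ```
    /// use risingwave_hummock_sdk::SstObjectIdRange;
    /// let mut range = SstObjectIdRange::new(10, 12);
    /// assert_eq!(range.get_next_sst_object_id(), Some(10));
    /// assert_eq!(range.get_next_sst_object_id(), Some(11));
    /// assert_eq!(range.get_next_sst_object_id(), None);
    /// ```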
    pub fn get_next_sst_object_id(&mut self) -> Option<HummockSstableObjectId> {
        let next_id = self.peek_next_sst_object_id();
        self.start_id += 1;
        next_id
    }
}

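/// Returns true if the SSTs are sorted and non-overlapping, i.e. each SST's right bound
/// compares strictly less than the next SST's left bound, so the SSTs can be treated as
/// one concatenated, sorted run.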
pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
    let len = ssts.len();
    for i in 1..len {
        if ssts[i - 1]
            .borrow()
            .key_range
            .compare_right_with(&ssts[i].borrow().key_range.left)
            != Ordering::Less
        {
            return false;
        }
    }
    true
}

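/// Like [`can_concat`], but compares encoded full keys, and permits the previous SST's right
/// bound to equal the next SST's left bound when that right bound is exclusive.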
pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
    let len = ssts.len();
    for i in 1..len {
        let sst_1 = &ssts[i - 1];
        let sst_2 = &ssts[i];

        if sst_1.key_range.right_exclusive {
            if KeyComparator::compare_encoded_full_key(
                &sst_1.key_range.right,
                &sst_2.key_range.left,
            )
            .is_gt()
            {
                return false;
            }
        } else if KeyComparator::compare_encoded_full_key(
            &sst_1.key_range.right,
            &sst_2.key_range.left,
        )
        .is_ge()
        {
            return false;
        }
    }
    true
}

const CHECKPOINT_DIR: &str = "checkpoint";
const CHECKPOINT_NAME: &str = "0";
const ARCHIVE_DIR: &str = "archive";

pub fn version_checkpoint_path(root_dir: &str) -> String {
    format!("{}/{}/{}", root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME)
}

pub fn version_archive_dir(root_dir: &str) -> String {
    format!("{}/{}", root_dir, ARCHIVE_DIR)
}

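/// Strips the checkpoint file name from `checkpoint_path`, returning the containing
/// directory (including the trailing `/`).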
pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    checkpoint_path.trim_end_matches(|c| c != '/').to_owned()
}

/// Represents an epoch with a gap.
///
/// When a spill of the mem table occurs between two epochs, `EpochWithGap` generates an offset.
/// This offset is encoded when performing full key encoding. When returning to the upper-level
/// interface, a pure epoch with the lower 16 bits set to 0 should be returned.
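///
/// # Example
///
/// A minimal sketch, assuming the low 16 bits hold the spill offset:
///
/// ```
/// use risingwave_hummock_sdk::EpochWithGap;
/// let e = EpochWithGap::new(1 << 16, 3);
/// assert_eq!(e.pure_epoch(), 1 << 16);
/// assert_eq!(e.offset(), 3);
/// ```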
#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
pub struct EpochWithGap(u64);

impl EpochWithGap {
    #[allow(unused_variables)]
    pub fn new(epoch: u64, spill_offset: u16) -> Self {
        // We use the high 48 bits to store the epoch and the low 16 bits to store the spill
        // offset. For the MAX epoch, however, we keep `u64::MAX` as-is, because it is used in
        // delete ranges and is persisted to sstable files. For compatibility we must therefore
        // skip the check for `u64::MAX`. See the bug description in
        // https://github.com/risingwavelabs/risingwave/issues/13717
        if risingwave_common::util::epoch::is_max_epoch(epoch) {
            EpochWithGap::new_max_epoch()
        } else {
            debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
            EpochWithGap(epoch + spill_offset as u64)
        }
    }

    pub fn new_from_epoch(epoch: u64) -> Self {
        EpochWithGap::new(epoch, 0)
    }

    pub fn new_min_epoch() -> Self {
        EpochWithGap(0)
    }

    pub fn new_max_epoch() -> Self {
        EpochWithGap(HummockEpoch::MAX)
    }

    // Returns the raw `epoch_with_gap` value (epoch + spill offset).
    pub(crate) fn as_u64(&self) -> HummockEpoch {
        self.0
    }

    // Constructs from a raw `epoch_with_gap` value (epoch + spill offset).
    pub fn from_u64(epoch_with_gap: u64) -> Self {
        EpochWithGap(epoch_with_gap)
    }

    // Returns the pure epoch with the spill offset cleared.
    pub fn pure_epoch(&self) -> HummockEpoch {
        self.0 & !EPOCH_SPILL_TIME_MASK
    }

    pub fn offset(&self) -> u64 {
        self.0 & EPOCH_SPILL_TIME_MASK
    }
}

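/// Builds the object store path for an SST object, i.e.
/// `{path_prefix}/{obj_prefix}{object_id}.data`. For example, `object_id` 42 with
/// `path_prefix` `"hummock_001"` and an empty `obj_prefix` yields `"hummock_001/42.data"`.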
pub fn get_sst_data_path(
    obj_prefix: &str,
    path_prefix: &str,
    object_id: HummockSstableObjectId,
) -> String {
    let mut path = String::with_capacity(
        path_prefix.len()
            + "/".len()
            + obj_prefix.len()
            + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
            + ".".len()
            + OBJECT_SUFFIX.len(),
    );
    path.push_str(path_prefix);
    path.push('/');
    path.push_str(obj_prefix);
    path.push_str(&object_id.to_string());
    path.push('.');
    path.push_str(OBJECT_SUFFIX);
    path
}

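/// Parses the object id back out of a path produced by [`get_sst_data_path`], i.e. the
/// decimal component preceding the trailing `.data` suffix. Panics if the path is malformed.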
pub fn get_object_id_from_path(path: &str) -> HummockSstableObjectId {
    use itertools::Itertools;
    let split = path.split(&['/', '.']).collect_vec();
    assert!(split.len() > 2);
    assert_eq!(split[split.len() - 1], OBJECT_SUFFIX);
    split[split.len() - 2]
        .parse::<HummockSstableObjectId>()
        .expect("valid sst id")
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    #[test]
    fn test_object_id_decimal_max_length() {
        let len = HummockSstableObjectId::MAX.to_string().len();
        assert_eq!(len, HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH)
    }

    #[test]
    fn test_full_key_concat() {
        let key1 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        let sst_1 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key1.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        let sst_2 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key2.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        let sst_3 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        assert!(full_key_can_concat(&[sst_1.clone().into(), sst_2.into()]));

        assert!(!full_key_can_concat(&[sst_1.into(), sst_3.into()]));
    }
}
491}