risingwave_hummock_sdk/lib.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(type_alias_impl_trait)]
#![feature(impl_trait_in_assoc_type)]
#![feature(let_chains)]
#![feature(btree_cursors)]
#![feature(strict_overflow_ops)]

mod key_cmp;

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::ops::{Add, AddAssign, Sub};
use std::str::FromStr;

pub use key_cmp::*;
use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sstable_info::SstableInfo;
use tracing::warn;

use crate::key_range::KeyRangeCommon;
use crate::table_stats::TableStatsMap;

pub mod change_log;
pub mod compact;
pub mod compact_task;
pub mod compaction_group;
pub mod key;
pub mod key_range;
pub mod level;
pub mod prost_key_range;
pub mod sstable_info;
pub mod state_table_info;
pub mod table_stats;
pub mod table_watermark;
pub mod time_travel;
pub mod version;
pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
mod frontend_version;

pub use compact::*;
use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;

use crate::table_watermark::TableWatermarks;

#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, Ord, PartialOrd)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct TypedPrimitive<const C: usize, P>(P);

impl<const C: usize, P: PartialEq> PartialEq<P> for TypedPrimitive<C, P> {
    fn eq(&self, other: &P) -> bool {
        self.0 == *other
    }
}

macro_rules! impl_primitive {
    ($($t:ty)*) => {$(
        impl<const C: usize> PartialEq<TypedPrimitive<C, $t>> for $t {
            fn eq(&self, other: &TypedPrimitive<C, $t>) -> bool {
                *self == other.0
            }
        }
    )*}
}

impl_primitive!(u64);

impl<const C: usize, P: FromStr> FromStr for TypedPrimitive<C, P> {
    type Err = P::Err;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        P::from_str(s).map(TypedPrimitive)
    }
}

impl<const C: usize, P> Borrow<P> for TypedPrimitive<C, P> {
    fn borrow(&self) -> &P {
        &self.0
    }
}

impl<const C: usize, P: Add<Output = P>> Add<P> for TypedPrimitive<C, P> {
    type Output = Self;

    fn add(self, rhs: P) -> Self::Output {
        Self(self.0 + rhs)
    }
}

impl<const C: usize, P: AddAssign> AddAssign<P> for TypedPrimitive<C, P> {
    fn add_assign(&mut self, rhs: P) {
        self.0 += rhs;
    }
}

impl<const C: usize, P: Display> Display for TypedPrimitive<C, P> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl<const C: usize, P> From<P> for TypedPrimitive<C, P> {
    fn from(value: P) -> Self {
        Self(value)
    }
}

impl<const C: usize, P: Serialize> Serialize for TypedPrimitive<C, P> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        self.0.serialize(serializer)
    }
}

impl<'de, const C: usize, P: Deserialize<'de>> Deserialize<'de> for TypedPrimitive<C, P> {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
    }
}

impl<const C: usize, P> TypedPrimitive<C, P> {
    pub const fn new(id: P) -> Self {
        Self(id)
    }

    pub fn inner(self) -> P {
        self.0
    }
}

pub type HummockRawObjectId = TypedPrimitive<0, u64>;
pub type HummockSstableObjectId = TypedPrimitive<1, u64>;
pub type HummockSstableId = TypedPrimitive<2, u64>;

impl HummockSstableObjectId {
    pub fn as_raw(&self) -> HummockRawObjectId {
        HummockRawObjectId::new(self.0)
    }
}

impl From<HummockRawObjectId> for HummockSstableObjectId {
    fn from(id: HummockRawObjectId) -> Self {
        Self(id.0)
    }
}
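
// A minimal usage sketch (added for illustration, not part of the original source):
// it exercises the conversions and comparisons derived for `TypedPrimitive` above,
// using `HummockSstableObjectId` as the concrete alias. The module and test names
// are hypothetical.
#[cfg(test)]
mod typed_primitive_usage_sketch {
    use super::*;

    #[test]
    fn sstable_object_id_round_trip() {
        // `From<u64>` builds the typed id; `PartialEq<u64>` compares in both directions.
        let id: HummockSstableObjectId = 42u64.into();
        assert_eq!(id, 42u64);
        assert_eq!(42u64, id);

        // `Display` and `FromStr` round-trip through the decimal representation.
        let parsed: HummockSstableObjectId = id.to_string().parse().unwrap();
        assert_eq!(parsed, id);

        // `as_raw` re-wraps the same primitive as a `HummockRawObjectId`.
        assert_eq!(id.as_raw().inner(), 42u64);
    }
}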

pub type HummockRefCount = u64;
pub type HummockContextId = u32;
pub type HummockEpoch = u64;
pub type HummockCompactionTaskId = u64;
pub type CompactionGroupId = u64;

#[derive(Debug, Clone, PartialEq, Copy, Ord, PartialOrd, Eq, Hash)]
pub struct HummockVersionId(u64);

impl Display for HummockVersionId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl Serialize for HummockVersionId {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_u64(self.0)
    }
}

impl<'de> Deserialize<'de> for HummockVersionId {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        Ok(Self(<u64 as Deserialize>::deserialize(deserializer)?))
    }
}

impl HummockVersionId {
    pub const MAX: Self = Self(i64::MAX as _);

    pub const fn new(id: u64) -> Self {
        Self(id)
    }

    pub fn next(&self) -> Self {
        Self(self.0 + 1)
    }

    pub fn to_u64(self) -> u64 {
        self.0
    }
}

impl Add<u64> for HummockVersionId {
    type Output = Self;

    fn add(self, rhs: u64) -> Self::Output {
        Self(self.0 + rhs)
    }
}

impl Sub for HummockVersionId {
    type Output = u64;

    fn sub(self, rhs: Self) -> Self::Output {
        self.0 - rhs.0
    }
}

pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0);
pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1);
pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
pub const SST_OBJECT_SUFFIX: &str = "data";
pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;

// Generates `HummockObjectId` and its suffix helpers for each object type; currently only
// sstable objects are defined.
macro_rules! for_all_object_suffix {
    ($({$name:ident, $type_name:ty, $suffix:expr},)+) => {
        #[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)]
        pub enum HummockObjectId {
            $(
                $name($type_name),
            )+
        }

        pub const VALID_OBJECT_ID_SUFFIXES: [&str; 1] = [$(
                $suffix
            ),+];

        impl HummockObjectId {
            fn new(id: u64, suffix: &str) -> Self {
                match suffix {
                    $(
                        suffix if suffix == $suffix => HummockObjectId::$name(<$type_name>::new(id)),
                    )+
                    _ => panic!("unknown object id suffix {}", suffix),
                }
            }

            pub fn suffix(&self) -> &str {
                match self {
                    $(
                        HummockObjectId::$name(_) => $suffix,
                    )+
                }
            }

            pub fn as_raw(&self) -> HummockRawObjectId {
                let raw = match self {
                    $(
                        HummockObjectId::$name(id) => id.0,
                    )+
                };
                HummockRawObjectId::new(raw)
            }
        }
    };
    () => {
        for_all_object_suffix! {
            {Sstable, HummockSstableObjectId, SST_OBJECT_SUFFIX},
        }
    };
}

for_all_object_suffix!();

pub fn get_stale_object_ids(
    stale_objects: &PbStaleObjects,
) -> impl Iterator<Item = HummockObjectId> + '_ {
    // DO NOT REMOVE THIS LINE
    // This match ensures that when a new variant is added to `HummockObjectId`,
    // the compiler will warn us if we forget to handle it here.
    match HummockObjectId::Sstable(0.into()) {
        HummockObjectId::Sstable(_) => {}
    };
    stale_objects
        .id
        .iter()
        .map(|sst_id| HummockObjectId::Sstable((*sst_id).into()))
}
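
// A minimal sketch (added for illustration, not in the original source) of how the
// macro-generated `HummockObjectId` maps between an id/suffix pair and the typed
// variants; the module and test names are hypothetical.
#[cfg(test)]
mod hummock_object_id_usage_sketch {
    use super::*;

    #[test]
    fn object_id_suffix_mapping() {
        // `new` picks the variant whose suffix matches; `suffix` goes back the other way.
        let id = HummockObjectId::new(7, SST_OBJECT_SUFFIX);
        assert_eq!(id, HummockObjectId::Sstable(7.into()));
        assert_eq!(id.suffix(), SST_OBJECT_SUFFIX);
        // `as_raw` strips the variant and exposes the underlying raw object id.
        assert_eq!(id.as_raw(), HummockRawObjectId::new(7));
    }
}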

#[macro_export]
/// A wrapper around the `info` log.
///
/// In our CI tests we frequently create and drop tables and checkpoint in every barrier, which
/// produces many of these events. They are not expected to be frequent in production, so each of
/// them is logged at `info` level. In CI, however, they are frequent and generate a lot of log
/// output, so we prefer to downgrade them to `debug`. This macro therefore emits an `info` log
/// when `debug_assertions` is disabled and a `debug` log when `debug_assertions` is enabled.
macro_rules! info_in_release {
    ($($arg:tt)*) => {
        {
            #[cfg(debug_assertions)]
            {
                use tracing::debug;
                debug!($($arg)*);
            }
            #[cfg(not(debug_assertions))]
            {
                use tracing::info;
                info!($($arg)*);
            }
        }
    }
}

#[derive(Default, Debug)]
pub struct SyncResult {
    /// The size of all synced shared buffers.
    pub sync_size: usize,
    /// The `sst_info` of the SSTs produced by this sync, not yet committed.
    pub uncommitted_ssts: Vec<LocalSstableInfo>,
    /// The collected table watermarks written by state tables.
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Sstables that hold the uncommitted old values.
    pub old_value_ssts: Vec<LocalSstableInfo>,
}

#[derive(Debug, Clone)]
pub struct LocalSstableInfo {
    pub sst_info: SstableInfo,
    pub table_stats: TableStatsMap,
    pub created_at: u64,
}

impl LocalSstableInfo {
    pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
        Self {
            sst_info,
            table_stats,
            created_at,
        }
    }

    pub fn for_test(sst_info: SstableInfo) -> Self {
        Self {
            sst_info,
            table_stats: Default::default(),
            created_at: u64::MAX,
        }
    }

    pub fn file_size(&self) -> u64 {
        assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
        self.sst_info.file_size
    }
}

impl PartialEq for LocalSstableInfo {
    fn eq(&self, other: &Self) -> bool {
        self.sst_info == other.sst_info
    }
}

/// Packages the read epoch of Hummock; used for `wait_epoch`.
#[derive(Debug, Clone, Copy)]
pub enum HummockReadEpoch {
    /// We need to wait for the `committed_epoch` of the read table.
    Committed(HummockEpoch),
    /// We need to wait for the `committed_epoch` of the read table, and also for the hummock version to reach the given version id.
    BatchQueryCommitted(HummockEpoch, HummockVersionId),
    /// We don't need to wait for any epoch; usually used for stream reads.
    NoWait(HummockEpoch),
    /// We don't need to wait for any epoch.
    Backup(HummockEpoch),
    TimeTravel(HummockEpoch),
}

impl From<BatchQueryEpoch> for HummockReadEpoch {
    fn from(e: BatchQueryEpoch) -> Self {
        match e.epoch.unwrap() {
            batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted(
                epoch.epoch,
                HummockVersionId::new(epoch.hummock_version_id),
            ),
            batch_query_epoch::Epoch::Current(epoch) => {
                if epoch != HummockEpoch::MAX {
                    warn!(
                        epoch,
                        "ignore specified current epoch and set it to u64::MAX"
                    );
                }
                HummockReadEpoch::NoWait(HummockEpoch::MAX)
            }
            batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
            batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
        }
    }
}

pub fn test_batch_query_epoch() -> BatchQueryEpoch {
    BatchQueryEpoch {
        epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
    }
}

impl HummockReadEpoch {
    pub fn get_epoch(&self) -> HummockEpoch {
        *match self {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::NoWait(epoch)
            | HummockReadEpoch::Backup(epoch)
            | HummockReadEpoch::TimeTravel(epoch) => epoch,
        }
    }

    pub fn is_read_committed(&self) -> bool {
        match self {
            HummockReadEpoch::Committed(_)
            | HummockReadEpoch::TimeTravel(_)
            | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
            HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
        }
    }
}
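
// A minimal sketch (added for illustration, not in the original source) of the
// `BatchQueryEpoch` -> `HummockReadEpoch` conversion above, using the
// `test_batch_query_epoch` helper; the module and test names are hypothetical.
#[cfg(test)]
mod read_epoch_usage_sketch {
    use super::*;

    #[test]
    fn current_epoch_becomes_no_wait() {
        // `Epoch::Current(u64::MAX)` maps to `NoWait(MAX)`, which is not read-committed.
        let read_epoch: HummockReadEpoch = test_batch_query_epoch().into();
        assert_eq!(read_epoch.get_epoch(), HummockEpoch::MAX);
        assert!(!read_epoch.is_read_committed());
    }
}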
pub struct ObjectIdRange {
    // inclusive
    pub start_id: HummockRawObjectId,
    // exclusive
    pub end_id: HummockRawObjectId,
}

impl ObjectIdRange {
    pub fn new(
        start_id: impl Into<HummockRawObjectId>,
        end_id: impl Into<HummockRawObjectId>,
    ) -> Self {
        Self {
            start_id: start_id.into(),
            end_id: end_id.into(),
        }
    }

    fn peek_next_object_id(&self) -> Option<HummockRawObjectId> {
        if self.start_id < self.end_id {
            return Some(self.start_id);
        }
        None
    }

    /// Pops and returns the next object id.
    pub fn get_next_object_id(&mut self) -> Option<HummockRawObjectId> {
        let next_id = self.peek_next_object_id();
        self.start_id += 1;
        next_id
    }
}
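
// A minimal sketch (added for illustration, not in the original source) showing that
// `ObjectIdRange` hands out ids from the inclusive start up to, but excluding, the end;
// the module and test names are hypothetical.
#[cfg(test)]
mod object_id_range_usage_sketch {
    use super::*;

    #[test]
    fn range_is_left_inclusive_right_exclusive() {
        let mut range = ObjectIdRange::new(10u64, 12u64);
        assert_eq!(range.get_next_object_id(), Some(HummockRawObjectId::new(10)));
        assert_eq!(range.get_next_object_id(), Some(HummockRawObjectId::new(11)));
        // The exclusive end id is never handed out.
        assert_eq!(range.get_next_object_id(), None);
    }
}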

/// Returns `true` if the sstables are ordered and non-overlapping, i.e. each sstable's key range
/// ends strictly before the next one begins, so they can be concatenated into a single run.
pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
    let len = ssts.len();
    for i in 1..len {
        if ssts[i - 1]
            .borrow()
            .key_range
            .compare_right_with(&ssts[i].borrow().key_range.left)
            != Ordering::Less
        {
            return false;
        }
    }
    true
}

/// Like [`can_concat`], but compares encoded full keys directly and allows the boundary keys to be
/// equal when the left sstable's right bound is exclusive.
pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
    let len = ssts.len();
    for i in 1..len {
        let sst_1 = &ssts[i - 1];
        let sst_2 = &ssts[i];

        if sst_1.key_range.right_exclusive {
            if KeyComparator::compare_encoded_full_key(
                &sst_1.key_range.right,
                &sst_2.key_range.left,
            )
            .is_gt()
            {
                return false;
            }
        } else if KeyComparator::compare_encoded_full_key(
            &sst_1.key_range.right,
            &sst_2.key_range.left,
        )
        .is_ge()
        {
            return false;
        }
    }
    true
}

const CHECKPOINT_DIR: &str = "checkpoint";
const CHECKPOINT_NAME: &str = "0";
const ARCHIVE_DIR: &str = "archive";

pub fn version_checkpoint_path(root_dir: &str) -> String {
    format!("{}/{}/{}", root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME)
}

pub fn version_archive_dir(root_dir: &str) -> String {
    format!("{}/{}", root_dir, ARCHIVE_DIR)
}

pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    checkpoint_path.trim_end_matches(|c| c != '/').to_owned()
}
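
// A minimal sketch (added for illustration, not in the original source) of the path layout
// produced by the helpers above; "hummock_root" is a hypothetical root directory.
#[cfg(test)]
mod version_path_usage_sketch {
    use super::*;

    #[test]
    fn version_path_layout() {
        let root = "hummock_root";
        assert_eq!(version_checkpoint_path(root), "hummock_root/checkpoint/0");
        assert_eq!(version_archive_dir(root), "hummock_root/archive");
        // `version_checkpoint_dir` strips the trailing file name, keeping the trailing slash.
        assert_eq!(
            version_checkpoint_dir(&version_checkpoint_path(root)),
            "hummock_root/checkpoint/"
        );
    }
}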

/// Represents an epoch with a gap.
///
/// When a spill of the mem table occurs between two epochs, `EpochWithGap` generates an offset.
/// This offset is encoded when performing full key encoding. When returning to the upper-level
/// interface, a pure epoch with the lower 16 bits set to 0 should be returned.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
pub struct EpochWithGap(u64);

impl EpochWithGap {
    #[allow(unused_variables)]
    pub fn new(epoch: u64, spill_offset: u16) -> Self {
        // We use the high 48 bits to store the epoch and the low 16 bits to store the spill
        // offset. For the MAX epoch, however, we still keep `u64::MAX`, because it has been used
        // in delete ranges and persisted to sstable files. So for compatibility, we must skip the
        // check for u64::MAX. See the bug description in
        // https://github.com/risingwavelabs/risingwave/issues/13717
        if risingwave_common::util::epoch::is_max_epoch(epoch) {
            EpochWithGap::new_max_epoch()
        } else {
            debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
            EpochWithGap(epoch + spill_offset as u64)
        }
    }

    pub fn new_from_epoch(epoch: u64) -> Self {
        EpochWithGap::new(epoch, 0)
    }

    pub fn new_min_epoch() -> Self {
        EpochWithGap(0)
    }

    pub fn new_max_epoch() -> Self {
        EpochWithGap(HummockEpoch::MAX)
    }

    // Returns the raw `epoch_with_gap` value (epoch + spill_offset).
    pub(crate) fn as_u64(&self) -> HummockEpoch {
        self.0
    }

    // Constructs from a raw `epoch_with_gap` value (epoch + spill_offset).
    pub fn from_u64(epoch_with_gap: u64) -> Self {
        EpochWithGap(epoch_with_gap)
    }

    // Returns the pure epoch without the spill offset.
    pub fn pure_epoch(&self) -> HummockEpoch {
        self.0 & !EPOCH_SPILL_TIME_MASK
    }

    pub fn offset(&self) -> u64 {
        self.0 & EPOCH_SPILL_TIME_MASK
    }
}
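
// A minimal sketch (added for illustration, not in the original source) of how
// `EpochWithGap` packs a pure epoch (lower 16 bits zero) together with a spill offset,
// assuming `EPOCH_SPILL_TIME_MASK` covers exactly the low 16 bits as the doc comment
// above describes; the module and test names are hypothetical.
#[cfg(test)]
mod epoch_with_gap_usage_sketch {
    use super::*;

    #[test]
    fn epoch_and_spill_offset_round_trip() {
        // A pure epoch always has its lower 16 bits (the spill offset) set to zero.
        let pure_epoch: u64 = 1 << 16;
        let epoch_with_gap = EpochWithGap::new(pure_epoch, 3);
        assert_eq!(epoch_with_gap.pure_epoch(), pure_epoch);
        assert_eq!(epoch_with_gap.offset(), 3);
        assert_eq!(EpochWithGap::from_u64(epoch_with_gap.as_u64()), epoch_with_gap);
    }
}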

pub fn get_object_data_path(
    obj_prefix: &str,
    path_prefix: &str,
    object_id: HummockObjectId,
) -> String {
    let suffix = object_id.suffix();
    let object_id = object_id.as_raw();

    let mut path = String::with_capacity(
        path_prefix.len()
            + "/".len()
            + obj_prefix.len()
            + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
            + ".".len()
            + suffix.len(),
    );
    path.push_str(path_prefix);
    path.push('/');
    path.push_str(obj_prefix);
    path.push_str(&object_id.to_string());
    path.push('.');
    path.push_str(suffix);
    path
}

pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
    use itertools::Itertools;
    let split = path.split(&['/', '.']).collect_vec();
    assert!(split.len() > 2);
    let suffix = split[split.len() - 1];
    let id = split[split.len() - 2]
        .parse::<u64>()
        .expect("valid object id");
    HummockObjectId::new(id, suffix)
}
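
// A minimal sketch (added for illustration, not in the original source) of the object data
// path layout and its round trip through `get_object_id_from_path`; the empty object prefix
// and the "hummock_001" path prefix are hypothetical.
#[cfg(test)]
mod object_path_usage_sketch {
    use super::*;

    #[test]
    fn object_data_path_round_trip() {
        let object_id = HummockObjectId::Sstable(123.into());
        // With an empty object prefix the layout is "<path_prefix>/<raw_id>.<suffix>".
        let path = get_object_data_path("", "hummock_001", object_id);
        assert_eq!(path, "hummock_001/123.data");
        assert_eq!(get_object_id_from_path(&path), object_id);
    }
}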

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    #[test]
    fn test_object_id_decimal_max_length() {
        let len = u64::MAX.to_string().len();
        assert_eq!(len, HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH)
    }

    #[test]
    fn test_full_key_concat() {
        let key1 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        let sst_1 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key1.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        let sst_2 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key2.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        let sst_3 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        assert!(full_key_can_concat(&[sst_1.clone().into(), sst_2.into()]));

        assert!(!full_key_can_concat(&[sst_1.into(), sst_3.into()]));
    }
}