risingwave_hummock_sdk/lib.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(type_alias_impl_trait)]
#![feature(impl_trait_in_assoc_type)]
#![feature(let_chains)]
#![feature(btree_cursors)]
#![feature(strict_overflow_ops)]
#![feature(map_try_insert)]

mod key_cmp;

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::ops::{Add, AddAssign, Sub};
use std::str::FromStr;

pub use key_cmp::*;
use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sstable_info::SstableInfo;
use tracing::warn;

use crate::key_range::KeyRangeCommon;
use crate::table_stats::TableStatsMap;

pub mod change_log;
pub mod compact;
pub mod compact_task;
pub mod compaction_group;
pub mod key;
pub mod key_range;
pub mod level;
pub mod prost_key_range;
pub mod sstable_info;
pub mod state_table_info;
pub mod table_stats;
pub mod table_watermark;
pub mod time_travel;
pub mod version;
pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
mod frontend_version;
pub mod vector_index;

pub use compact::*;
use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;
use risingwave_pb::hummock::{PbVectorIndexObjectType, VectorIndexObjectType};

use crate::table_watermark::TableWatermarks;

/// A typed wrapper around a primitive value. The const parameter `C` tags each alias
/// (e.g. object id vs. sstable id) as a distinct type, so different kinds of ids cannot
/// be mixed up even though they share the same underlying primitive.
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, Ord, PartialOrd)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct TypedPrimitive<const C: usize, P>(P);

impl<const C: usize, P: PartialEq> PartialEq<P> for TypedPrimitive<C, P> {
    fn eq(&self, other: &P) -> bool {
        self.0 == *other
    }
}

macro_rules! impl_primitive {
    ($($t:ty)*) => {$(
        impl<const C: usize> PartialEq<TypedPrimitive<C, $t>> for $t {
            fn eq(&self, other: &TypedPrimitive<C, $t>) -> bool {
                *self == other.0
            }
        }
    )*}
}

impl_primitive!(u64);

impl<const C: usize, P: FromStr> FromStr for TypedPrimitive<C, P> {
    type Err = P::Err;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        P::from_str(s).map(TypedPrimitive)
    }
}

impl<const C: usize, P> Borrow<P> for TypedPrimitive<C, P> {
    fn borrow(&self) -> &P {
        &self.0
    }
}

impl<const C: usize, P: Add<Output = P>> Add<P> for TypedPrimitive<C, P> {
    type Output = Self;

    fn add(self, rhs: P) -> Self::Output {
        Self(self.0 + rhs)
    }
}

impl<const C: usize, P: AddAssign> AddAssign<P> for TypedPrimitive<C, P> {
    fn add_assign(&mut self, rhs: P) {
        self.0 += rhs;
    }
}

impl<const C: usize, P: Display> Display for TypedPrimitive<C, P> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl<const C: usize, P> From<P> for TypedPrimitive<C, P> {
    fn from(value: P) -> Self {
        Self(value)
    }
}

impl<const C: usize, P: Serialize> Serialize for TypedPrimitive<C, P> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        self.0.serialize(serializer)
    }
}

impl<'de, const C: usize, P: Deserialize<'de>> Deserialize<'de> for TypedPrimitive<C, P> {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
    }
}

impl<const C: usize, P> TypedPrimitive<C, P> {
    pub const fn new(id: P) -> Self {
        Self(id)
    }

    pub fn inner(self) -> P {
        self.0
    }
}

pub type HummockRawObjectId = TypedPrimitive<0, u64>;
pub type HummockSstableObjectId = TypedPrimitive<1, u64>;
pub type HummockSstableId = TypedPrimitive<2, u64>;
pub type HummockVectorFileId = TypedPrimitive<3, u64>;

impl HummockSstableObjectId {
    pub fn as_raw(&self) -> HummockRawObjectId {
        HummockRawObjectId::new(self.0)
    }
}

impl HummockVectorFileId {
    pub fn as_raw(&self) -> HummockRawObjectId {
        HummockRawObjectId::new(self.0)
    }
}

impl From<HummockRawObjectId> for HummockSstableObjectId {
    fn from(id: HummockRawObjectId) -> Self {
        Self(id.0)
    }
}

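// Illustrative sketch (not part of the public API): the const discriminant keeps the id
// newtypes apart, so different kinds of ids are distinct types even though they all wrap
// a `u64`, while explicit conversions and primitive comparisons stay available.
#[cfg(test)]
mod typed_primitive_example {
    use super::*;

    #[test]
    fn distinct_id_newtypes() {
        let object_id = HummockSstableObjectId::new(42);
        // Comparison against the underlying primitive works in both directions.
        assert_eq!(object_id, 42u64);
        assert_eq!(42u64, object_id);
        // Conversion to the raw object id is explicit.
        assert_eq!(object_id.as_raw(), HummockRawObjectId::new(42));
        // Parsing from a decimal string round-trips through `Display`.
        let parsed: HummockSstableObjectId = object_id.to_string().parse().unwrap();
        assert_eq!(parsed, object_id);
    }
}
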
pub type HummockRefCount = u64;
pub type HummockContextId = u32;
pub type HummockEpoch = u64;
pub type HummockCompactionTaskId = u64;
pub type CompactionGroupId = u64;

#[derive(Debug, Clone, PartialEq, Copy, Ord, PartialOrd, Eq, Hash)]
pub struct HummockVersionId(u64);

impl Display for HummockVersionId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl Serialize for HummockVersionId {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_u64(self.0)
    }
}

impl<'de> Deserialize<'de> for HummockVersionId {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        Ok(Self(<u64 as Deserialize>::deserialize(deserializer)?))
    }
}

impl HummockVersionId {
    pub const MAX: Self = Self(i64::MAX as _);

    pub const fn new(id: u64) -> Self {
        Self(id)
    }

    pub fn next(&self) -> Self {
        Self(self.0 + 1)
    }

    pub fn to_u64(self) -> u64 {
        self.0
    }
}

impl Add<u64> for HummockVersionId {
    type Output = Self;

    fn add(self, rhs: u64) -> Self::Output {
        Self(self.0 + rhs)
    }
}

impl Sub for HummockVersionId {
    type Output = u64;

    fn sub(self, rhs: Self) -> Self::Output {
        self.0 - rhs.0
    }
}

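// Illustrative sketch of version id arithmetic: `next`/`Add` advance the id, while `Sub`
// yields the distance between two ids as a plain `u64` rather than another id.
#[cfg(test)]
mod version_id_example {
    use super::*;

    #[test]
    fn version_id_arithmetic() {
        let id = HummockVersionId::new(5);
        assert_eq!(id.next(), HummockVersionId::new(6));
        assert_eq!(id + 2, HummockVersionId::new(7));
        assert_eq!(HummockVersionId::new(7) - id, 2);
        assert_eq!(id.to_u64(), 5);
    }
}
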
pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0);
pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1);
pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
pub const SST_OBJECT_SUFFIX: &str = "data";
pub const VECTOR_FILE_OBJECT_SUFFIX: &str = "vector";
pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;

macro_rules! for_all_object_suffix {
    ($({$name:ident, $type_name:ty, $suffix:expr},)+) => {
        #[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)]
        pub enum HummockObjectId {
            $(
                $name($type_name),
            )+
        }

        pub const VALID_OBJECT_ID_SUFFIXES: [&str; 2] = [$(
                $suffix
            ),+];

        impl HummockObjectId {
            fn new(id: u64, suffix: &str) -> Option<Self> {
                match suffix {
                    $(
                        suffix if suffix == $suffix => Some(HummockObjectId::$name(<$type_name>::new(id))),
                    )+
                    _ => None,
                }
            }

            pub fn suffix(&self) -> &str {
                match self {
                    $(
                        HummockObjectId::$name(_) => $suffix,
                    )+
                }
            }

            pub fn as_raw(&self) -> HummockRawObjectId {
                let raw = match self {
                    $(
                        HummockObjectId::$name(id) => id.0,
                    )+
                };
                HummockRawObjectId::new(raw)
            }
        }

        pub fn try_get_object_id_from_path(path: &str) -> Option<HummockObjectId> {
            let split: Vec<_> = path.split(&['/', '.']).collect();
            if split.len() <= 2 {
                return None;
            }
            let suffix = split[split.len() - 1];
            let id_str = split[split.len() - 2];
            match suffix {
                $(
                    suffix if suffix == $suffix => {
                        let id = id_str
                            .parse::<u64>()
                            .unwrap_or_else(|_| panic!("expect valid object id, got {}", id_str));
                        Some(HummockObjectId::$name(<$type_name>::new(id)))
                    },
                )+
                _ => None,
            }
        }
    };
    () => {
        for_all_object_suffix! {
            {Sstable, HummockSstableObjectId, SST_OBJECT_SUFFIX},
            {VectorFile, HummockVectorFileId, VECTOR_FILE_OBJECT_SUFFIX},
        }
    };
}

for_all_object_suffix!();
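
// Illustrative sketch of what the macro invocation above expands to for the current set
// of object kinds: one `HummockObjectId` variant and one recognized suffix per kind.
#[cfg(test)]
mod object_suffix_example {
    use super::*;

    #[test]
    fn object_suffixes() {
        assert_eq!(
            VALID_OBJECT_ID_SUFFIXES,
            [SST_OBJECT_SUFFIX, VECTOR_FILE_OBJECT_SUFFIX]
        );
        assert_eq!(HummockObjectId::Sstable(1.into()).suffix(), SST_OBJECT_SUFFIX);
        assert_eq!(
            HummockObjectId::VectorFile(HummockVectorFileId::new(1)).suffix(),
            VECTOR_FILE_OBJECT_SUFFIX
        );
    }
}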

pub fn get_stale_object_ids(
    stale_objects: &PbStaleObjects,
) -> impl Iterator<Item = HummockObjectId> + '_ {
    // DO NOT REMOVE THIS LINE
    // This exhaustive match ensures that whenever a new variant is added to
    // `HummockObjectId`, the compiler forces us to handle it here as well.
    match HummockObjectId::Sstable(0.into()) {
        HummockObjectId::Sstable(_) => {}
        HummockObjectId::VectorFile(_) => {}
    };
    stale_objects
        .id
        .iter()
        .map(|sst_id| HummockObjectId::Sstable((*sst_id).into()))
        .chain(stale_objects.vector_files.iter().map(
            |file| match file.get_object_type().unwrap() {
                PbVectorIndexObjectType::VectorIndexObjectUnspecified => {
                    unreachable!()
                }
                VectorIndexObjectType::VectorIndexObjectVector => {
                    HummockObjectId::VectorFile(file.id.into())
                }
            },
        ))
}

#[macro_export]
/// A wrapper around the `info` log.
///
/// The events logged through this macro (e.g. creating and dropping tables, checkpointing
/// on barriers) are expected to be infrequent in production, so an `info` log per event is
/// acceptable there. In CI, however, these events happen constantly and would flood the
/// logs, so we downgrade them to `debug`. Concretely, the macro emits an `info` log when
/// `debug_assertions` is disabled and a `debug` log when it is enabled.
macro_rules! info_in_release {
    ($($arg:tt)*) => {
        {
            #[cfg(debug_assertions)]
            {
                use tracing::debug;
                debug!($($arg)*);
            }
            #[cfg(not(debug_assertions))]
            {
                use tracing::info;
                info!($($arg)*);
            }
        }
    }
}
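
// Illustrative usage sketch (the arguments here are hypothetical); the macro accepts the
// same argument syntax as `tracing::info!` / `tracing::debug!`.
#[cfg(test)]
mod info_in_release_example {
    #[test]
    fn macro_usage() {
        let table_id = 42;
        crate::info_in_release!("handling event for table {}", table_id);
    }
}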

#[derive(Default, Debug)]
pub struct SyncResult {
    /// The size of all synced shared buffers.
    pub sync_size: usize,
    /// The `sst_info` of the SSTs produced by the sync, which are not yet committed.
    pub uncommitted_ssts: Vec<LocalSstableInfo>,
    /// The collected table watermarks written by state tables.
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Sstables that hold the uncommitted old values.
    pub old_value_ssts: Vec<LocalSstableInfo>,
}

#[derive(Debug, Clone)]
pub struct LocalSstableInfo {
    pub sst_info: SstableInfo,
    pub table_stats: TableStatsMap,
    pub created_at: u64,
}

impl LocalSstableInfo {
    pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
        Self {
            sst_info,
            table_stats,
            created_at,
        }
    }

    pub fn for_test(sst_info: SstableInfo) -> Self {
        Self {
            sst_info,
            table_stats: Default::default(),
            created_at: u64::MAX,
        }
    }

    pub fn file_size(&self) -> u64 {
        assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
        self.sst_info.file_size
    }
}

impl PartialEq for LocalSstableInfo {
    fn eq(&self, other: &Self) -> bool {
        self.sst_info == other.sst_info
    }
}

/// Packages a hummock read epoch; used for `wait_epoch`.
#[derive(Debug, Clone, Copy)]
pub enum HummockReadEpoch {
    /// We need to wait for the `committed_epoch` of the read table.
    Committed(HummockEpoch),
    /// We need to wait for the `committed_epoch` of the read table, and for the hummock
    /// version to catch up to the given version id.
    BatchQueryCommitted(HummockEpoch, HummockVersionId),
    /// We don't need to wait for the epoch; this is usually used for stream reads.
    NoWait(HummockEpoch),
    /// We don't need to wait for the epoch.
    Backup(HummockEpoch),
    TimeTravel(HummockEpoch),
}

impl From<BatchQueryEpoch> for HummockReadEpoch {
    fn from(e: BatchQueryEpoch) -> Self {
        match e.epoch.unwrap() {
            batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted(
                epoch.epoch,
                HummockVersionId::new(epoch.hummock_version_id),
            ),
            batch_query_epoch::Epoch::Current(epoch) => {
                if epoch != HummockEpoch::MAX {
                    warn!(
                        epoch,
                        "ignore specified current epoch and set it to u64::MAX"
                    );
                }
                HummockReadEpoch::NoWait(HummockEpoch::MAX)
            }
            batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
            batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
        }
    }
}

pub fn test_batch_query_epoch() -> BatchQueryEpoch {
    BatchQueryEpoch {
        epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
    }
}

impl HummockReadEpoch {
    pub fn get_epoch(&self) -> HummockEpoch {
        *match self {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::NoWait(epoch)
            | HummockReadEpoch::Backup(epoch)
            | HummockReadEpoch::TimeTravel(epoch) => epoch,
        }
    }

    pub fn is_read_committed(&self) -> bool {
        match self {
            HummockReadEpoch::Committed(_)
            | HummockReadEpoch::TimeTravel(_)
            | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
            HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
        }
    }
}
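
// Illustrative sketch of the proto-to-enum conversion above: a `Backup` epoch carries a
// plain u64 and maps directly to `HummockReadEpoch::Backup`, which is not read-committed.
#[cfg(test)]
mod read_epoch_example {
    use super::*;

    #[test]
    fn batch_query_epoch_conversion() {
        let e: HummockReadEpoch = BatchQueryEpoch {
            epoch: Some(batch_query_epoch::Epoch::Backup(100)),
        }
        .into();
        assert_eq!(e.get_epoch(), 100);
        assert!(!e.is_read_committed());
    }
}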

pub struct ObjectIdRange {
    // inclusive
    pub start_id: HummockRawObjectId,
    // exclusive
    pub end_id: HummockRawObjectId,
}

impl ObjectIdRange {
    pub fn new(
        start_id: impl Into<HummockRawObjectId>,
        end_id: impl Into<HummockRawObjectId>,
    ) -> Self {
        Self {
            start_id: start_id.into(),
            end_id: end_id.into(),
        }
    }

    fn peek_next_object_id(&self) -> Option<HummockRawObjectId> {
        if self.start_id < self.end_id {
            return Some(self.start_id);
        }
        None
    }
    /// Pops and returns the next object id.
    pub fn get_next_object_id(&mut self) -> Option<HummockRawObjectId> {
        let next_id = self.peek_next_object_id();
        self.start_id += 1;
        next_id
    }
}
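
// Illustrative sketch of the pop semantics above: ids in `[start_id, end_id)` are handed
// out once each, after which the range is exhausted.
#[cfg(test)]
mod object_id_range_example {
    use super::*;

    #[test]
    fn pop_until_exhausted() {
        let mut range = ObjectIdRange::new(1u64, 3u64);
        assert_eq!(range.get_next_object_id(), Some(HummockRawObjectId::new(1)));
        assert_eq!(range.get_next_object_id(), Some(HummockRawObjectId::new(2)));
        assert_eq!(range.get_next_object_id(), None);
    }
}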

pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
    let len = ssts.len();
    for i in 1..len {
        if ssts[i - 1]
            .borrow()
            .key_range
            .compare_right_with(&ssts[i].borrow().key_range.left)
            != Ordering::Less
        {
            return false;
        }
    }
    true
}

pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
    let len = ssts.len();
    for i in 1..len {
        let sst_1 = &ssts[i - 1];
        let sst_2 = &ssts[i];

        if sst_1.key_range.right_exclusive {
            if KeyComparator::compare_encoded_full_key(
                &sst_1.key_range.right,
                &sst_2.key_range.left,
            )
            .is_gt()
            {
                return false;
            }
        } else if KeyComparator::compare_encoded_full_key(
            &sst_1.key_range.right,
            &sst_2.key_range.left,
        )
        .is_ge()
        {
            return false;
        }
    }
    true
}

const CHECKPOINT_DIR: &str = "checkpoint";
const CHECKPOINT_NAME: &str = "0";
const ARCHIVE_DIR: &str = "archive";

pub fn version_checkpoint_path(root_dir: &str) -> String {
    format!("{}/{}/{}", root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME)
}

pub fn version_archive_dir(root_dir: &str) -> String {
    format!("{}/{}", root_dir, ARCHIVE_DIR)
}

pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    checkpoint_path.trim_end_matches(|c| c != '/').to_owned()
}
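
// Illustrative sketch of the layout the helpers above produce under a hummock root
// directory (the root name "hummock_001" is hypothetical).
#[cfg(test)]
mod version_path_example {
    use super::*;

    #[test]
    fn checkpoint_and_archive_paths() {
        assert_eq!(
            version_checkpoint_path("hummock_001"),
            "hummock_001/checkpoint/0"
        );
        assert_eq!(version_archive_dir("hummock_001"), "hummock_001/archive");
        // The checkpoint dir is obtained by trimming the file name from the checkpoint path.
        assert_eq!(
            version_checkpoint_dir("hummock_001/checkpoint/0"),
            "hummock_001/checkpoint/"
        );
    }
}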

/// Represents an epoch with a gap.
///
/// When a spill of the mem table occurs between two epochs, `EpochWithGap` generates an offset.
/// This offset is encoded when performing full key encoding. When returning to the upper-level
/// interface, a pure epoch with the lower 16 bits set to 0 should be returned.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
pub struct EpochWithGap(u64);

impl EpochWithGap {
    #[allow(unused_variables)]
    pub fn new(epoch: u64, spill_offset: u16) -> Self {
        // We only use the 48 high bits to store the epoch and the 16 low bits to store the spill
        // offset. For the MAX epoch, however, we still keep `u64::MAX`, because it has been used in
        // delete ranges and persisted to sstable files. So for compatibility, we must skip this
        // check for u64::MAX. See the bug description in https://github.com/risingwavelabs/risingwave/issues/13717
        if risingwave_common::util::epoch::is_max_epoch(epoch) {
            EpochWithGap::new_max_epoch()
        } else {
            debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
            EpochWithGap(epoch + spill_offset as u64)
        }
    }

    pub fn new_from_epoch(epoch: u64) -> Self {
        EpochWithGap::new(epoch, 0)
    }

    pub fn new_min_epoch() -> Self {
        EpochWithGap(0)
    }

    pub fn new_max_epoch() -> Self {
        EpochWithGap(HummockEpoch::MAX)
    }

    // return the epoch_with_gap(epoch + spill_offset)
    pub(crate) fn as_u64(&self) -> HummockEpoch {
        self.0
    }

    // construct from a raw `epoch_with_gap` value (epoch + spill_offset)
    pub fn from_u64(epoch_with_gap: u64) -> Self {
        EpochWithGap(epoch_with_gap)
    }

    // return the pure epoch without spill offset
    pub fn pure_epoch(&self) -> HummockEpoch {
        self.0 & !EPOCH_SPILL_TIME_MASK
    }

    pub fn offset(&self) -> u64 {
        self.0 & EPOCH_SPILL_TIME_MASK
    }
}
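
// Illustrative sketch of the epoch/spill-offset split described above: the spill offset
// lives in the lower 16 bits, and `pure_epoch` masks it back out.
#[cfg(test)]
mod epoch_with_gap_example {
    use super::*;

    #[test]
    fn spill_offset_roundtrip() {
        // A pure epoch always has its lower 16 bits set to zero.
        let pure_epoch = 1u64 << 16;
        let e = EpochWithGap::new(pure_epoch, 3);
        assert_eq!(e.pure_epoch(), pure_epoch);
        assert_eq!(e.offset(), 3);
        // The spill offset breaks ties between entries written at the same epoch.
        assert!(EpochWithGap::new(pure_epoch, 3) > EpochWithGap::new(pure_epoch, 2));
    }
}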

pub fn get_object_data_path(
    obj_prefix: &str,
    path_prefix: &str,
    object_id: HummockObjectId,
) -> String {
    let suffix = object_id.suffix();
    let object_id = object_id.as_raw();

    let mut path = String::with_capacity(
        path_prefix.len()
            + "/".len()
            + obj_prefix.len()
            + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
            + ".".len()
            + suffix.len(),
    );
    path.push_str(path_prefix);
    path.push('/');
    path.push_str(obj_prefix);
    path.push_str(&object_id.to_string());
    path.push('.');
    path.push_str(suffix);
    path
}

pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
    use itertools::Itertools;
    let split = path.split(&['/', '.']).collect_vec();
    assert!(split.len() > 2);
    let suffix = split[split.len() - 1];
    let id = split[split.len() - 2]
        .parse::<u64>()
        .expect("valid object id");
    HummockObjectId::new(id, suffix)
        .unwrap_or_else(|| panic!("unknown object id suffix {}", suffix))
}
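
// Illustrative sketch of the object path layout produced and parsed by the two helpers
// above, assuming an empty object prefix: `<path_prefix>/<object_id>.<suffix>`.
#[cfg(test)]
mod object_path_example {
    use super::*;

    #[test]
    fn object_path_round_trip() {
        let object_id = HummockObjectId::Sstable(42.into());
        let path = get_object_data_path("", "hummock_001", object_id);
        assert_eq!(path, "hummock_001/42.data");
        assert_eq!(get_object_id_from_path(&path), object_id);
        assert_eq!(try_get_object_id_from_path(&path), Some(object_id));
        // Paths with an unrecognized suffix are not considered hummock objects.
        assert_eq!(try_get_object_id_from_path("hummock_001/42.unknown"), None);
    }
}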

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    #[test]
    fn test_object_id_decimal_max_length() {
        let len = u64::MAX.to_string().len();
        assert_eq!(len, HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH)
    }

    #[test]
    fn test_full_key_concat() {
        let key1 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        let sst_1 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key1.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        let sst_2 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key2.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        let sst_3 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        assert!(full_key_can_concat(&[sst_1.clone().into(), sst_2.into()]));

        assert!(!full_key_can_concat(&[sst_1.into(), sst_3.into()]));
    }
}