risingwave_hummock_sdk/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(type_alias_impl_trait)]
16#![feature(impl_trait_in_assoc_type)]
17#![feature(let_chains)]
18#![feature(btree_cursors)]
19#![feature(strict_overflow_ops)]
20#![feature(map_try_insert)]
21
22mod key_cmp;
23
24use std::borrow::Borrow;
25use std::cmp::Ordering;
26use std::collections::HashMap;
27use std::fmt::{Display, Formatter};
28use std::ops::{Add, AddAssign, Sub};
29use std::str::FromStr;
30
31pub use key_cmp::*;
32use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
33use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
34use serde::{Deserialize, Deserializer, Serialize, Serializer};
35use sstable_info::SstableInfo;
36use tracing::warn;
37
38use crate::key_range::KeyRangeCommon;
39use crate::table_stats::TableStatsMap;
40
41pub mod change_log;
42pub mod compact;
43pub mod compact_task;
44pub mod compaction_group;
45pub mod key;
46pub mod key_range;
47pub mod level;
48pub mod prost_key_range;
49pub mod sstable_info;
50pub mod state_table_info;
51pub mod table_stats;
52pub mod table_watermark;
53pub mod time_travel;
54pub mod version;
55pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
56mod frontend_version;
57pub mod vector_index;
58
59pub use compact::*;
60use risingwave_common::catalog::TableId;
61use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;
62use risingwave_pb::hummock::{PbVectorIndexObjectType, VectorIndexObjectType};
63
64use crate::table_watermark::TableWatermarks;
65use crate::vector_index::VectorIndexAdd;
66
/// A zero-cost newtype over a primitive `P`, tagged by the const `C` so that
/// different kinds of ids (e.g. object id vs sstable id below) are distinct,
/// non-interchangeable types.
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, Ord, PartialOrd)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct TypedPrimitive<const C: usize, P>(P);
70
71impl<const C: usize, P: PartialEq> PartialEq<P> for TypedPrimitive<C, P> {
72    fn eq(&self, other: &P) -> bool {
73        self.0 == *other
74    }
75}
76
/// Implements the reverse comparison (`primitive == typed`) for each listed
/// primitive type, complementing the generic `PartialEq<P>` impl on
/// `TypedPrimitive`.
macro_rules! impl_primitive {
    ($($t:ty)*) => {$(
        impl<const C: usize> PartialEq<TypedPrimitive<C, $t>> for $t {
            fn eq(&self, other: &TypedPrimitive<C, $t>) -> bool {
                *self == other.0
            }
        }
    )*}
}

impl_primitive!(u64);
88
89impl<const C: usize, P: FromStr> FromStr for TypedPrimitive<C, P> {
90    type Err = P::Err;
91
92    fn from_str(s: &str) -> Result<Self, Self::Err> {
93        P::from_str(s).map(TypedPrimitive)
94    }
95}
96
/// Borrows the inner primitive, e.g. for keyed-collection lookups by raw value.
impl<const C: usize, P> Borrow<P> for TypedPrimitive<C, P> {
    fn borrow(&self) -> &P {
        &self.0
    }
}
102
/// `typed + primitive` addition, yielding the typed wrapper.
impl<const C: usize, P: Add<Output = P>> Add<P> for TypedPrimitive<C, P> {
    type Output = Self;

    fn add(self, rhs: P) -> Self::Output {
        Self(self.0 + rhs)
    }
}
110
/// `typed += primitive` in-place addition.
impl<const C: usize, P: AddAssign> AddAssign<P> for TypedPrimitive<C, P> {
    fn add_assign(&mut self, rhs: P) {
        self.0 += rhs;
    }
}
116
117impl<const C: usize, P: Display> Display for TypedPrimitive<C, P> {
118    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
119        write!(f, "{}", self.0)
120    }
121}
122
/// Wraps a raw primitive into the typed id.
impl<const C: usize, P> From<P> for TypedPrimitive<C, P> {
    fn from(value: P) -> Self {
        Self(value)
    }
}
128
/// Serializes transparently as the inner primitive (no wrapper in the output).
impl<const C: usize, P: Serialize> Serialize for TypedPrimitive<C, P> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        self.0.serialize(serializer)
    }
}
137
138impl<'de, const C: usize, P: Deserialize<'de>> Deserialize<'de> for TypedPrimitive<C, P> {
139    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
140    where
141        D: Deserializer<'de>,
142    {
143        Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
144    }
145}
146
impl<const C: usize, P> TypedPrimitive<C, P> {
    /// Wraps a raw primitive value.
    pub const fn new(id: P) -> Self {
        Self(id)
    }

    /// Consumes the wrapper and returns the raw primitive.
    pub fn inner(self) -> P {
        self.0
    }
}
156
/// Untyped object id; the common currency for id allocation (`ObjectIdRange`)
/// and object-store path building.
pub type HummockRawObjectId = TypedPrimitive<0, u64>;
/// Id of an sstable object in the object store.
pub type HummockSstableObjectId = TypedPrimitive<1, u64>;
/// Id of an sstable; kept distinct from the object id by its type tag.
pub type HummockSstableId = TypedPrimitive<2, u64>;
/// Id of a vector index file object.
pub type HummockVectorFileId = TypedPrimitive<3, u64>;
161
/// For a u64-backed object-id type: adds `as_raw` and a conversion from
/// [`HummockRawObjectId`], so typed ids round-trip through the raw id.
macro_rules! impl_object_id {
    ($type_name:ty) => {
        impl $type_name {
            /// Discards the type tag and returns the raw object id.
            pub fn as_raw(&self) -> HummockRawObjectId {
                HummockRawObjectId::new(self.0)
            }
        }

        impl From<HummockRawObjectId> for $type_name {
            fn from(id: HummockRawObjectId) -> Self {
                Self(id.0)
            }
        }
    };
}

impl_object_id!(HummockSstableObjectId);
impl_object_id!(HummockVectorFileId);
180
// Plain primitive aliases shared across the hummock SDK. Unlike the
// `TypedPrimitive` ids above, these are not newtyped, so values of different
// aliases are interchangeable at the type level.
pub type HummockRefCount = u64;
pub type HummockContextId = u32;
pub type HummockEpoch = u64;
pub type HummockCompactionTaskId = u64;
pub type CompactionGroupId = u64;
186
/// Id of a hummock version. Starts at `FIRST_VERSION_ID` (1) and increases;
/// 0 (`INVALID_VERSION_ID`) marks an invalid/unset version.
#[derive(Debug, Clone, PartialEq, Copy, Ord, PartialOrd, Eq, Hash)]
pub struct HummockVersionId(u64);
189
190impl Display for HummockVersionId {
191    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
192        write!(f, "{}", self.0)
193    }
194}
195
/// Serializes transparently as a plain u64.
impl Serialize for HummockVersionId {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_u64(self.0)
    }
}
204
205impl<'de> Deserialize<'de> for HummockVersionId {
206    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
207    where
208        D: Deserializer<'de>,
209    {
210        Ok(Self(<u64 as Deserialize>::deserialize(deserializer)?))
211    }
212}
213
impl HummockVersionId {
    /// Upper bound of version ids, capped at `i64::MAX` (presumably so ids fit
    /// signed 64-bit storage — TODO confirm with persistence layer).
    pub const MAX: Self = Self(i64::MAX as _);

    /// Wraps a raw u64 version id.
    pub const fn new(id: u64) -> Self {
        Self(id)
    }

    /// The id immediately following this one.
    pub fn next(&self) -> Self {
        Self(self.0 + 1)
    }

    /// Unwraps to the raw u64 value.
    pub fn to_u64(self) -> u64 {
        self.0
    }
}
229
/// Advances the version id by `rhs`.
impl Add<u64> for HummockVersionId {
    type Output = Self;

    fn add(self, rhs: u64) -> Self::Output {
        Self(self.0 + rhs)
    }
}
237
/// Distance between two version ids. Underflows (debug-build panic) when `rhs`
/// is newer than `self`, so callers should subtract the older from the newer.
impl Sub for HummockVersionId {
    type Output = u64;

    fn sub(self, rhs: Self) -> Self::Output {
        self.0 - rhs.0
    }
}
245
/// Version id 0 marks an invalid/unset version.
pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0);
/// The first valid version id.
pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1);
/// High-byte tag (`1 << 56`) for compaction group ids created by table split.
pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
/// High-byte tag (`2 << 56`) for single-table compaction group ids.
pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
/// File-name suffix of sstable data objects.
pub const SST_OBJECT_SUFFIX: &str = "data";
/// File-name suffix of vector index file objects.
pub const VECTOR_FILE_OBJECT_SUFFIX: &str = "vector";
/// Decimal length of `u64::MAX`, i.e. the longest textual object id
/// (checked by `test_object_id_decimal_max_length`).
pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;
253
254macro_rules! for_all_object_suffix {
255    ($({$name:ident, $type_name:ty, $suffix:expr},)+) => {
256        #[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)]
257        pub enum HummockObjectId {
258            $(
259                $name($type_name),
260            )+
261        }
262
263        pub const VALID_OBJECT_ID_SUFFIXES: [&str; 2] = [$(
264                $suffix
265            ),+];
266
267        impl HummockObjectId {
268            fn new(id: u64, suffix: &str) -> Option<Self> {
269                match suffix {
270                    $(
271                        suffix if suffix == $suffix => Some(HummockObjectId::$name(<$type_name>::new(id))),
272                    )+
273                    _ => None,
274                }
275            }
276
277            pub fn suffix(&self) -> &str {
278                match self {
279                    $(
280                        HummockObjectId::$name(_) => $suffix,
281                    )+
282                }
283            }
284
285            pub fn as_raw(&self) -> HummockRawObjectId {
286                let raw = match self {
287                    $(
288                        HummockObjectId::$name(id) => id.0,
289                    )+
290                };
291                HummockRawObjectId::new(raw)
292            }
293        }
294
295        pub fn try_get_object_id_from_path(path: &str) -> Option<HummockObjectId> {
296            let split: Vec<_> = path.split(&['/', '.']).collect();
297            if split.len() <= 2 {
298                return None;
299            }
300            let suffix = split[split.len() - 1];
301            let id_str = split[split.len() - 2];
302            match suffix {
303                $(
304                    suffix if suffix == $suffix => {
305                        let id = id_str
306                            .parse::<u64>()
307                            .unwrap_or_else(|_| panic!("expect valid object id, got {}", id_str));
308                        Some(HummockObjectId::$name(<$type_name>::new(id)))
309                    },
310                )+
311                _ => None,
312            }
313        }
314    };
315    () => {
316        for_all_object_suffix! {
317            {Sstable, HummockSstableObjectId, SST_OBJECT_SUFFIX},
318            {VectorFile, HummockVectorFileId, VECTOR_FILE_OBJECT_SUFFIX},
319        }
320    };
321}
322
323for_all_object_suffix!();
324
/// Returns typed ids of all stale objects recorded in `stale_objects`,
/// covering both sstable objects and vector index files.
///
/// Panics if a vector file carries an unspecified object type.
pub fn get_stale_object_ids(
    stale_objects: &PbStaleObjects,
) -> impl Iterator<Item = HummockObjectId> + '_ {
    // DO NOT REMOVE THIS LINE
    // This is to ensure that when adding new variant to `HummockObjectId`,
    // the compiler will warn us if we forget to handle it here.
    match HummockObjectId::Sstable(0.into()) {
        HummockObjectId::Sstable(_) => {}
        HummockObjectId::VectorFile(_) => {}
    };
    stale_objects
        .id
        .iter()
        .map(|sst_id| HummockObjectId::Sstable((*sst_id).into()))
        .chain(stale_objects.vector_files.iter().map(
            |file| match file.get_object_type().unwrap() {
                PbVectorIndexObjectType::VectorIndexObjectUnspecified => {
                    unreachable!()
                }
                VectorIndexObjectType::VectorIndexObjectVector => {
                    HummockObjectId::VectorFile(file.id.into())
                }
            },
        ))
}
350
#[macro_export]
/// Wrapper around the `info` log that downgrades to `debug` in debug builds.
///
/// In CI we frequently create and drop tables and checkpoint on every barrier,
/// which produces many of these events, while in production they are expected
/// to be rare. Logging them at `info` level floods CI logs, so this macro
/// emits an `info` log when `debug_assertions` is disabled (release builds)
/// and a `debug` log when it is enabled (debug/CI builds).
macro_rules! info_in_release {
    ($($arg:tt)*) => {
        {
            #[cfg(debug_assertions)]
            {
                use tracing::debug;
                debug!($($arg)*);
            }
            #[cfg(not(debug_assertions))]
            {
                use tracing::info;
                info!($($arg)*);
            }
        }
    }
}
376
/// Result of syncing (flushing) shared-buffer state up to some epoch.
#[derive(Default, Debug)]
pub struct SyncResult {
    /// The size of all synced shared buffers.
    pub sync_size: usize,
    /// The `sst_info` of sync.
    pub uncommitted_ssts: Vec<LocalSstableInfo>,
    /// The collected table watermarks written by state tables.
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Sstable that holds the uncommitted old value
    pub old_value_ssts: Vec<LocalSstableInfo>,
    /// Vector index updates produced during the sync, grouped by table id.
    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
}
389
/// A locally produced sstable together with its per-table statistics, before
/// it is committed into a hummock version.
#[derive(Debug, Clone)]
pub struct LocalSstableInfo {
    pub sst_info: SstableInfo,
    /// Per-table statistics collected while building the sstable.
    pub table_stats: TableStatsMap,
    /// Creation timestamp (unit not visible here — confirm with the producer).
    pub created_at: u64,
}
396
impl LocalSstableInfo {
    /// Bundles an sstable with its table stats and creation time.
    pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
        Self {
            sst_info,
            table_stats,
            created_at,
        }
    }

    /// Test helper: empty stats and `created_at` pinned to `u64::MAX`.
    pub fn for_test(sst_info: SstableInfo) -> Self {
        Self {
            sst_info,
            table_stats: Default::default(),
            created_at: u64::MAX,
        }
    }

    /// Physical file size. Asserts `file_size == sst_size`, so this is only
    /// valid where the two agree (NOTE(review): confirm callers never pass
    /// sstables whose sizes have diverged, e.g. after splitting).
    pub fn file_size(&self) -> u64 {
        assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
        self.sst_info.file_size
    }
}
419
/// Equality considers only `sst_info`; `table_stats` and `created_at` are ignored.
impl PartialEq for LocalSstableInfo {
    fn eq(&self, other: &Self) -> bool {
        self.sst_info == other.sst_info
    }
}
425
/// Packages the read epoch of hummock; used for `wait_epoch`.
#[derive(Debug, Clone, Copy)]
pub enum HummockReadEpoch {
    /// We need to wait the `committed_epoch` of the read table
    Committed(HummockEpoch),
    /// We need to wait the `committed_epoch` of the read table and also the hummock version to the version id
    BatchQueryCommitted(HummockEpoch, HummockVersionId),
    /// We don't need to wait epoch, we usually do stream reading with it.
    NoWait(HummockEpoch),
    /// We don't need to wait epoch.
    Backup(HummockEpoch),
    /// Read a historical (time-travel) snapshot at this epoch.
    TimeTravel(HummockEpoch),
}
439
impl From<BatchQueryEpoch> for HummockReadEpoch {
    /// Converts the protobuf epoch selector into a read epoch.
    ///
    /// Panics if `e.epoch` is unset. A `Current` epoch other than `MAX` is
    /// ignored with a warning and replaced by `u64::MAX`.
    fn from(e: BatchQueryEpoch) -> Self {
        match e.epoch.unwrap() {
            batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted(
                epoch.epoch,
                HummockVersionId::new(epoch.hummock_version_id),
            ),
            batch_query_epoch::Epoch::Current(epoch) => {
                if epoch != HummockEpoch::MAX {
                    warn!(
                        epoch,
                        "ignore specified current epoch and set it to u64::MAX"
                    );
                }
                HummockReadEpoch::NoWait(HummockEpoch::MAX)
            }
            batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
            batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
        }
    }
}
461
/// Builds a `BatchQueryEpoch` that reads at the current (max) epoch; for tests.
pub fn test_batch_query_epoch() -> BatchQueryEpoch {
    BatchQueryEpoch {
        epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
    }
}
467
468impl HummockReadEpoch {
469    pub fn get_epoch(&self) -> HummockEpoch {
470        *match self {
471            HummockReadEpoch::Committed(epoch)
472            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
473            | HummockReadEpoch::NoWait(epoch)
474            | HummockReadEpoch::Backup(epoch)
475            | HummockReadEpoch::TimeTravel(epoch) => epoch,
476        }
477    }
478
479    pub fn is_read_committed(&self) -> bool {
480        match self {
481            HummockReadEpoch::Committed(_)
482            | HummockReadEpoch::TimeTravel(_)
483            | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
484            HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
485        }
486    }
487}
/// A half-open range `[start_id, end_id)` of raw object ids.
pub struct ObjectIdRange {
    // inclusive
    pub start_id: HummockRawObjectId,
    // exclusive
    pub end_id: HummockRawObjectId,
}
494
495impl ObjectIdRange {
496    pub fn new(
497        start_id: impl Into<HummockRawObjectId>,
498        end_id: impl Into<HummockRawObjectId>,
499    ) -> Self {
500        Self {
501            start_id: start_id.into(),
502            end_id: end_id.into(),
503        }
504    }
505
506    fn peek_next_object_id(&self) -> Option<HummockRawObjectId> {
507        if self.start_id < self.end_id {
508            return Some(self.start_id);
509        }
510        None
511    }
512
513    /// Pops and returns next SST id.
514    pub fn get_next_object_id(&mut self) -> Option<HummockRawObjectId> {
515        let next_id = self.peek_next_object_id();
516        self.start_id += 1;
517        next_id
518    }
519}
520
521pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
522    let len = ssts.len();
523    for i in 1..len {
524        if ssts[i - 1]
525            .borrow()
526            .key_range
527            .compare_right_with(&ssts[i].borrow().key_range.left)
528            != Ordering::Less
529        {
530            return false;
531        }
532    }
533    true
534}
535
536pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
537    let len = ssts.len();
538    for i in 1..len {
539        let sst_1 = &ssts[i - 1];
540        let sst_2 = &ssts[i];
541
542        if sst_1.key_range.right_exclusive {
543            if KeyComparator::compare_encoded_full_key(
544                &sst_1.key_range.right,
545                &sst_2.key_range.left,
546            )
547            .is_gt()
548            {
549                return false;
550            }
551        } else if KeyComparator::compare_encoded_full_key(
552            &sst_1.key_range.right,
553            &sst_2.key_range.left,
554        )
555        .is_ge()
556        {
557            return false;
558        }
559    }
560    true
561}
562
const CHECKPOINT_DIR: &str = "checkpoint";
const CHECKPOINT_NAME: &str = "0";
const ARCHIVE_DIR: &str = "archive";

/// Path of the version checkpoint file under `root_dir`.
pub fn version_checkpoint_path(root_dir: &str) -> String {
    [root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME].join("/")
}

/// Directory holding archived versions under `root_dir`.
pub fn version_archive_dir(root_dir: &str) -> String {
    [root_dir, ARCHIVE_DIR].join("/")
}
574
/// Directory portion of a checkpoint path, keeping the trailing `/`.
/// Returns an empty string when the path contains no `/`.
pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    match checkpoint_path.rfind('/') {
        Some(pos) => checkpoint_path[..=pos].to_owned(),
        None => String::new(),
    }
}
578
/// Represents an epoch with a gap.
///
/// When a spill of the mem table occurs between two epochs, `EpochWithGap` generates an offset.
/// This offset is encoded when performing full key encoding. When returning to the upper-level
/// interface, a pure epoch with the lower 16 bits set to 0 should be returned.
///
/// Internally stored as a single u64: the pure epoch plus the spill offset.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
pub struct EpochWithGap(u64);
586
impl EpochWithGap {
    #[allow(unused_variables)]
    /// Combines a pure epoch (low bits zero, checked in debug builds) with a
    /// spill offset into the encoded representation.
    pub fn new(epoch: u64, spill_offset: u16) -> Self {
        // We only use 48 high bit to store epoch and use 16 low bit to store spill offset. But for MAX epoch,
        // we still keep `u64::MAX` because we have use it in delete range and persist this value to sstable files.
        //  So for compatibility, we must skip checking it for u64::MAX. See bug description in https://github.com/risingwavelabs/risingwave/issues/13717
        if risingwave_common::util::epoch::is_max_epoch(epoch) {
            EpochWithGap::new_max_epoch()
        } else {
            debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
            EpochWithGap(epoch + spill_offset as u64)
        }
    }

    /// Encodes a pure epoch with zero spill offset.
    pub fn new_from_epoch(epoch: u64) -> Self {
        EpochWithGap::new(epoch, 0)
    }

    /// The minimum possible encoded epoch (0).
    pub fn new_min_epoch() -> Self {
        EpochWithGap(0)
    }

    /// The maximum encoded epoch (`u64::MAX`), kept as-is for on-disk compatibility.
    pub fn new_max_epoch() -> Self {
        EpochWithGap(HummockEpoch::MAX)
    }

    // return the epoch_with_gap(epoch + spill_offset)
    pub(crate) fn as_u64(&self) -> HummockEpoch {
        self.0
    }

    // return the epoch_with_gap(epoch + spill_offset)
    pub fn from_u64(epoch_with_gap: u64) -> Self {
        EpochWithGap(epoch_with_gap)
    }

    // return the pure epoch without spill offset
    pub fn pure_epoch(&self) -> HummockEpoch {
        self.0 & !EPOCH_SPILL_TIME_MASK
    }

    /// The spill-offset portion (low bits selected by `EPOCH_SPILL_TIME_MASK`).
    pub fn offset(&self) -> u64 {
        self.0 & EPOCH_SPILL_TIME_MASK
    }
}
632
633pub fn get_object_data_path(
634    obj_prefix: &str,
635    path_prefix: &str,
636    object_id: HummockObjectId,
637) -> String {
638    let suffix = object_id.suffix();
639    let object_id = object_id.as_raw();
640
641    let mut path = String::with_capacity(
642        path_prefix.len()
643            + "/".len()
644            + obj_prefix.len()
645            + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
646            + ".".len()
647            + suffix.len(),
648    );
649    path.push_str(path_prefix);
650    path.push('/');
651    path.push_str(obj_prefix);
652    path.push_str(&object_id.to_string());
653    path.push('.');
654    path.push_str(suffix);
655    path
656}
657
658pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
659    use itertools::Itertools;
660    let split = path.split(&['/', '.']).collect_vec();
661    assert!(split.len() > 2);
662    let suffix = split[split.len() - 1];
663    let id = split[split.len() - 2]
664        .parse::<u64>()
665        .expect("valid object id");
666    HummockObjectId::new(id, suffix)
667        .unwrap_or_else(|| panic!("unknown object id suffix {}", suffix))
668}
669
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    /// `HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH` must equal the decimal
    /// length of `u64::MAX`.
    #[test]
    fn test_object_id_decimal_max_length() {
        let len = u64::MAX.to_string().len();
        assert_eq!(len, HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH)
    }

    /// Exercises `full_key_can_concat` with inclusive right bounds:
    /// disjoint ranges concat; overlapping ranges do not.
    #[test]
    fn test_full_key_concat() {
        // Two encoded full keys with key1 < key2 in encoded order.
        let key1 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        let sst_1 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key1.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        let sst_2 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key2.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        // Spans [key1, key2], overlapping sst_1's range.
        let sst_3 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        assert!(full_key_can_concat(&[sst_1.clone().into(), sst_2.into()]));

        assert!(!full_key_can_concat(&[sst_1.into(), sst_3.into()]));
    }
}