risingwave_hummock_sdk/
lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(type_alias_impl_trait)]
16#![feature(impl_trait_in_assoc_type)]
17#![feature(btree_cursors)]
18#![feature(map_try_insert)]
19
20mod key_cmp;
21
22use std::borrow::Borrow;
23use std::cmp::Ordering;
24use std::collections::HashMap;
25
26pub use key_cmp::*;
27use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
28use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
29use sstable_info::SstableInfo;
30
31use crate::key_range::KeyRangeCommon;
32use crate::table_stats::TableStatsMap;
33
34pub mod change_log;
35pub mod compact;
36pub mod compact_task;
37pub mod compaction_group;
38pub mod filter_utils;
39pub mod key;
40pub mod key_range;
41pub mod level;
42pub mod prost_key_range;
43pub mod sstable_info;
44pub mod state_table_info;
45pub mod table_stats;
46pub mod table_watermark;
47pub mod time_travel;
48pub mod version;
49pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
50mod frontend_version;
51pub mod vector_index;
52
53pub use compact::*;
54use risingwave_common::catalog::TableId;
55use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;
56use risingwave_pb::hummock::{PbVectorIndexObjectType, VectorIndexObjectType};
57pub use risingwave_pb::id::{
58    CompactionGroupId, HummockHnswGraphFileId, HummockRawObjectId, HummockSstableId,
59    HummockSstableObjectId, HummockVectorFileId, HummockVersionId,
60};
61
62use crate::table_watermark::TableWatermarks;
63use crate::vector_index::VectorIndexAdd;
64
65pub type HummockRefCount = u64;
66pub type HummockContextId = risingwave_common::id::WorkerId;
67pub type HummockEpoch = u64;
68pub type HummockCompactionTaskId = u64;
69
70pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId::new(0);
71pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId::new(1);
72pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
73pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
74pub const SST_OBJECT_SUFFIX: &str = "data";
75pub const VECTOR_FILE_OBJECT_SUFFIX: &str = "vector";
76pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;
77
78macro_rules! for_all_object_suffix {
79    ($({$name:ident, $type_name:ty, $suffix:expr},)+) => {
80        #[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)]
81        pub enum HummockObjectId {
82            $(
83                $name($type_name),
84            )+
85        }
86
87        pub const VALID_OBJECT_ID_SUFFIXES: [&str; 3] = [$(
88                $suffix
89            ),+];
90
91        impl HummockObjectId {
92            fn new(id: u64, suffix: &str) -> Option<Self> {
93                match suffix {
94                    $(
95                        suffix if suffix == $suffix => Some(HummockObjectId::$name(<$type_name>::new(id))),
96                    )+
97                    _ => None,
98                }
99            }
100
101            pub fn suffix(&self) -> &str {
102                match self {
103                    $(
104                        HummockObjectId::$name(_) => $suffix,
105                    )+
106                }
107            }
108
109            pub fn as_raw(&self) -> HummockRawObjectId {
110                let raw = match self {
111                    $(
112                        HummockObjectId::$name(id) => id.as_raw_id(),
113                    )+
114                };
115                HummockRawObjectId::new(raw)
116            }
117        }
118
119        pub fn try_get_object_id_from_path(path: &str) -> Option<HummockObjectId> {
120            let split: Vec<_> = path.split(&['/', '.']).collect();
121            if split.len() <= 2 {
122                return None;
123            }
124            let suffix = split[split.len() - 1];
125            let id_str = split[split.len() - 2];
126            match suffix {
127                $(
128                    suffix if suffix == $suffix => {
129                        let id = id_str
130                            .parse::<u64>()
131                            .unwrap_or_else(|_| panic!("expect valid object id, got {}", id_str));
132                        Some(HummockObjectId::$name(<$type_name>::new(id)))
133                    },
134                )+
135                _ => None,
136            }
137        }
138    };
139    () => {
140        for_all_object_suffix! {
141            {Sstable, HummockSstableObjectId, SST_OBJECT_SUFFIX},
142            {VectorFile, HummockVectorFileId, VECTOR_FILE_OBJECT_SUFFIX},
143            {HnswGraphFile, HummockHnswGraphFileId, "hnsw_graph"},
144        }
145    };
146}
147
148for_all_object_suffix!();
149
150pub fn get_stale_object_ids(
151    stale_objects: &PbStaleObjects,
152) -> impl Iterator<Item = HummockObjectId> + '_ {
153    // DO NOT REMOVE THIS LINE
154    // This is to ensure that when adding new variant to `HummockObjectId`,
155    // the compiler will warn us if we forget to handle it here.
156    match HummockObjectId::Sstable(0.into()) {
157        HummockObjectId::Sstable(_) => {}
158        HummockObjectId::VectorFile(_) => {}
159        HummockObjectId::HnswGraphFile(_) => {}
160    };
161    stale_objects
162        .id
163        .iter()
164        .map(|sst_id| HummockObjectId::Sstable(*sst_id))
165        .chain(stale_objects.vector_files.iter().map(
166            |file| match file.get_object_type().unwrap() {
167                PbVectorIndexObjectType::VectorIndexObjectUnspecified => {
168                    unreachable!()
169                }
170                VectorIndexObjectType::VectorIndexObjectVector => {
171                    HummockObjectId::VectorFile(file.id.into())
172                }
173                VectorIndexObjectType::VectorIndexObjectHnswGraph => {
174                    HummockObjectId::HnswGraphFile(file.id.into())
175                }
176            },
177        ))
178}
179
/// Logs through `tracing::info!` in release builds and through `tracing::debug!` in debug
/// builds.
///
/// Some events (creating and dropping tables, checkpointing on every barrier) are rare in
/// production, so an `info` log per event is appropriate there. CI triggers the same events
/// constantly, and logging each of them at `info` level floods the CI output. This macro
/// therefore picks the level at compile time: `debug` when `debug_assertions` is enabled and
/// `info` otherwise. The arguments are forwarded to the selected macro unchanged.
#[macro_export]
macro_rules! info_in_release {
    ($($arg:tt)*) => {{
        #[cfg(debug_assertions)]
        {
            tracing::debug!($($arg)*);
        }
        #[cfg(not(debug_assertions))]
        {
            tracing::info!($($arg)*);
        }
    }};
}
205
206#[derive(Default, Debug)]
207pub struct SyncResult {
208    /// The size of all synced shared buffers.
209    pub sync_size: usize,
210    /// The `sst_info` of sync.
211    pub uncommitted_ssts: Vec<LocalSstableInfo>,
212    /// The collected table watermarks written by state tables.
213    pub table_watermarks: HashMap<TableId, TableWatermarks>,
214    /// Sstable that holds the uncommitted old value
215    pub old_value_ssts: Vec<LocalSstableInfo>,
216    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
217}
218
219#[derive(Debug, Clone)]
220pub struct LocalSstableInfo {
221    pub sst_info: SstableInfo,
222    pub table_stats: TableStatsMap,
223    pub created_at: u64,
224}
225
226impl LocalSstableInfo {
227    pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
228        Self {
229            sst_info,
230            table_stats,
231            created_at,
232        }
233    }
234
235    pub fn for_test(sst_info: SstableInfo) -> Self {
236        Self {
237            sst_info,
238            table_stats: Default::default(),
239            created_at: u64::MAX,
240        }
241    }
242
243    pub fn file_size(&self) -> u64 {
244        assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
245        self.sst_info.file_size
246    }
247}
248
249impl PartialEq for LocalSstableInfo {
250    fn eq(&self, other: &Self) -> bool {
251        self.sst_info == other.sst_info
252    }
253}
254
255/// Package read epoch of hummock, it be used for `wait_epoch`
256#[derive(Debug, Clone, Copy)]
257pub enum HummockReadEpoch {
258    /// We need to wait the `committed_epoch` of the read table
259    Committed(HummockEpoch),
260    /// We need to wait the `committed_epoch` of the read table and also the hummock version to the version id
261    BatchQueryCommitted(HummockEpoch, HummockVersionId),
262    /// We don't need to wait epoch, we usually do stream reading with it.
263    NoWait(HummockEpoch),
264    /// We don't need to wait epoch.
265    Backup(HummockEpoch),
266    TimeTravel(HummockEpoch),
267}
268
269impl From<BatchQueryEpoch> for HummockReadEpoch {
270    fn from(e: BatchQueryEpoch) -> Self {
271        match e.epoch.unwrap() {
272            batch_query_epoch::Epoch::Committed(epoch) => {
273                HummockReadEpoch::BatchQueryCommitted(epoch.epoch, epoch.hummock_version_id)
274            }
275            batch_query_epoch::Epoch::Current(epoch) => HummockReadEpoch::NoWait(epoch),
276            batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
277            batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
278        }
279    }
280}
281
282pub fn test_batch_query_epoch() -> BatchQueryEpoch {
283    BatchQueryEpoch {
284        epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
285    }
286}
287
288impl HummockReadEpoch {
289    pub fn get_epoch(&self) -> HummockEpoch {
290        *match self {
291            HummockReadEpoch::Committed(epoch)
292            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
293            | HummockReadEpoch::NoWait(epoch)
294            | HummockReadEpoch::Backup(epoch)
295            | HummockReadEpoch::TimeTravel(epoch) => epoch,
296        }
297    }
298
299    pub fn is_read_committed(&self) -> bool {
300        match self {
301            HummockReadEpoch::Committed(_)
302            | HummockReadEpoch::TimeTravel(_)
303            | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
304            HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
305        }
306    }
307}
308pub struct ObjectIdRange {
309    // inclusive
310    pub start_id: HummockRawObjectId,
311    // exclusive
312    pub end_id: HummockRawObjectId,
313}
314
315impl ObjectIdRange {
316    pub fn new(
317        start_id: impl Into<HummockRawObjectId>,
318        end_id: impl Into<HummockRawObjectId>,
319    ) -> Self {
320        Self {
321            start_id: start_id.into(),
322            end_id: end_id.into(),
323        }
324    }
325
326    fn peek_next_object_id(&self) -> Option<HummockRawObjectId> {
327        if self.start_id < self.end_id {
328            return Some(self.start_id);
329        }
330        None
331    }
332
333    /// Pops and returns next SST id.
334    pub fn get_next_object_id(&mut self) -> Option<HummockRawObjectId> {
335        let next_id = self.peek_next_object_id();
336        self.start_id += 1;
337        next_id
338    }
339}
340
341pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
342    let len = ssts.len();
343    for i in 1..len {
344        if ssts[i - 1]
345            .borrow()
346            .key_range
347            .compare_right_with(&ssts[i].borrow().key_range.left)
348            != Ordering::Less
349        {
350            return false;
351        }
352    }
353    true
354}
355
356pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
357    let len = ssts.len();
358    for i in 1..len {
359        let sst_1 = &ssts[i - 1];
360        let sst_2 = &ssts[i];
361
362        if sst_1.key_range.right_exclusive {
363            if KeyComparator::compare_encoded_full_key(
364                &sst_1.key_range.right,
365                &sst_2.key_range.left,
366            )
367            .is_gt()
368            {
369                return false;
370            }
371        } else if KeyComparator::compare_encoded_full_key(
372            &sst_1.key_range.right,
373            &sst_2.key_range.left,
374        )
375        .is_ge()
376        {
377            return false;
378        }
379    }
380    true
381}
382
/// Directory, relative to the root, that holds the version checkpoint.
const CHECKPOINT_DIR: &str = "checkpoint";
/// File name of the version checkpoint inside [`CHECKPOINT_DIR`].
const CHECKPOINT_NAME: &str = "0";
/// Directory, relative to the root, that holds version archives.
const ARCHIVE_DIR: &str = "archive";

/// Returns the path of the version checkpoint: `<root_dir>/checkpoint/0`.
pub fn version_checkpoint_path(root_dir: &str) -> String {
    [root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME].join("/")
}

/// Returns the version archive directory: `<root_dir>/archive`.
pub fn version_archive_dir(root_dir: &str) -> String {
    [root_dir, ARCHIVE_DIR].join("/")
}
394
/// Returns the directory part of `checkpoint_path`: everything up to and including the last
/// `/`.
///
/// The trailing slash is kept. A path without any `/` yields an empty string.
pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    let dir = match checkpoint_path.rfind('/') {
        // `/` is a single byte, so the inclusive end always falls on a char boundary.
        Some(last_slash) => &checkpoint_path[..=last_slash],
        None => "",
    };
    dir.to_owned()
}
398
399/// Represents an epoch with a gap.
400///
401/// When a spill of the mem table occurs between two epochs, `EpochWithGap` generates an offset.
402/// This offset is encoded when performing full key encoding. When returning to the upper-level
403/// interface, a pure epoch with the lower 16 bits set to 0 should be returned.
404#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
405pub struct EpochWithGap(u64);
406
407impl EpochWithGap {
408    #[allow(unused_variables)]
409    pub fn new(epoch: u64, spill_offset: u16) -> Self {
410        // We only use 48 high bit to store epoch and use 16 low bit to store spill offset. But for MAX epoch,
411        // we still keep `u64::MAX` because we have use it in delete range and persist this value to sstable files.
412        //  So for compatibility, we must skip checking it for u64::MAX. See bug description in https://github.com/risingwavelabs/risingwave/issues/13717
413        if risingwave_common::util::epoch::is_max_epoch(epoch) {
414            EpochWithGap::new_max_epoch()
415        } else {
416            debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
417            EpochWithGap(epoch + spill_offset as u64)
418        }
419    }
420
421    pub fn new_from_epoch(epoch: u64) -> Self {
422        EpochWithGap::new(epoch, 0)
423    }
424
425    pub fn new_min_epoch() -> Self {
426        EpochWithGap(0)
427    }
428
429    pub fn new_max_epoch() -> Self {
430        EpochWithGap(HummockEpoch::MAX)
431    }
432
433    // return the epoch_with_gap(epoch + spill_offset)
434    pub(crate) fn as_u64(&self) -> HummockEpoch {
435        self.0
436    }
437
438    // return the epoch_with_gap(epoch + spill_offset)
439    pub fn from_u64(epoch_with_gap: u64) -> Self {
440        EpochWithGap(epoch_with_gap)
441    }
442
443    // return the pure epoch without spill offset
444    pub fn pure_epoch(&self) -> HummockEpoch {
445        self.0 & !EPOCH_SPILL_TIME_MASK
446    }
447
448    pub fn offset(&self) -> u64 {
449        self.0 & EPOCH_SPILL_TIME_MASK
450    }
451}
452
453pub fn get_object_data_path(
454    obj_prefix: &str,
455    path_prefix: &str,
456    object_id: HummockObjectId,
457) -> String {
458    let suffix = object_id.suffix();
459    let object_id = object_id.as_raw();
460
461    let mut path = String::with_capacity(
462        path_prefix.len()
463            + "/".len()
464            + obj_prefix.len()
465            + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
466            + ".".len()
467            + suffix.len(),
468    );
469    path.push_str(path_prefix);
470    path.push('/');
471    path.push_str(obj_prefix);
472    path.push_str(&object_id.to_string());
473    path.push('.');
474    path.push_str(suffix);
475    path
476}
477
478pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
479    use itertools::Itertools;
480    let split = path.split(&['/', '.']).collect_vec();
481    assert!(split.len() > 2);
482    let suffix = split[split.len() - 1];
483    let id = split[split.len() - 2]
484        .parse::<u64>()
485        .expect("valid object id");
486    HummockObjectId::new(id, suffix)
487        .unwrap_or_else(|| panic!("unknown object id suffix {}", suffix))
488}
489
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    /// Builds an SST whose key range is `[left, right]` with an inclusive right bound; every
    /// other field keeps its default value.
    fn sst_with_key_range(left: &[u8], right: &[u8]) -> SstableInfo {
        SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(left.to_vec()),
                right: Bytes::from(right.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        }
        .into()
    }

    /// The reserved decimal length must match the number of digits of `u64::MAX`.
    #[test]
    fn test_object_id_decimal_max_length() {
        assert_eq!(
            u64::MAX.to_string().len(),
            HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
        );
    }

    /// Two single-key SSTs with distinct keys can be concatenated, while an SST whose left
    /// bound equals the inclusive right bound of its predecessor cannot.
    #[test]
    fn test_full_key_concat() {
        let first_key = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let second_key = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        let disjoint = [
            sst_with_key_range(first_key, first_key),
            sst_with_key_range(second_key, second_key),
        ];
        assert!(full_key_can_concat(&disjoint));

        let touching = [
            sst_with_key_range(first_key, first_key),
            sst_with_key_range(first_key, second_key),
        ];
        assert!(!full_key_can_concat(&touching));
    }
}