risingwave_hummock_sdk/
lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(type_alias_impl_trait)]
16#![feature(impl_trait_in_assoc_type)]
17#![feature(btree_cursors)]
18#![feature(map_try_insert)]
19#![feature(stmt_expr_attributes)]
20
21mod key_cmp;
22
23use std::borrow::Borrow;
24use std::cmp::Ordering;
25use std::collections::HashMap;
26
27pub use key_cmp::*;
28use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
29use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
30use sstable_info::SstableInfo;
31
32use crate::key_range::KeyRangeCommon;
33use crate::table_stats::TableStatsMap;
34
35pub mod change_log;
36pub mod compact;
37pub mod compact_task;
38pub mod compaction_group;
39pub mod filter_utils;
40pub mod key;
41pub mod key_range;
42pub mod level;
43pub mod prost_key_range;
44pub mod sstable_info;
45pub mod state_table_info;
46pub mod table_stats;
47pub mod table_watermark;
48pub mod time_travel;
49pub mod version;
50pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
51mod frontend_version;
52pub mod vector_index;
53
54pub use compact::*;
55use risingwave_common::catalog::TableId;
56use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;
57use risingwave_pb::hummock::{PbVectorIndexObjectType, VectorIndexObjectType};
58pub use risingwave_pb::id::{
59    CompactionGroupId, HummockHnswGraphFileId, HummockRawObjectId, HummockSstableId,
60    HummockSstableObjectId, HummockVectorFileId, HummockVersionId,
61};
62
63use crate::table_watermark::TableWatermarks;
64use crate::vector_index::VectorIndexAdd;
65
66pub type HummockRefCount = u64;
67pub type HummockContextId = risingwave_common::id::WorkerId;
68pub type HummockEpoch = u64;
69pub type HummockCompactionTaskId = u64;
70
71pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId::new(0);
72pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId::new(1);
73pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
74pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
75pub const SST_OBJECT_SUFFIX: &str = "data";
76pub const VECTOR_FILE_OBJECT_SUFFIX: &str = "vector";
77pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;
78
79macro_rules! for_all_object_suffix {
80    ($({$name:ident, $type_name:ty, $suffix:expr},)+) => {
81        #[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)]
82        pub enum HummockObjectId {
83            $(
84                $name($type_name),
85            )+
86        }
87
88        pub const VALID_OBJECT_ID_SUFFIXES: [&str; 3] = [$(
89                $suffix
90            ),+];
91
92        impl HummockObjectId {
93            fn new(id: u64, suffix: &str) -> Option<Self> {
94                match suffix {
95                    $(
96                        suffix if suffix == $suffix => Some(HummockObjectId::$name(<$type_name>::new(id))),
97                    )+
98                    _ => None,
99                }
100            }
101
102            pub fn suffix(&self) -> &str {
103                match self {
104                    $(
105                        HummockObjectId::$name(_) => $suffix,
106                    )+
107                }
108            }
109
110            pub fn as_raw(&self) -> HummockRawObjectId {
111                let raw = match self {
112                    $(
113                        HummockObjectId::$name(id) => id.as_raw_id(),
114                    )+
115                };
116                HummockRawObjectId::new(raw)
117            }
118        }
119
120        pub fn try_get_object_id_from_path(path: &str) -> Option<HummockObjectId> {
121            let split: Vec<_> = path.split(&['/', '.']).collect();
122            if split.len() <= 2 {
123                return None;
124            }
125            let suffix = split[split.len() - 1];
126            let id_str = split[split.len() - 2];
127            match suffix {
128                $(
129                    suffix if suffix == $suffix => {
130                        let id = id_str
131                            .parse::<u64>()
132                            .unwrap_or_else(|_| panic!("expect valid object id, got {}", id_str));
133                        Some(HummockObjectId::$name(<$type_name>::new(id)))
134                    },
135                )+
136                _ => None,
137            }
138        }
139    };
140    () => {
141        for_all_object_suffix! {
142            {Sstable, HummockSstableObjectId, SST_OBJECT_SUFFIX},
143            {VectorFile, HummockVectorFileId, VECTOR_FILE_OBJECT_SUFFIX},
144            {HnswGraphFile, HummockHnswGraphFileId, "hnsw_graph"},
145        }
146    };
147}
148
149for_all_object_suffix!();
150
151pub fn get_stale_object_ids(
152    stale_objects: &PbStaleObjects,
153) -> impl Iterator<Item = HummockObjectId> + '_ {
154    // DO NOT REMOVE THIS LINE
155    // This is to ensure that when adding new variant to `HummockObjectId`,
156    // the compiler will warn us if we forget to handle it here.
157    match HummockObjectId::Sstable(0.into()) {
158        HummockObjectId::Sstable(_) => {}
159        HummockObjectId::VectorFile(_) => {}
160        HummockObjectId::HnswGraphFile(_) => {}
161    };
162    stale_objects
163        .id
164        .iter()
165        .map(|sst_id| HummockObjectId::Sstable(*sst_id))
166        .chain(stale_objects.vector_files.iter().map(
167            |file| match file.get_object_type().unwrap() {
168                PbVectorIndexObjectType::VectorIndexObjectUnspecified => {
169                    unreachable!()
170                }
171                VectorIndexObjectType::VectorIndexObjectVector => {
172                    HummockObjectId::VectorFile(file.id.into())
173                }
174                VectorIndexObjectType::VectorIndexObjectHnswGraph => {
175                    HummockObjectId::HnswGraphFile(file.id.into())
176                }
177            },
178        ))
179}
180
/// Logs at `info` level in release builds and at `debug` level in debug builds.
///
/// Our CI tests create and drop tables frequently and checkpoint on every barrier, which
/// triggers many events that are rare in production. Those events deserve an `info` log in
/// production, but would flood the CI output. This macro therefore forwards its arguments to
/// `tracing::debug!` when `debug_assertions` is enabled and to `tracing::info!` otherwise.
#[macro_export]
macro_rules! info_in_release {
    ($($args:tt)*) => {
        {
            #[cfg(debug_assertions)]
            {
                tracing::debug!($($args)*);
            }
            #[cfg(not(debug_assertions))]
            {
                tracing::info!($($args)*);
            }
        }
    }
}
206
207#[derive(Default, Debug)]
208pub struct SyncResult {
209    /// The size of all synced shared buffers.
210    pub sync_size: usize,
211    /// The `sst_info` of sync.
212    pub uncommitted_ssts: Vec<LocalSstableInfo>,
213    /// The collected table watermarks written by state tables.
214    pub table_watermarks: HashMap<TableId, TableWatermarks>,
215    /// Sstable that holds the uncommitted old value
216    pub old_value_ssts: Vec<LocalSstableInfo>,
217    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
218}
219
220#[derive(Debug, Clone)]
221pub struct LocalSstableInfo {
222    pub sst_info: SstableInfo,
223    pub table_stats: TableStatsMap,
224    pub created_at: u64,
225}
226
227impl LocalSstableInfo {
228    pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
229        Self {
230            sst_info,
231            table_stats,
232            created_at,
233        }
234    }
235
236    pub fn for_test(sst_info: SstableInfo) -> Self {
237        Self {
238            sst_info,
239            table_stats: Default::default(),
240            created_at: u64::MAX,
241        }
242    }
243
244    pub fn file_size(&self) -> u64 {
245        assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
246        self.sst_info.file_size
247    }
248}
249
250impl PartialEq for LocalSstableInfo {
251    fn eq(&self, other: &Self) -> bool {
252        self.sst_info == other.sst_info
253    }
254}
255
256/// Package read epoch of hummock, it be used for `wait_epoch`
257#[derive(Debug, Clone, Copy)]
258pub enum HummockReadEpoch {
259    /// We need to wait the `committed_epoch` of the read table
260    Committed(HummockEpoch),
261    /// We need to wait the `committed_epoch` of the read table and also the hummock version to the version id
262    BatchQueryCommitted(HummockEpoch, HummockVersionId),
263    /// We don't need to wait epoch, we usually do stream reading with it.
264    NoWait(HummockEpoch),
265    /// We don't need to wait epoch.
266    Backup(HummockEpoch),
267    TimeTravel(HummockEpoch),
268}
269
270impl From<BatchQueryEpoch> for HummockReadEpoch {
271    fn from(e: BatchQueryEpoch) -> Self {
272        match e.epoch.unwrap() {
273            batch_query_epoch::Epoch::Committed(epoch) => {
274                HummockReadEpoch::BatchQueryCommitted(epoch.epoch, epoch.hummock_version_id)
275            }
276            batch_query_epoch::Epoch::Current(epoch) => HummockReadEpoch::NoWait(epoch),
277            batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
278            batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
279        }
280    }
281}
282
283pub fn test_batch_query_epoch() -> BatchQueryEpoch {
284    BatchQueryEpoch {
285        epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
286    }
287}
288
289impl HummockReadEpoch {
290    pub fn get_epoch(&self) -> HummockEpoch {
291        *match self {
292            HummockReadEpoch::Committed(epoch)
293            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
294            | HummockReadEpoch::NoWait(epoch)
295            | HummockReadEpoch::Backup(epoch)
296            | HummockReadEpoch::TimeTravel(epoch) => epoch,
297        }
298    }
299
300    pub fn is_read_committed(&self) -> bool {
301        match self {
302            HummockReadEpoch::Committed(_)
303            | HummockReadEpoch::TimeTravel(_)
304            | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
305            HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
306        }
307    }
308}
309pub struct ObjectIdRange {
310    // inclusive
311    pub start_id: HummockRawObjectId,
312    // exclusive
313    pub end_id: HummockRawObjectId,
314}
315
316impl ObjectIdRange {
317    pub fn new(
318        start_id: impl Into<HummockRawObjectId>,
319        end_id: impl Into<HummockRawObjectId>,
320    ) -> Self {
321        Self {
322            start_id: start_id.into(),
323            end_id: end_id.into(),
324        }
325    }
326
327    fn peek_next_object_id(&self) -> Option<HummockRawObjectId> {
328        if self.start_id < self.end_id {
329            return Some(self.start_id);
330        }
331        None
332    }
333
334    /// Pops and returns next SST id.
335    pub fn get_next_object_id(&mut self) -> Option<HummockRawObjectId> {
336        let next_id = self.peek_next_object_id();
337        self.start_id += 1;
338        next_id
339    }
340}
341
342pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
343    let len = ssts.len();
344    for i in 1..len {
345        if ssts[i - 1]
346            .borrow()
347            .key_range
348            .compare_right_with(&ssts[i].borrow().key_range.left)
349            != Ordering::Less
350        {
351            return false;
352        }
353    }
354    true
355}
356
357pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
358    let len = ssts.len();
359    for i in 1..len {
360        let sst_1 = &ssts[i - 1];
361        let sst_2 = &ssts[i];
362
363        if sst_1.key_range.right_exclusive {
364            if KeyComparator::compare_encoded_full_key(
365                &sst_1.key_range.right,
366                &sst_2.key_range.left,
367            )
368            .is_gt()
369            {
370                return false;
371            }
372        } else if KeyComparator::compare_encoded_full_key(
373            &sst_1.key_range.right,
374            &sst_2.key_range.left,
375        )
376        .is_ge()
377        {
378            return false;
379        }
380    }
381    true
382}
383
/// Directory, relative to the root, that holds the version checkpoint.
const CHECKPOINT_DIR: &str = "checkpoint";
/// File name of the version checkpoint inside `CHECKPOINT_DIR`.
const CHECKPOINT_NAME: &str = "0";
/// Directory, relative to the root, that holds archived versions.
const ARCHIVE_DIR: &str = "archive";

/// Returns `<root_dir>/checkpoint/0`, the path of the version checkpoint.
pub fn version_checkpoint_path(root_dir: &str) -> String {
    [root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME].join("/")
}

/// Returns `<root_dir>/archive`, the directory of archived versions.
pub fn version_archive_dir(root_dir: &str) -> String {
    [root_dir, ARCHIVE_DIR].join("/")
}
395
/// Returns the directory part of `checkpoint_path`: everything up to and including the last
/// `/`. A path without any `/` yields an empty string.
pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    match checkpoint_path.rfind('/') {
        // `/` is a single byte, so the inclusive range ends on a char boundary.
        Some(last_slash) => checkpoint_path[..=last_slash].to_owned(),
        None => String::new(),
    }
}
399
400/// Represents an epoch with a gap.
401///
402/// When a spill of the mem table occurs between two epochs, `EpochWithGap` generates an offset.
403/// This offset is encoded when performing full key encoding. When returning to the upper-level
404/// interface, a pure epoch with the lower 16 bits set to 0 should be returned.
405#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
406pub struct EpochWithGap(u64);
407
408impl EpochWithGap {
409    #[allow(unused_variables)]
410    pub fn new(epoch: u64, spill_offset: u16) -> Self {
411        // We only use 48 high bit to store epoch and use 16 low bit to store spill offset. But for MAX epoch,
412        // we still keep `u64::MAX` because we have use it in delete range and persist this value to sstable files.
413        //  So for compatibility, we must skip checking it for u64::MAX. See bug description in https://github.com/risingwavelabs/risingwave/issues/13717
414        if risingwave_common::util::epoch::is_max_epoch(epoch) {
415            EpochWithGap::new_max_epoch()
416        } else {
417            debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
418            EpochWithGap(epoch + spill_offset as u64)
419        }
420    }
421
422    pub fn new_from_epoch(epoch: u64) -> Self {
423        EpochWithGap::new(epoch, 0)
424    }
425
426    pub fn new_min_epoch() -> Self {
427        EpochWithGap(0)
428    }
429
430    pub fn new_max_epoch() -> Self {
431        EpochWithGap(HummockEpoch::MAX)
432    }
433
434    // return the epoch_with_gap(epoch + spill_offset)
435    pub(crate) fn as_u64(&self) -> HummockEpoch {
436        self.0
437    }
438
439    // return the epoch_with_gap(epoch + spill_offset)
440    pub fn from_u64(epoch_with_gap: u64) -> Self {
441        EpochWithGap(epoch_with_gap)
442    }
443
444    // return the pure epoch without spill offset
445    pub fn pure_epoch(&self) -> HummockEpoch {
446        self.0 & !EPOCH_SPILL_TIME_MASK
447    }
448
449    pub fn offset(&self) -> u64 {
450        self.0 & EPOCH_SPILL_TIME_MASK
451    }
452}
453
454pub fn get_object_data_path(
455    obj_prefix: &str,
456    path_prefix: &str,
457    object_id: HummockObjectId,
458) -> String {
459    let suffix = object_id.suffix();
460    let object_id = object_id.as_raw();
461
462    let mut path = String::with_capacity(
463        path_prefix.len()
464            + "/".len()
465            + obj_prefix.len()
466            + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
467            + ".".len()
468            + suffix.len(),
469    );
470    path.push_str(path_prefix);
471    path.push('/');
472    path.push_str(obj_prefix);
473    path.push_str(&object_id.to_string());
474    path.push('.');
475    path.push_str(suffix);
476    path
477}
478
479pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
480    use itertools::Itertools;
481    let split = path.split(&['/', '.']).collect_vec();
482    assert!(split.len() > 2);
483    let suffix = split[split.len() - 1];
484    let id = split[split.len() - 2]
485        .parse::<u64>()
486        .expect("valid object id");
487    HummockObjectId::new(id, suffix)
488        .unwrap_or_else(|| panic!("unknown object id suffix {}", suffix))
489}
490
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    /// Builds an SST whose key range is the inclusive range `[left, right]`; all other fields
    /// take their default values.
    fn sst_with_key_range(left: &[u8], right: &[u8]) -> SstableInfo {
        SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::copy_from_slice(left),
                right: Bytes::copy_from_slice(right),
                right_exclusive: false,
            },
            ..Default::default()
        }
        .into()
    }

    /// The path capacity constant must match the number of decimal digits of `u64::MAX`.
    #[test]
    fn test_object_id_decimal_max_length() {
        assert_eq!(
            u64::MAX.to_string().len(),
            HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
        )
    }

    /// Two single-key SSTs with distinct keys can be concatenated, while an SST followed by
    /// one starting at the same key cannot.
    #[test]
    fn test_full_key_concat() {
        let key1 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        let only_key1 = sst_with_key_range(key1, key1);
        let only_key2 = sst_with_key_range(key2, key2);
        let key1_to_key2 = sst_with_key_range(key1, key2);

        assert!(full_key_can_concat(&[only_key1.clone(), only_key2]));

        assert!(!full_key_can_concat(&[only_key1, key1_to_key2]));
    }
}