#![feature(type_alias_impl_trait)]
#![feature(impl_trait_in_assoc_type)]
#![feature(let_chains)]
#![feature(btree_cursors)]
#![feature(strict_overflow_ops)]
#![feature(map_try_insert)]

mod key_cmp;

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::ops::{Add, AddAssign, Sub};
use std::str::FromStr;

pub use key_cmp::*;
use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sstable_info::SstableInfo;
use tracing::warn;

use crate::key_range::KeyRangeCommon;
use crate::table_stats::TableStatsMap;

pub mod change_log;
pub mod compact;
pub mod compact_task;
pub mod compaction_group;
pub mod key;
pub mod key_range;
pub mod level;
pub mod prost_key_range;
pub mod sstable_info;
pub mod state_table_info;
pub mod table_stats;
pub mod table_watermark;
pub mod time_travel;
pub mod version;
pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
mod frontend_version;
pub mod vector_index;

pub use compact::*;
use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;
use risingwave_pb::hummock::{PbVectorIndexObjectType, VectorIndexObjectType};

use crate::table_watermark::TableWatermarks;
use crate::vector_index::VectorIndexAdd;

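/// A thin wrapper around a primitive value (currently `u64`) used to build distinct id types.
/// The const parameter `C` has no runtime meaning: it only makes otherwise identical wrappers
/// (such as the id aliases below) distinct types to the compiler.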
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, Ord, PartialOrd)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct TypedPrimitive<const C: usize, P>(P);

impl<const C: usize, P: PartialEq> PartialEq<P> for TypedPrimitive<C, P> {
    fn eq(&self, other: &P) -> bool {
        self.0 == *other
    }
}

macro_rules! impl_primitive {
    ($($t:ty)*) => {$(
        impl<const C: usize> PartialEq<TypedPrimitive<C, $t>> for $t {
            fn eq(&self, other: &TypedPrimitive<C, $t>) -> bool {
                *self == other.0
            }
        }
    )*}
}

impl_primitive!(u64);

impl<const C: usize, P: FromStr> FromStr for TypedPrimitive<C, P> {
    type Err = P::Err;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        P::from_str(s).map(TypedPrimitive)
    }
}

impl<const C: usize, P> Borrow<P> for TypedPrimitive<C, P> {
    fn borrow(&self) -> &P {
        &self.0
    }
}

impl<const C: usize, P: Add<Output = P>> Add<P> for TypedPrimitive<C, P> {
    type Output = Self;

    fn add(self, rhs: P) -> Self::Output {
        Self(self.0 + rhs)
    }
}

impl<const C: usize, P: AddAssign> AddAssign<P> for TypedPrimitive<C, P> {
    fn add_assign(&mut self, rhs: P) {
        self.0 += rhs;
    }
}

impl<const C: usize, P: Display> Display for TypedPrimitive<C, P> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl<const C: usize, P> From<P> for TypedPrimitive<C, P> {
    fn from(value: P) -> Self {
        Self(value)
    }
}

impl<const C: usize, P: Serialize> Serialize for TypedPrimitive<C, P> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        self.0.serialize(serializer)
    }
}

impl<'de, const C: usize, P: Deserialize<'de>> Deserialize<'de> for TypedPrimitive<C, P> {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
    }
}

impl<const C: usize, P> TypedPrimitive<C, P> {
    pub const fn new(id: P) -> Self {
        Self(id)
    }

    pub fn inner(self) -> P {
        self.0
    }
}

pub type HummockRawObjectId = TypedPrimitive<0, u64>;
pub type HummockSstableObjectId = TypedPrimitive<1, u64>;
pub type HummockSstableId = TypedPrimitive<2, u64>;
pub type HummockVectorFileId = TypedPrimitive<3, u64>;
pub type HummockHnswGraphFileId = TypedPrimitive<4, u64>;

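// `impl_object_id!` adds conversions to and from the untyped `HummockRawObjectId` for id types
// that share the raw object id space.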
macro_rules! impl_object_id {
    ($type_name:ty) => {
        impl $type_name {
            pub fn as_raw(&self) -> HummockRawObjectId {
                HummockRawObjectId::new(self.0)
            }
        }

        impl From<HummockRawObjectId> for $type_name {
            fn from(id: HummockRawObjectId) -> Self {
                Self(id.0)
            }
        }
    };
}

impl_object_id!(HummockSstableObjectId);
impl_object_id!(HummockVectorFileId);
impl_object_id!(HummockHnswGraphFileId);

pub type HummockRefCount = u64;
pub type HummockContextId = u32;
pub type HummockEpoch = u64;
pub type HummockCompactionTaskId = u64;
pub type CompactionGroupId = u64;

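/// Monotonically increasing identifier of a Hummock version.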
#[derive(Debug, Clone, PartialEq, Copy, Ord, PartialOrd, Eq, Hash)]
pub struct HummockVersionId(u64);

impl Display for HummockVersionId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl Serialize for HummockVersionId {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_u64(self.0)
    }
}

impl<'de> Deserialize<'de> for HummockVersionId {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        Ok(Self(<u64 as Deserialize>::deserialize(deserializer)?))
    }
}

impl HummockVersionId {
    pub const MAX: Self = Self(i64::MAX as _);

    pub const fn new(id: u64) -> Self {
        Self(id)
    }

    pub fn next(&self) -> Self {
        Self(self.0 + 1)
    }

    pub fn to_u64(self) -> u64 {
        self.0
    }
}

impl Add<u64> for HummockVersionId {
    type Output = Self;

    fn add(self, rhs: u64) -> Self::Output {
        Self(self.0 + rhs)
    }
}

impl Sub for HummockVersionId {
    type Output = u64;

    fn sub(self, rhs: Self) -> Self::Output {
        self.0 - rhs.0
    }
}

pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0);
pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1);
pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
pub const SST_OBJECT_SUFFIX: &str = "data";
pub const VECTOR_FILE_OBJECT_SUFFIX: &str = "vector";
pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;

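// `for_all_object_suffix!` generates the `HummockObjectId` enum, the list of valid object file
// suffixes, and the helpers that map between object ids and file suffixes.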
macro_rules! for_all_object_suffix {
    ($({$name:ident, $type_name:ty, $suffix:expr},)+) => {
        #[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)]
        pub enum HummockObjectId {
            $(
                $name($type_name),
            )+
        }

        pub const VALID_OBJECT_ID_SUFFIXES: [&str; 3] = [$(
            $suffix
        ),+];

        impl HummockObjectId {
            fn new(id: u64, suffix: &str) -> Option<Self> {
                match suffix {
                    $(
                        suffix if suffix == $suffix => Some(HummockObjectId::$name(<$type_name>::new(id))),
                    )+
                    _ => None,
                }
            }

            pub fn suffix(&self) -> &str {
                match self {
                    $(
                        HummockObjectId::$name(_) => $suffix,
                    )+
                }
            }

            pub fn as_raw(&self) -> HummockRawObjectId {
                let raw = match self {
                    $(
                        HummockObjectId::$name(id) => id.0,
                    )+
                };
                HummockRawObjectId::new(raw)
            }
        }

        pub fn try_get_object_id_from_path(path: &str) -> Option<HummockObjectId> {
            let split: Vec<_> = path.split(&['/', '.']).collect();
            if split.len() <= 2 {
                return None;
            }
            let suffix = split[split.len() - 1];
            let id_str = split[split.len() - 2];
            match suffix {
                $(
                    suffix if suffix == $suffix => {
                        let id = id_str
                            .parse::<u64>()
                            .unwrap_or_else(|_| panic!("expect valid object id, got {}", id_str));
                        Some(HummockObjectId::$name(<$type_name>::new(id)))
                    },
                )+
                _ => None,
            }
        }
    };
    () => {
        for_all_object_suffix! {
            {Sstable, HummockSstableObjectId, SST_OBJECT_SUFFIX},
            {VectorFile, HummockVectorFileId, VECTOR_FILE_OBJECT_SUFFIX},
            {HnswGraphFile, HummockHnswGraphFileId, "hnsw_graph"},
        }
    };
}

for_all_object_suffix!();

pub fn get_stale_object_ids(
    stale_objects: &PbStaleObjects,
) -> impl Iterator<Item = HummockObjectId> + '_ {
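    // Exhaustive match on a dummy value: adding a new `HummockObjectId` variant breaks the build
    // here, as a reminder to also map the new object type in the iterator below.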
    match HummockObjectId::Sstable(0.into()) {
        HummockObjectId::Sstable(_) => {}
        HummockObjectId::VectorFile(_) => {}
        HummockObjectId::HnswGraphFile(_) => {}
    };
    stale_objects
        .id
        .iter()
        .map(|sst_id| HummockObjectId::Sstable((*sst_id).into()))
        .chain(stale_objects.vector_files.iter().map(
            |file| match file.get_object_type().unwrap() {
                PbVectorIndexObjectType::VectorIndexObjectUnspecified => {
                    unreachable!()
                }
                VectorIndexObjectType::VectorIndexObjectVector => {
                    HummockObjectId::VectorFile(file.id.into())
                }
                VectorIndexObjectType::VectorIndexObjectHnswGraph => {
                    HummockObjectId::HnswGraphFile(file.id.into())
                }
            },
        ))
}

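/// Logs at `debug` level in debug builds and at `info` level in release builds.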
#[macro_export]
macro_rules! info_in_release {
    ($($arg:tt)*) => {
        {
            #[cfg(debug_assertions)]
            {
                use tracing::debug;
                debug!($($arg)*);
            }
            #[cfg(not(debug_assertions))]
            {
                use tracing::info;
                info!($($arg)*);
            }
        }
    }
}

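/// The result of syncing buffered state-store writes into shared storage.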
#[derive(Default, Debug)]
pub struct SyncResult {
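    /// The total size of the synced data.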
    pub sync_size: usize,
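    /// SSTs produced by this sync that have not yet been committed to a Hummock version.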
    pub uncommitted_ssts: Vec<LocalSstableInfo>,
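    /// Table watermarks collected during the sync, keyed by table id.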
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
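    /// SSTs that store old values, used for table change logs.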
    pub old_value_ssts: Vec<LocalSstableInfo>,
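    /// Vector index additions produced during the sync, keyed by table id.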
    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
}

#[derive(Debug, Clone)]
pub struct LocalSstableInfo {
    pub sst_info: SstableInfo,
    pub table_stats: TableStatsMap,
    pub created_at: u64,
}

impl LocalSstableInfo {
    pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
        Self {
            sst_info,
            table_stats,
            created_at,
        }
    }

    pub fn for_test(sst_info: SstableInfo) -> Self {
        Self {
            sst_info,
            table_stats: Default::default(),
            created_at: u64::MAX,
        }
    }

    pub fn file_size(&self) -> u64 {
        assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
        self.sst_info.file_size
    }
}

impl PartialEq for LocalSstableInfo {
    fn eq(&self, other: &Self) -> bool {
        self.sst_info == other.sst_info
    }
}

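/// The epoch, and the consistency it implies, at which a Hummock read is performed.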
#[derive(Debug, Clone, Copy)]
pub enum HummockReadEpoch {
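    /// Read a committed epoch, waiting until the epoch has been committed.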
    Committed(HummockEpoch),
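    /// Read a committed epoch for a batch query, pinned to a specific Hummock version id.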
    BatchQueryCommitted(HummockEpoch, HummockVersionId),
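    /// Read the given epoch without waiting for it; does not provide read-committed semantics.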
    NoWait(HummockEpoch),
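    /// Read the given epoch from a backup.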
    Backup(HummockEpoch),
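    /// Time-travel read at the given historical epoch.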
    TimeTravel(HummockEpoch),
}

impl From<BatchQueryEpoch> for HummockReadEpoch {
    fn from(e: BatchQueryEpoch) -> Self {
        match e.epoch.unwrap() {
            batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted(
                epoch.epoch,
                HummockVersionId::new(epoch.hummock_version_id),
            ),
            batch_query_epoch::Epoch::Current(epoch) => {
                if epoch != HummockEpoch::MAX {
                    warn!(
                        epoch,
                        "ignore specified current epoch and set it to u64::MAX"
                    );
                }
                HummockReadEpoch::NoWait(HummockEpoch::MAX)
            }
            batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
            batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
        }
    }
}

pub fn test_batch_query_epoch() -> BatchQueryEpoch {
    BatchQueryEpoch {
        epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
    }
}

impl HummockReadEpoch {
    pub fn get_epoch(&self) -> HummockEpoch {
        *match self {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::NoWait(epoch)
            | HummockReadEpoch::Backup(epoch)
            | HummockReadEpoch::TimeTravel(epoch) => epoch,
        }
    }

    pub fn is_read_committed(&self) -> bool {
        match self {
            HummockReadEpoch::Committed(_)
            | HummockReadEpoch::TimeTravel(_)
            | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
            HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
        }
    }
}
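
/// A half-open range of raw Hummock object ids: `start_id` is inclusive, `end_id` is exclusive.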
pub struct ObjectIdRange {
    pub start_id: HummockRawObjectId,
    pub end_id: HummockRawObjectId,
}

impl ObjectIdRange {
    pub fn new(
        start_id: impl Into<HummockRawObjectId>,
        end_id: impl Into<HummockRawObjectId>,
    ) -> Self {
        Self {
            start_id: start_id.into(),
            end_id: end_id.into(),
        }
    }

    fn peek_next_object_id(&self) -> Option<HummockRawObjectId> {
        if self.start_id < self.end_id {
            return Some(self.start_id);
        }
        None
    }

    pub fn get_next_object_id(&mut self) -> Option<HummockRawObjectId> {
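        // Note that `start_id` is advanced even when the range is exhausted and `None` is returned.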
        let next_id = self.peek_next_object_id();
        self.start_id += 1;
        next_id
    }
}

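/// Returns true if the SSTs' key ranges are sorted and non-overlapping, i.e. they can be
/// concatenated into a single ordered run.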
pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
    let len = ssts.len();
    for i in 1..len {
        if ssts[i - 1]
            .borrow()
            .key_range
            .compare_right_with(&ssts[i].borrow().key_range.left)
            != Ordering::Less
        {
            return false;
        }
    }
    true
}

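/// Like `can_concat`, but compares encoded full keys and honors each range's `right_exclusive`
/// flag.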
pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
    let len = ssts.len();
    for i in 1..len {
        let sst_1 = &ssts[i - 1];
        let sst_2 = &ssts[i];

        if sst_1.key_range.right_exclusive {
            if KeyComparator::compare_encoded_full_key(
                &sst_1.key_range.right,
                &sst_2.key_range.left,
            )
            .is_gt()
            {
                return false;
            }
        } else if KeyComparator::compare_encoded_full_key(
            &sst_1.key_range.right,
            &sst_2.key_range.left,
        )
        .is_ge()
        {
            return false;
        }
    }
    true
}

const CHECKPOINT_DIR: &str = "checkpoint";
const CHECKPOINT_NAME: &str = "0";
const ARCHIVE_DIR: &str = "archive";

pub fn version_checkpoint_path(root_dir: &str) -> String {
    format!("{}/{}/{}", root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME)
}

pub fn version_archive_dir(root_dir: &str) -> String {
    format!("{}/{}", root_dir, ARCHIVE_DIR)
}

pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    checkpoint_path.trim_end_matches(|c| c != '/').to_owned()
}

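/// An epoch packed together with a spill offset ("gap") in the low bits covered by
/// `EPOCH_SPILL_TIME_MASK`. `pure_epoch` recovers the epoch and `offset` the spill offset.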
#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
pub struct EpochWithGap(u64);

impl EpochWithGap {
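    /// Combines `epoch` with `spill_offset`. The max epoch is returned unchanged; otherwise the
    /// epoch's spill bits are expected to be zero.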
    #[allow(unused_variables)]
    pub fn new(epoch: u64, spill_offset: u16) -> Self {
        if risingwave_common::util::epoch::is_max_epoch(epoch) {
            EpochWithGap::new_max_epoch()
        } else {
            debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
            EpochWithGap(epoch + spill_offset as u64)
        }
    }

    pub fn new_from_epoch(epoch: u64) -> Self {
        EpochWithGap::new(epoch, 0)
    }

    pub fn new_min_epoch() -> Self {
        EpochWithGap(0)
    }

    pub fn new_max_epoch() -> Self {
        EpochWithGap(HummockEpoch::MAX)
    }

    pub(crate) fn as_u64(&self) -> HummockEpoch {
        self.0
    }

    pub fn from_u64(epoch_with_gap: u64) -> Self {
        EpochWithGap(epoch_with_gap)
    }

    pub fn pure_epoch(&self) -> HummockEpoch {
        self.0 & !EPOCH_SPILL_TIME_MASK
    }

    pub fn offset(&self) -> u64 {
        self.0 & EPOCH_SPILL_TIME_MASK
    }
}

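/// Builds the object store path `{path_prefix}/{obj_prefix}{raw_id}.{suffix}` for a Hummock
/// object.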
pub fn get_object_data_path(
    obj_prefix: &str,
    path_prefix: &str,
    object_id: HummockObjectId,
) -> String {
    let suffix = object_id.suffix();
    let object_id = object_id.as_raw();

    let mut path = String::with_capacity(
        path_prefix.len()
            + "/".len()
            + obj_prefix.len()
            + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
            + ".".len()
            + suffix.len(),
    );
    path.push_str(path_prefix);
    path.push('/');
    path.push_str(obj_prefix);
    path.push_str(&object_id.to_string());
    path.push('.');
    path.push_str(suffix);
    path
}

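/// Parses a Hummock object id from a path produced by `get_object_data_path`. Panics on a
/// malformed id or an unknown suffix; `try_get_object_id_from_path` returns `None` for unknown
/// suffixes instead.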
pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
    use itertools::Itertools;
    let split = path.split(&['/', '.']).collect_vec();
    assert!(split.len() > 2);
    let suffix = split[split.len() - 1];
    let id = split[split.len() - 2]
        .parse::<u64>()
        .expect("valid object id");
    HummockObjectId::new(id, suffix)
        .unwrap_or_else(|| panic!("unknown object id suffix {}", suffix))
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    #[test]
    fn test_object_id_decimal_max_length() {
        let len = u64::MAX.to_string().len();
        assert_eq!(len, HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH)
    }

    #[test]
    fn test_full_key_concat() {
        let key1 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        let sst_1 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key1.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        let sst_2 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key2.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        let sst_3 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        assert!(full_key_can_concat(&[sst_1.clone().into(), sst_2.into()]));

        assert!(!full_key_can_concat(&[sst_1.into(), sst_3.into()]));
    }
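
    // Illustrative checks of the typed-id wrappers and the object path helpers defined above;
    // the prefixes and ids used here are arbitrary examples, not values taken from elsewhere in
    // the codebase.
    #[test]
    fn test_typed_primitive_ids() {
        // The same u64 wrapped as different id types: `PartialEq<u64>` still allows comparison
        // with the raw value, and `as_raw` maps back into the shared raw object id space.
        let object_id = HummockSstableObjectId::new(7);
        assert_eq!(object_id, 7u64);
        assert_eq!(object_id.as_raw(), HummockRawObjectId::new(7));
        let sst_id = HummockSstableId::from(7u64);
        assert_eq!(sst_id.inner(), 7u64);
    }

    #[test]
    fn test_object_data_path_roundtrip() {
        // Builds a path with example prefixes and parses the object id back out of it.
        let object_id = HummockObjectId::Sstable(HummockSstableObjectId::new(42));
        let path = get_object_data_path("hummock_001/", "prefix", object_id);
        assert_eq!(path, "prefix/hummock_001/42.data");
        assert_eq!(get_object_id_from_path(&path), object_id);
        assert_eq!(try_get_object_id_from_path(&path), Some(object_id));
    }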
}