1#![feature(type_alias_impl_trait)]
16#![feature(impl_trait_in_assoc_type)]
17#![feature(let_chains)]
18#![feature(btree_cursors)]
19#![feature(strict_overflow_ops)]
20#![feature(map_try_insert)]
21
22mod key_cmp;
23
24use std::borrow::Borrow;
25use std::cmp::Ordering;
26use std::collections::HashMap;
27use std::fmt::{Display, Formatter};
28use std::ops::{Add, AddAssign, Sub};
29use std::str::FromStr;
30
31pub use key_cmp::*;
32use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
33use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
34use serde::{Deserialize, Deserializer, Serialize, Serializer};
35use sstable_info::SstableInfo;
36use tracing::warn;
37
38use crate::key_range::KeyRangeCommon;
39use crate::table_stats::TableStatsMap;
40
41pub mod change_log;
42pub mod compact;
43pub mod compact_task;
44pub mod compaction_group;
45pub mod key;
46pub mod key_range;
47pub mod level;
48pub mod prost_key_range;
49pub mod sstable_info;
50pub mod state_table_info;
51pub mod table_stats;
52pub mod table_watermark;
53pub mod time_travel;
54pub mod version;
55pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
56mod frontend_version;
57pub mod vector_index;
58
59pub use compact::*;
60use risingwave_common::catalog::TableId;
61use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;
62use risingwave_pb::hummock::{PbVectorIndexObjectType, VectorIndexObjectType};
63
64use crate::table_watermark::TableWatermarks;
65
/// A newtype wrapper around a primitive `P`, tagged with the const parameter `C`.
///
/// Instantiations with different `C` are distinct types even when they wrap the
/// same `P`, so ids built on this wrapper cannot be mixed up without an explicit
/// conversion.
///
/// `Default` is only derived in test builds or with the `test` feature enabled.
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, Ord, PartialOrd)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct TypedPrimitive<const C: usize, P>(P);
69
70impl<const C: usize, P: PartialEq> PartialEq<P> for TypedPrimitive<C, P> {
71 fn eq(&self, other: &P) -> bool {
72 self.0 == *other
73 }
74}
75
76macro_rules! impl_primitive {
77 ($($t:ty)*) => {$(
78 impl<const C: usize> PartialEq<TypedPrimitive<C, $t>> for $t {
79 fn eq(&self, other: &TypedPrimitive<C, $t>) -> bool {
80 *self == other.0
81 }
82 }
83 )*}
84}
85
86impl_primitive!(u64);
87
88impl<const C: usize, P: FromStr> FromStr for TypedPrimitive<C, P> {
89 type Err = P::Err;
90
91 fn from_str(s: &str) -> Result<Self, Self::Err> {
92 P::from_str(s).map(TypedPrimitive)
93 }
94}
95
96impl<const C: usize, P> Borrow<P> for TypedPrimitive<C, P> {
97 fn borrow(&self) -> &P {
98 &self.0
99 }
100}
101
102impl<const C: usize, P: Add<Output = P>> Add<P> for TypedPrimitive<C, P> {
103 type Output = Self;
104
105 fn add(self, rhs: P) -> Self::Output {
106 Self(self.0 + rhs)
107 }
108}
109
110impl<const C: usize, P: AddAssign> AddAssign<P> for TypedPrimitive<C, P> {
111 fn add_assign(&mut self, rhs: P) {
112 self.0 += rhs;
113 }
114}
115
116impl<const C: usize, P: Display> Display for TypedPrimitive<C, P> {
117 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
118 write!(f, "{}", self.0)
119 }
120}
121
122impl<const C: usize, P> From<P> for TypedPrimitive<C, P> {
123 fn from(value: P) -> Self {
124 Self(value)
125 }
126}
127
128impl<const C: usize, P: Serialize> Serialize for TypedPrimitive<C, P> {
129 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
130 where
131 S: Serializer,
132 {
133 self.0.serialize(serializer)
134 }
135}
136
137impl<'de, const C: usize, P: Deserialize<'de>> Deserialize<'de> for TypedPrimitive<C, P> {
138 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
139 where
140 D: Deserializer<'de>,
141 {
142 Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
143 }
144}
145
146impl<const C: usize, P> TypedPrimitive<C, P> {
147 pub const fn new(id: P) -> Self {
148 Self(id)
149 }
150
151 pub fn inner(self) -> P {
152 self.0
153 }
154}
155
/// Untyped object id; the typed object ids expose it through their `as_raw` methods.
pub type HummockRawObjectId = TypedPrimitive<0, u64>;
/// Id of an SST object. NOTE(review): presumably the stored object backing an SST — confirm.
pub type HummockSstableObjectId = TypedPrimitive<1, u64>;
/// Id of an SST; a distinct type from [`HummockSstableObjectId`] so the two cannot be mixed up.
pub type HummockSstableId = TypedPrimitive<2, u64>;
/// Id of a vector file object.
pub type HummockVectorFileId = TypedPrimitive<3, u64>;
160
161impl HummockSstableObjectId {
162 pub fn as_raw(&self) -> HummockRawObjectId {
163 HummockRawObjectId::new(self.0)
164 }
165}
166
167impl HummockVectorFileId {
168 pub fn as_raw(&self) -> HummockRawObjectId {
169 HummockRawObjectId::new(self.0)
170 }
171}
172
173impl From<HummockRawObjectId> for HummockSstableObjectId {
174 fn from(id: HummockRawObjectId) -> Self {
175 Self(id.0)
176 }
177}
178
/// Reference count value.
pub type HummockRefCount = u64;
/// Identifier of a Hummock context. NOTE(review): what a "context" is cannot be seen here — confirm.
pub type HummockContextId = u32;
/// Epoch value as used by the Hummock read/write APIs in this crate.
pub type HummockEpoch = u64;
/// Identifier of a compaction task.
pub type HummockCompactionTaskId = u64;
/// Identifier of a compaction group.
pub type CompactionGroupId = u64;
184
/// Identifier of a Hummock version, a newtype over `u64`.
///
/// Ordering and equality follow the wrapped number.
#[derive(Debug, Clone, PartialEq, Copy, Ord, PartialOrd, Eq, Hash)]
pub struct HummockVersionId(u64);
187
188impl Display for HummockVersionId {
189 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
190 write!(f, "{}", self.0)
191 }
192}
193
194impl Serialize for HummockVersionId {
195 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
196 where
197 S: Serializer,
198 {
199 serializer.serialize_u64(self.0)
200 }
201}
202
203impl<'de> Deserialize<'de> for HummockVersionId {
204 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
205 where
206 D: Deserializer<'de>,
207 {
208 Ok(Self(<u64 as Deserialize>::deserialize(deserializer)?))
209 }
210}
211
212impl HummockVersionId {
213 pub const MAX: Self = Self(i64::MAX as _);
214
215 pub const fn new(id: u64) -> Self {
216 Self(id)
217 }
218
219 pub fn next(&self) -> Self {
220 Self(self.0 + 1)
221 }
222
223 pub fn to_u64(self) -> u64 {
224 self.0
225 }
226}
227
228impl Add<u64> for HummockVersionId {
229 type Output = Self;
230
231 fn add(self, rhs: u64) -> Self::Output {
232 Self(self.0 + rhs)
233 }
234}
235
236impl Sub for HummockVersionId {
237 type Output = u64;
238
239 fn sub(self, rhs: Self) -> Self::Output {
240 self.0 - rhs.0
241 }
242}
243
/// Version id `0`, reserved as the invalid id.
pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0);
/// Version id `1`, the first valid id.
pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1);
/// `1 << 56`. NOTE(review): presumably the base of the id space for split-table compaction groups — confirm.
pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
/// `2 << 56`. NOTE(review): presumably the base of the id space for single-table compaction groups — confirm.
pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
/// File-name suffix (without the dot) of SST objects.
pub const SST_OBJECT_SUFFIX: &str = "data";
/// File-name suffix (without the dot) of vector file objects.
pub const VECTOR_FILE_OBJECT_SUFFIX: &str = "vector";
/// Number of decimal digits in `u64::MAX`; used to pre-size object path strings.
pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;
251
/// Generates the object-id machinery from a list of `{Variant, IdType, SUFFIX}` entries:
/// the `HummockObjectId` enum, `VALID_OBJECT_ID_SUFFIXES`, the enum's helper methods and
/// `try_get_object_id_from_path`.
///
/// Invoking the macro with no arguments expands it with the built-in entry list
/// (the second arm below).
macro_rules! for_all_object_suffix {
    ($({$name:ident, $type_name:ty, $suffix:expr},)+) => {
        /// A typed object id; one variant per kind of object.
        #[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)]
        pub enum HummockObjectId {
            $(
                $name($type_name),
            )+
        }

        /// All recognized object file suffixes.
        // The array length is hard-coded and must equal the number of entries.
        pub const VALID_OBJECT_ID_SUFFIXES: [&str; 2] = [$(
                $suffix
            ),+];

        impl HummockObjectId {
            /// Builds the variant whose suffix equals `suffix`, or `None` for an unknown suffix.
            fn new(id: u64, suffix: &str) -> Option<Self> {
                match suffix {
                    $(
                        suffix if suffix == $suffix => Some(HummockObjectId::$name(<$type_name>::new(id))),
                    )+
                    _ => None,
                }
            }

            /// Returns the file suffix associated with this variant.
            pub fn suffix(&self) -> &str {
                match self {
                    $(
                        HummockObjectId::$name(_) => $suffix,
                    )+
                }
            }

            /// Returns the numeric id without its variant typing.
            pub fn as_raw(&self) -> HummockRawObjectId {
                let raw = match self {
                    $(
                        HummockObjectId::$name(id) => id.0,
                    )+
                };
                HummockRawObjectId::new(raw)
            }
        }

        /// Extracts an object id from a path whose last two components, after splitting on
        /// `/` and `.`, are the numeric id and the suffix.
        ///
        /// Returns `None` when the path has two or fewer components or the suffix is unknown.
        ///
        /// # Panics
        ///
        /// Panics when the suffix is recognized but the id component is not a valid `u64`.
        pub fn try_get_object_id_from_path(path: &str) -> Option<HummockObjectId> {
            let split: Vec<_> = path.split(&['/', '.']).collect();
            if split.len() <= 2 {
                return None;
            }
            let suffix = split[split.len() - 1];
            let id_str = split[split.len() - 2];
            match suffix {
                $(
                    suffix if suffix == $suffix => {
                        let id = id_str
                            .parse::<u64>()
                            .unwrap_or_else(|_| panic!("expect valid object id, got {}", id_str));
                        Some(HummockObjectId::$name(<$type_name>::new(id)))
                    },
                )+
                _ => None,
            }
        }
    };
    () => {
        // The canonical list of object kinds; add new kinds here.
        for_all_object_suffix! {
            {Sstable, HummockSstableObjectId, SST_OBJECT_SUFFIX},
            {VectorFile, HummockVectorFileId, VECTOR_FILE_OBJECT_SUFFIX},
        }
    };
}

for_all_object_suffix!();
322
323pub fn get_stale_object_ids(
324 stale_objects: &PbStaleObjects,
325) -> impl Iterator<Item = HummockObjectId> + '_ {
326 match HummockObjectId::Sstable(0.into()) {
330 HummockObjectId::Sstable(_) => {}
331 HummockObjectId::VectorFile(_) => {}
332 };
333 stale_objects
334 .id
335 .iter()
336 .map(|sst_id| HummockObjectId::Sstable((*sst_id).into()))
337 .chain(stale_objects.vector_files.iter().map(
338 |file| match file.get_object_type().unwrap() {
339 PbVectorIndexObjectType::VectorIndexObjectUnspecified => {
340 unreachable!()
341 }
342 VectorIndexObjectType::VectorIndexObjectVector => {
343 HummockObjectId::VectorFile(file.id.into())
344 }
345 },
346 ))
347}
348
/// Logs through `tracing` at `debug` level when `debug_assertions` is on and at
/// `info` level otherwise. Accepts the same arguments as the `tracing` macros.
#[macro_export]
macro_rules! info_in_release {
    ($($arg:tt)*) => {
        {
            #[cfg(debug_assertions)]
            {
                tracing::debug!($($arg)*);
            }
            #[cfg(not(debug_assertions))]
            {
                tracing::info!($($arg)*);
            }
        }
    }
}
374
/// Outcome of a sync operation.
// NOTE(review): the field descriptions below are inferred from names and types
// only — confirm against the producer of this struct.
#[derive(Default, Debug)]
pub struct SyncResult {
    /// Size of the synced data (unit presumably bytes — confirm).
    pub sync_size: usize,
    /// SSTs produced by the sync that are not yet committed.
    pub uncommitted_ssts: Vec<LocalSstableInfo>,
    /// Table watermarks keyed by table id.
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    /// SSTs holding old values (presumably for change logs — confirm).
    pub old_value_ssts: Vec<LocalSstableInfo>,
}
386
/// An `SstableInfo` bundled with per-table statistics and a creation timestamp.
///
/// Equality (see the `PartialEq` impl) only compares `sst_info`.
#[derive(Debug, Clone)]
pub struct LocalSstableInfo {
    /// The SST metadata itself.
    pub sst_info: SstableInfo,
    /// Statistics for the tables covered by this SST.
    pub table_stats: TableStatsMap,
    /// Creation time; `for_test` fills in `u64::MAX`. NOTE(review): unit not visible here — confirm.
    pub created_at: u64,
}
393
394impl LocalSstableInfo {
395 pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
396 Self {
397 sst_info,
398 table_stats,
399 created_at,
400 }
401 }
402
403 pub fn for_test(sst_info: SstableInfo) -> Self {
404 Self {
405 sst_info,
406 table_stats: Default::default(),
407 created_at: u64::MAX,
408 }
409 }
410
411 pub fn file_size(&self) -> u64 {
412 assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
413 self.sst_info.file_size
414 }
415}
416
417impl PartialEq for LocalSstableInfo {
418 fn eq(&self, other: &Self) -> bool {
419 self.sst_info == other.sst_info
420 }
421}
422
/// The epoch a read is issued at, together with the kind of read.
///
/// `is_read_committed` reports `Committed`, `BatchQueryCommitted` and `TimeTravel`
/// as committed reads, and `NoWait` and `Backup` as not.
#[derive(Debug, Clone, Copy)]
pub enum HummockReadEpoch {
    /// Read of committed data at the given epoch.
    Committed(HummockEpoch),
    /// Committed read for a batch query, carrying the Hummock version id supplied with the query epoch.
    BatchQueryCommitted(HummockEpoch, HummockVersionId),
    /// Read at the given epoch that is not treated as a committed read.
    NoWait(HummockEpoch),
    /// Read against a backup at the given epoch.
    Backup(HummockEpoch),
    /// Time-travel read at the given epoch.
    TimeTravel(HummockEpoch),
}
436
437impl From<BatchQueryEpoch> for HummockReadEpoch {
438 fn from(e: BatchQueryEpoch) -> Self {
439 match e.epoch.unwrap() {
440 batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted(
441 epoch.epoch,
442 HummockVersionId::new(epoch.hummock_version_id),
443 ),
444 batch_query_epoch::Epoch::Current(epoch) => {
445 if epoch != HummockEpoch::MAX {
446 warn!(
447 epoch,
448 "ignore specified current epoch and set it to u64::MAX"
449 );
450 }
451 HummockReadEpoch::NoWait(HummockEpoch::MAX)
452 }
453 batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
454 batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
455 }
456 }
457}
458
459pub fn test_batch_query_epoch() -> BatchQueryEpoch {
460 BatchQueryEpoch {
461 epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
462 }
463}
464
465impl HummockReadEpoch {
466 pub fn get_epoch(&self) -> HummockEpoch {
467 *match self {
468 HummockReadEpoch::Committed(epoch)
469 | HummockReadEpoch::BatchQueryCommitted(epoch, _)
470 | HummockReadEpoch::NoWait(epoch)
471 | HummockReadEpoch::Backup(epoch)
472 | HummockReadEpoch::TimeTravel(epoch) => epoch,
473 }
474 }
475
476 pub fn is_read_committed(&self) -> bool {
477 match self {
478 HummockReadEpoch::Committed(_)
479 | HummockReadEpoch::TimeTravel(_)
480 | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
481 HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
482 }
483 }
484}
/// A half-open range `[start_id, end_id)` of object ids that can be handed out one at a time.
pub struct ObjectIdRange {
    /// Next id to hand out (inclusive lower bound).
    pub start_id: HummockRawObjectId,
    /// First id that is NOT part of the range (exclusive upper bound).
    pub end_id: HummockRawObjectId,
}
491
492impl ObjectIdRange {
493 pub fn new(
494 start_id: impl Into<HummockRawObjectId>,
495 end_id: impl Into<HummockRawObjectId>,
496 ) -> Self {
497 Self {
498 start_id: start_id.into(),
499 end_id: end_id.into(),
500 }
501 }
502
503 fn peek_next_object_id(&self) -> Option<HummockRawObjectId> {
504 if self.start_id < self.end_id {
505 return Some(self.start_id);
506 }
507 None
508 }
509
510 pub fn get_next_object_id(&mut self) -> Option<HummockRawObjectId> {
512 let next_id = self.peek_next_object_id();
513 self.start_id += 1;
514 next_id
515 }
516}
517
518pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
519 let len = ssts.len();
520 for i in 1..len {
521 if ssts[i - 1]
522 .borrow()
523 .key_range
524 .compare_right_with(&ssts[i].borrow().key_range.left)
525 != Ordering::Less
526 {
527 return false;
528 }
529 }
530 true
531}
532
533pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
534 let len = ssts.len();
535 for i in 1..len {
536 let sst_1 = &ssts[i - 1];
537 let sst_2 = &ssts[i];
538
539 if sst_1.key_range.right_exclusive {
540 if KeyComparator::compare_encoded_full_key(
541 &sst_1.key_range.right,
542 &sst_2.key_range.left,
543 )
544 .is_gt()
545 {
546 return false;
547 }
548 } else if KeyComparator::compare_encoded_full_key(
549 &sst_1.key_range.right,
550 &sst_2.key_range.left,
551 )
552 .is_ge()
553 {
554 return false;
555 }
556 }
557 true
558}
559
/// Directory (under the root dir) that holds the version checkpoint.
const CHECKPOINT_DIR: &str = "checkpoint";
/// File name of the version checkpoint inside `CHECKPOINT_DIR`.
const CHECKPOINT_NAME: &str = "0";
/// Directory (under the root dir) that holds version archives.
const ARCHIVE_DIR: &str = "archive";
563
564pub fn version_checkpoint_path(root_dir: &str) -> String {
565 format!("{}/{}/{}", root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME)
566}
567
568pub fn version_archive_dir(root_dir: &str) -> String {
569 format!("{}/{}", root_dir, ARCHIVE_DIR)
570}
571
/// Strips the final path component from `checkpoint_path`, keeping the trailing `/`.
///
/// Returns an empty string when the path contains no `/` at all.
pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    match checkpoint_path.rfind('/') {
        // `/` is a single byte, so the inclusive slice ends on a char boundary.
        Some(last_slash) => checkpoint_path[..=last_slash].to_owned(),
        None => String::new(),
    }
}
575
/// An epoch and a spill offset packed into one `u64`.
///
/// The bits selected by `EPOCH_SPILL_TIME_MASK` hold the offset; the remaining
/// bits hold the epoch itself. Ordering follows the packed `u64`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
pub struct EpochWithGap(u64);
583
584impl EpochWithGap {
585 #[allow(unused_variables)]
586 pub fn new(epoch: u64, spill_offset: u16) -> Self {
587 if risingwave_common::util::epoch::is_max_epoch(epoch) {
591 EpochWithGap::new_max_epoch()
592 } else {
593 debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
594 EpochWithGap(epoch + spill_offset as u64)
595 }
596 }
597
598 pub fn new_from_epoch(epoch: u64) -> Self {
599 EpochWithGap::new(epoch, 0)
600 }
601
602 pub fn new_min_epoch() -> Self {
603 EpochWithGap(0)
604 }
605
606 pub fn new_max_epoch() -> Self {
607 EpochWithGap(HummockEpoch::MAX)
608 }
609
610 pub(crate) fn as_u64(&self) -> HummockEpoch {
612 self.0
613 }
614
615 pub fn from_u64(epoch_with_gap: u64) -> Self {
617 EpochWithGap(epoch_with_gap)
618 }
619
620 pub fn pure_epoch(&self) -> HummockEpoch {
622 self.0 & !EPOCH_SPILL_TIME_MASK
623 }
624
625 pub fn offset(&self) -> u64 {
626 self.0 & EPOCH_SPILL_TIME_MASK
627 }
628}
629
630pub fn get_object_data_path(
631 obj_prefix: &str,
632 path_prefix: &str,
633 object_id: HummockObjectId,
634) -> String {
635 let suffix = object_id.suffix();
636 let object_id = object_id.as_raw();
637
638 let mut path = String::with_capacity(
639 path_prefix.len()
640 + "/".len()
641 + obj_prefix.len()
642 + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
643 + ".".len()
644 + suffix.len(),
645 );
646 path.push_str(path_prefix);
647 path.push('/');
648 path.push_str(obj_prefix);
649 path.push_str(&object_id.to_string());
650 path.push('.');
651 path.push_str(suffix);
652 path
653}
654
655pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
656 use itertools::Itertools;
657 let split = path.split(&['/', '.']).collect_vec();
658 assert!(split.len() > 2);
659 let suffix = split[split.len() - 1];
660 let id = split[split.len() - 2]
661 .parse::<u64>()
662 .expect("valid object id");
663 HummockObjectId::new(id, suffix)
664 .unwrap_or_else(|| panic!("unknown object id suffix {}", suffix))
665}
666
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    /// The path-capacity constant must equal the number of decimal digits in `u64::MAX`.
    #[test]
    fn test_object_id_decimal_max_length() {
        let len = u64::MAX.to_string().len();
        assert_eq!(len, HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH)
    }

    /// `full_key_can_concat` accepts adjacent SSTs with distinct bounds and rejects
    /// a pair where an inclusive right bound equals the next left bound.
    #[test]
    fn test_full_key_concat() {
        // The two keys differ only in their trailing bytes; the first assertion
        // below relies on `key1` sorting before `key2` as encoded full keys.
        let key1 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        // Single-key SST covering exactly `key1` (inclusive).
        let sst_1 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key1.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        // Single-key SST covering exactly `key2` (inclusive).
        let sst_2 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key2.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        // SST spanning `[key1, key2]`; its left bound equals `sst_1`'s right bound.
        let sst_3 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        assert!(full_key_can_concat(&[sst_1.clone().into(), sst_2.into()]));

        // Overlap on `key1` with an inclusive right bound must be rejected.
        assert!(!full_key_can_concat(&[sst_1.into(), sst_3.into()]));
    }
}