#![feature(type_alias_impl_trait)]
#![feature(impl_trait_in_assoc_type)]
#![feature(let_chains)]
#![feature(btree_cursors)]
#![feature(strict_overflow_ops)]

mod key_cmp;

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::ops::{Add, AddAssign, Sub};
use std::str::FromStr;

pub use key_cmp::*;
use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sstable_info::SstableInfo;
use tracing::warn;

use crate::key_range::KeyRangeCommon;
use crate::table_stats::TableStatsMap;

pub mod change_log;
pub mod compact;
pub mod compact_task;
pub mod compaction_group;
pub mod key;
pub mod key_range;
pub mod level;
pub mod prost_key_range;
pub mod sstable_info;
pub mod state_table_info;
pub mod table_stats;
pub mod table_watermark;
pub mod time_travel;
pub mod version;
pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
mod frontend_version;

pub use compact::*;
use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;

use crate::table_watermark::TableWatermarks;

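/// A zero-cost newtype over a primitive value. The const parameter `C` gives each alias a
/// distinct type (see `HummockRawObjectId`, `HummockSstableObjectId` and `HummockSstableId`
/// below), so different kinds of ids cannot be mixed up even though they share the same
/// underlying representation.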
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, Ord, PartialOrd)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct TypedPrimitive<const C: usize, P>(P);

impl<const C: usize, P: PartialEq> PartialEq<P> for TypedPrimitive<C, P> {
    fn eq(&self, other: &P) -> bool {
        self.0 == *other
    }
}

macro_rules! impl_primitive {
    ($($t:ty)*) => {$(
        impl<const C: usize> PartialEq<TypedPrimitive<C, $t>> for $t {
            fn eq(&self, other: &TypedPrimitive<C, $t>) -> bool {
                *self == other.0
            }
        }
    )*}
}

impl_primitive!(u64);

impl<const C: usize, P: FromStr> FromStr for TypedPrimitive<C, P> {
    type Err = P::Err;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        P::from_str(s).map(TypedPrimitive)
    }
}

impl<const C: usize, P> Borrow<P> for TypedPrimitive<C, P> {
    fn borrow(&self) -> &P {
        &self.0
    }
}

impl<const C: usize, P: Add<Output = P>> Add<P> for TypedPrimitive<C, P> {
    type Output = Self;

    fn add(self, rhs: P) -> Self::Output {
        Self(self.0 + rhs)
    }
}

impl<const C: usize, P: AddAssign> AddAssign<P> for TypedPrimitive<C, P> {
    fn add_assign(&mut self, rhs: P) {
        self.0 += rhs;
    }
}

impl<const C: usize, P: Display> Display for TypedPrimitive<C, P> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl<const C: usize, P> From<P> for TypedPrimitive<C, P> {
    fn from(value: P) -> Self {
        Self(value)
    }
}

impl<const C: usize, P: Serialize> Serialize for TypedPrimitive<C, P> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        self.0.serialize(serializer)
    }
}

impl<'de, const C: usize, P: Deserialize<'de>> Deserialize<'de> for TypedPrimitive<C, P> {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
    }
}

impl<const C: usize, P> TypedPrimitive<C, P> {
    pub const fn new(id: P) -> Self {
        Self(id)
    }

    pub fn inner(self) -> P {
        self.0
    }
}

pub type HummockRawObjectId = TypedPrimitive<0, u64>;
pub type HummockSstableObjectId = TypedPrimitive<1, u64>;
pub type HummockSstableId = TypedPrimitive<2, u64>;

impl HummockSstableObjectId {
    pub fn as_raw(&self) -> HummockRawObjectId {
        HummockRawObjectId::new(self.0)
    }
}

impl From<HummockRawObjectId> for HummockSstableObjectId {
    fn from(id: HummockRawObjectId) -> Self {
        Self(id.0)
    }
}

pub type HummockRefCount = u64;
pub type HummockContextId = u32;
pub type HummockEpoch = u64;
pub type HummockCompactionTaskId = u64;
pub type CompactionGroupId = u64;

#[derive(Debug, Clone, PartialEq, Copy, Ord, PartialOrd, Eq, Hash)]
pub struct HummockVersionId(u64);

impl Display for HummockVersionId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl Serialize for HummockVersionId {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_u64(self.0)
    }
}

impl<'de> Deserialize<'de> for HummockVersionId {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        Ok(Self(<u64 as Deserialize>::deserialize(deserializer)?))
    }
}

impl HummockVersionId {
    pub const MAX: Self = Self(i64::MAX as _);

    pub const fn new(id: u64) -> Self {
        Self(id)
    }

    pub fn next(&self) -> Self {
        Self(self.0 + 1)
    }

    pub fn to_u64(self) -> u64 {
        self.0
    }
}

impl Add<u64> for HummockVersionId {
    type Output = Self;

    fn add(self, rhs: u64) -> Self::Output {
        Self(self.0 + rhs)
    }
}

impl Sub for HummockVersionId {
    type Output = u64;

    fn sub(self, rhs: Self) -> Self::Output {
        self.0 - rhs.0
    }
}

pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0);
pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1);
pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
pub const SST_OBJECT_SUFFIX: &str = "data";
pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;

macro_rules! for_all_object_suffix {
    ($({$name:ident, $type_name:ty, $suffix:expr},)+) => {
        #[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)]
        pub enum HummockObjectId {
            $(
                $name($type_name),
            )+
        }

        pub const VALID_OBJECT_ID_SUFFIXES: [&str; 1] = [$(
            $suffix
        ),+];

        impl HummockObjectId {
            fn new(id: u64, suffix: &str) -> Self {
                match suffix {
                    $(
                        suffix if suffix == $suffix => HummockObjectId::$name(<$type_name>::new(id)),
                    )+
                    _ => panic!("unknown object id suffix {}", suffix),
                }
            }

            pub fn suffix(&self) -> &str {
                match self {
                    $(
                        HummockObjectId::$name(_) => $suffix,
                    )+
                }
            }

            pub fn as_raw(&self) -> HummockRawObjectId {
                let raw = match self {
                    $(
                        HummockObjectId::$name(id) => id.0,
                    )+
                };
                HummockRawObjectId::new(raw)
            }
        }
    };
    () => {
        for_all_object_suffix! {
            {Sstable, HummockSstableObjectId, SST_OBJECT_SUFFIX},
        }
    };
}

for_all_object_suffix!();

pub fn get_stale_object_ids(
    stale_objects: &PbStaleObjects,
) -> impl Iterator<Item = HummockObjectId> + '_ {
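    // This no-op match is an exhaustiveness check: adding a new `HummockObjectId` variant
    // breaks compilation here, forcing the iterator below to be updated for the new object
    // type.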
    match HummockObjectId::Sstable(0.into()) {
        HummockObjectId::Sstable(_) => {}
    };
    stale_objects
        .id
        .iter()
        .map(|sst_id| HummockObjectId::Sstable((*sst_id).into()))
}

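/// Logs via `tracing` at `debug!` level in debug builds and at `info!` level in release
/// builds (hence the name).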
#[macro_export]
macro_rules! info_in_release {
    ($($arg:tt)*) => {
        {
            #[cfg(debug_assertions)]
            {
                use tracing::debug;
                debug!($($arg)*);
            }
            #[cfg(not(debug_assertions))]
            {
                use tracing::info;
                info!($($arg)*);
            }
        }
    }
}

#[derive(Default, Debug)]
pub struct SyncResult {
    /// Total size of the data that was synced.
    pub sync_size: usize,
    /// SSTs produced by the sync that are not yet committed to the Hummock version.
    pub uncommitted_ssts: Vec<LocalSstableInfo>,
    /// Table watermarks collected during the sync, keyed by table id.
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    /// SSTs holding the old values of the synced writes.
    pub old_value_ssts: Vec<LocalSstableInfo>,
}

#[derive(Debug, Clone)]
pub struct LocalSstableInfo {
    pub sst_info: SstableInfo,
    pub table_stats: TableStatsMap,
    pub created_at: u64,
}

impl LocalSstableInfo {
    pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
        Self {
            sst_info,
            table_stats,
            created_at,
        }
    }

    pub fn for_test(sst_info: SstableInfo) -> Self {
        Self {
            sst_info,
            table_stats: Default::default(),
            created_at: u64::MAX,
        }
    }

    pub fn file_size(&self) -> u64 {
        assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
        self.sst_info.file_size
    }
}

impl PartialEq for LocalSstableInfo {
    fn eq(&self, other: &Self) -> bool {
        self.sst_info == other.sst_info
    }
}

#[derive(Debug, Clone, Copy)]
pub enum HummockReadEpoch {
    /// Wait until the epoch is committed, then read committed data.
    Committed(HummockEpoch),
    /// Read committed data at the epoch, pinned to a specific Hummock version (used by batch queries).
    BatchQueryCommitted(HummockEpoch, HummockVersionId),
    /// Read immediately without waiting for the epoch to be committed.
    NoWait(HummockEpoch),
    /// Read from a backup of the given epoch.
    Backup(HummockEpoch),
    /// Time-travel read at the given epoch.
    TimeTravel(HummockEpoch),
}

impl From<BatchQueryEpoch> for HummockReadEpoch {
    fn from(e: BatchQueryEpoch) -> Self {
        match e.epoch.unwrap() {
            batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted(
                epoch.epoch,
                HummockVersionId::new(epoch.hummock_version_id),
            ),
            batch_query_epoch::Epoch::Current(epoch) => {
                if epoch != HummockEpoch::MAX {
                    warn!(
                        epoch,
                        "ignore specified current epoch and set it to u64::MAX"
                    );
                }
                HummockReadEpoch::NoWait(HummockEpoch::MAX)
            }
            batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
            batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
        }
    }
}

pub fn test_batch_query_epoch() -> BatchQueryEpoch {
    BatchQueryEpoch {
        epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
    }
}

impl HummockReadEpoch {
    pub fn get_epoch(&self) -> HummockEpoch {
        *match self {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::NoWait(epoch)
            | HummockReadEpoch::Backup(epoch)
            | HummockReadEpoch::TimeTravel(epoch) => epoch,
        }
    }

    pub fn is_read_committed(&self) -> bool {
        match self {
            HummockReadEpoch::Committed(_)
            | HummockReadEpoch::TimeTravel(_)
            | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
            HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
        }
    }
}

pub struct ObjectIdRange {
    /// Inclusive start of the id range.
    pub start_id: HummockRawObjectId,
    /// Exclusive end of the id range.
    pub end_id: HummockRawObjectId,
}

impl ObjectIdRange {
    pub fn new(
        start_id: impl Into<HummockRawObjectId>,
        end_id: impl Into<HummockRawObjectId>,
    ) -> Self {
        Self {
            start_id: start_id.into(),
            end_id: end_id.into(),
        }
    }

    fn peek_next_object_id(&self) -> Option<HummockRawObjectId> {
        if self.start_id < self.end_id {
            return Some(self.start_id);
        }
        None
    }

    /// Returns the next object id in the range and advances the cursor, or `None` if the
    /// range is exhausted.
    pub fn get_next_object_id(&mut self) -> Option<HummockRawObjectId> {
        let next_id = self.peek_next_object_id();
        self.start_id += 1;
        next_id
    }
}

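/// Returns `true` if the key ranges of `ssts` are sorted and pairwise non-overlapping, i.e.
/// the SSTs can be concatenated and treated as one sorted run.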
pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
    let len = ssts.len();
    for i in 1..len {
        if ssts[i - 1]
            .borrow()
            .key_range
            .compare_right_with(&ssts[i].borrow().key_range.left)
            != Ordering::Less
        {
            return false;
        }
    }
    true
}

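/// Like [`can_concat`], but compares encoded full keys and allows two adjacent SSTs to share
/// a boundary key when the left SST's right bound is exclusive.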
pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
    let len = ssts.len();
    for i in 1..len {
        let sst_1 = &ssts[i - 1];
        let sst_2 = &ssts[i];

        if sst_1.key_range.right_exclusive {
            if KeyComparator::compare_encoded_full_key(
                &sst_1.key_range.right,
                &sst_2.key_range.left,
            )
            .is_gt()
            {
                return false;
            }
        } else if KeyComparator::compare_encoded_full_key(
            &sst_1.key_range.right,
            &sst_2.key_range.left,
        )
        .is_ge()
        {
            return false;
        }
    }
    true
}

const CHECKPOINT_DIR: &str = "checkpoint";
const CHECKPOINT_NAME: &str = "0";
const ARCHIVE_DIR: &str = "archive";

pub fn version_checkpoint_path(root_dir: &str) -> String {
    format!("{}/{}/{}", root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME)
}

pub fn version_archive_dir(root_dir: &str) -> String {
    format!("{}/{}", root_dir, ARCHIVE_DIR)
}

pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    checkpoint_path.trim_end_matches(|c| c != '/').to_owned()
}

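/// An epoch combined with a spill offset, packed into a single `u64`: the bits covered by
/// `EPOCH_SPILL_TIME_MASK` hold the offset and the remaining bits hold the pure epoch. The
/// offset distinguishes data spilled multiple times within the same epoch.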
#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
pub struct EpochWithGap(u64);

impl EpochWithGap {
    #[allow(unused_variables)]
    pub fn new(epoch: u64, spill_offset: u16) -> Self {
        if risingwave_common::util::epoch::is_max_epoch(epoch) {
            EpochWithGap::new_max_epoch()
        } else {
            debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
            EpochWithGap(epoch + spill_offset as u64)
        }
    }

    pub fn new_from_epoch(epoch: u64) -> Self {
        EpochWithGap::new(epoch, 0)
    }

    pub fn new_min_epoch() -> Self {
        EpochWithGap(0)
    }

    pub fn new_max_epoch() -> Self {
        EpochWithGap(HummockEpoch::MAX)
    }

    /// Returns the raw encoded value (pure epoch plus spill offset).
    pub(crate) fn as_u64(&self) -> HummockEpoch {
        self.0
    }

    /// Reconstructs an `EpochWithGap` from a raw value produced by `as_u64`.
    pub fn from_u64(epoch_with_gap: u64) -> Self {
        EpochWithGap(epoch_with_gap)
    }

    /// Returns the epoch with the spill offset bits cleared.
    pub fn pure_epoch(&self) -> HummockEpoch {
        self.0 & !EPOCH_SPILL_TIME_MASK
    }

    /// Returns the spill offset bits.
    pub fn offset(&self) -> u64 {
        self.0 & EPOCH_SPILL_TIME_MASK
    }
}

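/// Builds the object store path for `object_id` in the form
/// `{path_prefix}/{obj_prefix}{raw_id}.{suffix}`.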
pub fn get_object_data_path(
    obj_prefix: &str,
    path_prefix: &str,
    object_id: HummockObjectId,
) -> String {
    let suffix = object_id.suffix();
    let object_id = object_id.as_raw();

    let mut path = String::with_capacity(
        path_prefix.len()
            + "/".len()
            + obj_prefix.len()
            + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
            + ".".len()
            + suffix.len(),
    );
    path.push_str(path_prefix);
    path.push('/');
    path.push_str(obj_prefix);
    path.push_str(&object_id.to_string());
    path.push('.');
    path.push_str(suffix);
    path
}

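/// Recovers the [`HummockObjectId`] from a path produced by [`get_object_data_path`], using
/// the trailing `<raw_id>.<suffix>` components. Panics if the path is malformed.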
pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
    use itertools::Itertools;
    let split = path.split(&['/', '.']).collect_vec();
    assert!(split.len() > 2);
    let suffix = split[split.len() - 1];
    let id = split[split.len() - 2]
        .parse::<u64>()
        .expect("valid object id");
    HummockObjectId::new(id, suffix)
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    #[test]
    fn test_object_id_decimal_max_length() {
        let len = u64::MAX.to_string().len();
        assert_eq!(len, HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH)
    }

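    // Illustrative check of the `TypedPrimitive` operations defined above: construction,
    // comparison against the raw primitive, arithmetic, and parsing.
    #[test]
    fn test_typed_primitive_basic() {
        let mut id = HummockSstableObjectId::new(41);
        id += 1;
        assert_eq!(id, 42u64);
        assert_eq!((id + 1).inner(), 43);
        let parsed: HummockSstableObjectId = "42".parse().unwrap();
        assert_eq!(parsed, id);
    }
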
    #[test]
    fn test_full_key_concat() {
        let key1 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        let sst_1 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key1.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        let sst_2 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key2.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        let sst_3 = SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(key1.to_vec()),
                right: Bytes::from(key2.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        };

        assert!(full_key_can_concat(&[sst_1.clone().into(), sst_2.into()]));

        assert!(!full_key_can_concat(&[sst_1.into(), sst_3.into()]));
    }
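
    // Illustrative round-trip through the path helpers defined above: the raw id and the
    // object-type suffix written by `get_object_data_path` are parsed back out by
    // `get_object_id_from_path`.
    #[test]
    fn test_object_data_path_roundtrip() {
        let object_id = HummockObjectId::Sstable(42.into());
        let path = get_object_data_path("", "hummock_001", object_id);
        assert_eq!(path, "hummock_001/42.data");
        assert_eq!(get_object_id_from_path(&path), object_id);
    }

    // Illustrative check of `ObjectIdRange`: ids are handed out from the half-open range
    // `[start_id, end_id)` and `None` is returned once the range is exhausted.
    #[test]
    fn test_object_id_range() {
        let mut range = ObjectIdRange::new(1u64, 3u64);
        assert_eq!(range.get_next_object_id(), Some(HummockRawObjectId::new(1)));
        assert_eq!(range.get_next_object_id(), Some(HummockRawObjectId::new(2)));
        assert_eq!(range.get_next_object_id(), None);
    }

    // Illustrative check of `EpochWithGap`, assuming (as `EpochWithGap::new` does) that
    // `EPOCH_SPILL_TIME_MASK` covers the low bits reserved for the spill offset.
    #[test]
    fn test_epoch_with_gap_roundtrip() {
        let pure_epoch = (1u64 << 32) & !EPOCH_SPILL_TIME_MASK;
        let e = EpochWithGap::new(pure_epoch, 3);
        assert_eq!(e.pure_epoch(), pure_epoch);
        assert_eq!(e.offset(), 3);
        assert_eq!(EpochWithGap::from_u64(e.as_u64()), e);
    }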
}