1#![feature(type_alias_impl_trait)]
16#![feature(impl_trait_in_assoc_type)]
17#![feature(let_chains)]
18#![feature(btree_cursors)]
19#![feature(strict_overflow_ops)]
20#![feature(map_try_insert)]
21
22mod key_cmp;
23
24use std::borrow::Borrow;
25use std::cmp::Ordering;
26use std::collections::HashMap;
27use std::fmt::{Display, Formatter};
28use std::ops::{Add, AddAssign, Sub};
29use std::str::FromStr;
30
31pub use key_cmp::*;
32use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
33use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
34use serde::{Deserialize, Deserializer, Serialize, Serializer};
35use sstable_info::SstableInfo;
36use tracing::warn;
37
38use crate::key_range::KeyRangeCommon;
39use crate::table_stats::TableStatsMap;
40
41pub mod change_log;
42pub mod compact;
43pub mod compact_task;
44pub mod compaction_group;
45pub mod key;
46pub mod key_range;
47pub mod level;
48pub mod prost_key_range;
49pub mod sstable_info;
50pub mod state_table_info;
51pub mod table_stats;
52pub mod table_watermark;
53pub mod time_travel;
54pub mod version;
55pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
56mod frontend_version;
57pub mod vector_index;
58
59pub use compact::*;
60use risingwave_common::catalog::TableId;
61use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;
62use risingwave_pb::hummock::{PbVectorIndexObjectType, VectorIndexObjectType};
63
64use crate::table_watermark::TableWatermarks;
65use crate::vector_index::VectorIndexAdd;
66
/// A primitive value `P` tagged with a compile-time discriminant `C`.
///
/// Instantiations with different `C` are distinct types, so values of
/// different kinds cannot be mixed up even when they wrap the same `P`.
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, Ord, PartialOrd)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct TypedPrimitive<const C: usize, P>(P);

/// Lets a typed value be compared directly against a bare primitive (`typed == raw`).
impl<const C: usize, P: PartialEq> PartialEq<P> for TypedPrimitive<C, P> {
    fn eq(&self, other: &P) -> bool {
        let Self(inner) = self;
        inner == other
    }
}
76
77macro_rules! impl_primitive {
78 ($($t:ty)*) => {$(
79 impl<const C: usize> PartialEq<TypedPrimitive<C, $t>> for $t {
80 fn eq(&self, other: &TypedPrimitive<C, $t>) -> bool {
81 *self == other.0
82 }
83 }
84 )*}
85}
86
87impl_primitive!(u64);
88
89impl<const C: usize, P: FromStr> FromStr for TypedPrimitive<C, P> {
90 type Err = P::Err;
91
92 fn from_str(s: &str) -> Result<Self, Self::Err> {
93 P::from_str(s).map(TypedPrimitive)
94 }
95}
96
97impl<const C: usize, P> Borrow<P> for TypedPrimitive<C, P> {
98 fn borrow(&self) -> &P {
99 &self.0
100 }
101}
102
103impl<const C: usize, P: Add<Output = P>> Add<P> for TypedPrimitive<C, P> {
104 type Output = Self;
105
106 fn add(self, rhs: P) -> Self::Output {
107 Self(self.0 + rhs)
108 }
109}
110
111impl<const C: usize, P: AddAssign> AddAssign<P> for TypedPrimitive<C, P> {
112 fn add_assign(&mut self, rhs: P) {
113 self.0 += rhs;
114 }
115}
116
117impl<const C: usize, P: Display> Display for TypedPrimitive<C, P> {
118 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
119 write!(f, "{}", self.0)
120 }
121}
122
123impl<const C: usize, P> From<P> for TypedPrimitive<C, P> {
124 fn from(value: P) -> Self {
125 Self(value)
126 }
127}
128
129impl<const C: usize, P: Serialize> Serialize for TypedPrimitive<C, P> {
130 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
131 where
132 S: Serializer,
133 {
134 self.0.serialize(serializer)
135 }
136}
137
138impl<'de, const C: usize, P: Deserialize<'de>> Deserialize<'de> for TypedPrimitive<C, P> {
139 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
140 where
141 D: Deserializer<'de>,
142 {
143 Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
144 }
145}
146
147impl<const C: usize, P> TypedPrimitive<C, P> {
148 pub const fn new(id: P) -> Self {
149 Self(id)
150 }
151
152 pub fn inner(self) -> P {
153 self.0
154 }
155}
156
157pub type HummockRawObjectId = TypedPrimitive<0, u64>;
158pub type HummockSstableObjectId = TypedPrimitive<1, u64>;
159pub type HummockSstableId = TypedPrimitive<2, u64>;
160pub type HummockVectorFileId = TypedPrimitive<3, u64>;
161
162macro_rules! impl_object_id {
163 ($type_name:ty) => {
164 impl $type_name {
165 pub fn as_raw(&self) -> HummockRawObjectId {
166 HummockRawObjectId::new(self.0)
167 }
168 }
169
170 impl From<HummockRawObjectId> for $type_name {
171 fn from(id: HummockRawObjectId) -> Self {
172 Self(id.0)
173 }
174 }
175 };
176}
177
178impl_object_id!(HummockSstableObjectId);
179impl_object_id!(HummockVectorFileId);
180
/// Integer type used for reference counts.
pub type HummockRefCount = u64;
/// Integer type used for Hummock context ids.
pub type HummockContextId = u32;
/// Integer type used for epochs throughout this crate.
pub type HummockEpoch = u64;
/// Integer type used for compaction task ids.
pub type HummockCompactionTaskId = u64;
/// Integer type used for compaction group ids.
pub type CompactionGroupId = u64;
186
/// Identifier of a Hummock version, a newtype over `u64`.
#[derive(Debug, Clone, PartialEq, Copy, Ord, PartialOrd, Eq, Hash)]
pub struct HummockVersionId(u64);

/// Prints the numeric id using plain `{}` formatting.
impl Display for HummockVersionId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Fresh format arguments: caller-supplied width/fill flags are not forwarded.
        f.write_fmt(format_args!("{}", self.0))
    }
}
195
196impl Serialize for HummockVersionId {
197 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
198 where
199 S: Serializer,
200 {
201 serializer.serialize_u64(self.0)
202 }
203}
204
205impl<'de> Deserialize<'de> for HummockVersionId {
206 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
207 where
208 D: Deserializer<'de>,
209 {
210 Ok(Self(<u64 as Deserialize>::deserialize(deserializer)?))
211 }
212}
213
214impl HummockVersionId {
215 pub const MAX: Self = Self(i64::MAX as _);
216
217 pub const fn new(id: u64) -> Self {
218 Self(id)
219 }
220
221 pub fn next(&self) -> Self {
222 Self(self.0 + 1)
223 }
224
225 pub fn to_u64(self) -> u64 {
226 self.0
227 }
228}
229
230impl Add<u64> for HummockVersionId {
231 type Output = Self;
232
233 fn add(self, rhs: u64) -> Self::Output {
234 Self(self.0 + rhs)
235 }
236}
237
238impl Sub for HummockVersionId {
239 type Output = u64;
240
241 fn sub(self, rhs: Self) -> Self::Output {
242 self.0 - rhs.0
243 }
244}
245
246pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0);
247pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1);
248pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
249pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
250pub const SST_OBJECT_SUFFIX: &str = "data";
251pub const VECTOR_FILE_OBJECT_SUFFIX: &str = "vector";
252pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;
253
254macro_rules! for_all_object_suffix {
255 ($({$name:ident, $type_name:ty, $suffix:expr},)+) => {
256 #[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)]
257 pub enum HummockObjectId {
258 $(
259 $name($type_name),
260 )+
261 }
262
263 pub const VALID_OBJECT_ID_SUFFIXES: [&str; 2] = [$(
264 $suffix
265 ),+];
266
267 impl HummockObjectId {
268 fn new(id: u64, suffix: &str) -> Option<Self> {
269 match suffix {
270 $(
271 suffix if suffix == $suffix => Some(HummockObjectId::$name(<$type_name>::new(id))),
272 )+
273 _ => None,
274 }
275 }
276
277 pub fn suffix(&self) -> &str {
278 match self {
279 $(
280 HummockObjectId::$name(_) => $suffix,
281 )+
282 }
283 }
284
285 pub fn as_raw(&self) -> HummockRawObjectId {
286 let raw = match self {
287 $(
288 HummockObjectId::$name(id) => id.0,
289 )+
290 };
291 HummockRawObjectId::new(raw)
292 }
293 }
294
295 pub fn try_get_object_id_from_path(path: &str) -> Option<HummockObjectId> {
296 let split: Vec<_> = path.split(&['/', '.']).collect();
297 if split.len() <= 2 {
298 return None;
299 }
300 let suffix = split[split.len() - 1];
301 let id_str = split[split.len() - 2];
302 match suffix {
303 $(
304 suffix if suffix == $suffix => {
305 let id = id_str
306 .parse::<u64>()
307 .unwrap_or_else(|_| panic!("expect valid object id, got {}", id_str));
308 Some(HummockObjectId::$name(<$type_name>::new(id)))
309 },
310 )+
311 _ => None,
312 }
313 }
314 };
315 () => {
316 for_all_object_suffix! {
317 {Sstable, HummockSstableObjectId, SST_OBJECT_SUFFIX},
318 {VectorFile, HummockVectorFileId, VECTOR_FILE_OBJECT_SUFFIX},
319 }
320 };
321}
322
323for_all_object_suffix!();
324
325pub fn get_stale_object_ids(
326 stale_objects: &PbStaleObjects,
327) -> impl Iterator<Item = HummockObjectId> + '_ {
328 match HummockObjectId::Sstable(0.into()) {
332 HummockObjectId::Sstable(_) => {}
333 HummockObjectId::VectorFile(_) => {}
334 };
335 stale_objects
336 .id
337 .iter()
338 .map(|sst_id| HummockObjectId::Sstable((*sst_id).into()))
339 .chain(stale_objects.vector_files.iter().map(
340 |file| match file.get_object_type().unwrap() {
341 PbVectorIndexObjectType::VectorIndexObjectUnspecified => {
342 unreachable!()
343 }
344 VectorIndexObjectType::VectorIndexObjectVector => {
345 HummockObjectId::VectorFile(file.id.into())
346 }
347 },
348 ))
349}
350
/// Emits a `tracing` event at `info` level when `debug_assertions` is off and
/// at `debug` level when it is on.
///
/// Accepts the same arguments as `tracing::info!` / `tracing::debug!`.
#[macro_export]
macro_rules! info_in_release {
    ($($arg:tt)*) => {
        {
            #[cfg(not(debug_assertions))]
            {
                tracing::info!($($arg)*);
            }
            #[cfg(debug_assertions)]
            {
                tracing::debug!($($arg)*);
            }
        }
    }
}
376
377#[derive(Default, Debug)]
378pub struct SyncResult {
379 pub sync_size: usize,
381 pub uncommitted_ssts: Vec<LocalSstableInfo>,
383 pub table_watermarks: HashMap<TableId, TableWatermarks>,
385 pub old_value_ssts: Vec<LocalSstableInfo>,
387 pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
388}
389
390#[derive(Debug, Clone)]
391pub struct LocalSstableInfo {
392 pub sst_info: SstableInfo,
393 pub table_stats: TableStatsMap,
394 pub created_at: u64,
395}
396
397impl LocalSstableInfo {
398 pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
399 Self {
400 sst_info,
401 table_stats,
402 created_at,
403 }
404 }
405
406 pub fn for_test(sst_info: SstableInfo) -> Self {
407 Self {
408 sst_info,
409 table_stats: Default::default(),
410 created_at: u64::MAX,
411 }
412 }
413
414 pub fn file_size(&self) -> u64 {
415 assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
416 self.sst_info.file_size
417 }
418}
419
420impl PartialEq for LocalSstableInfo {
421 fn eq(&self, other: &Self) -> bool {
422 self.sst_info == other.sst_info
423 }
424}
425
426#[derive(Debug, Clone, Copy)]
428pub enum HummockReadEpoch {
429 Committed(HummockEpoch),
431 BatchQueryCommitted(HummockEpoch, HummockVersionId),
433 NoWait(HummockEpoch),
435 Backup(HummockEpoch),
437 TimeTravel(HummockEpoch),
438}
439
440impl From<BatchQueryEpoch> for HummockReadEpoch {
441 fn from(e: BatchQueryEpoch) -> Self {
442 match e.epoch.unwrap() {
443 batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted(
444 epoch.epoch,
445 HummockVersionId::new(epoch.hummock_version_id),
446 ),
447 batch_query_epoch::Epoch::Current(epoch) => {
448 if epoch != HummockEpoch::MAX {
449 warn!(
450 epoch,
451 "ignore specified current epoch and set it to u64::MAX"
452 );
453 }
454 HummockReadEpoch::NoWait(HummockEpoch::MAX)
455 }
456 batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
457 batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
458 }
459 }
460}
461
462pub fn test_batch_query_epoch() -> BatchQueryEpoch {
463 BatchQueryEpoch {
464 epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
465 }
466}
467
468impl HummockReadEpoch {
469 pub fn get_epoch(&self) -> HummockEpoch {
470 *match self {
471 HummockReadEpoch::Committed(epoch)
472 | HummockReadEpoch::BatchQueryCommitted(epoch, _)
473 | HummockReadEpoch::NoWait(epoch)
474 | HummockReadEpoch::Backup(epoch)
475 | HummockReadEpoch::TimeTravel(epoch) => epoch,
476 }
477 }
478
479 pub fn is_read_committed(&self) -> bool {
480 match self {
481 HummockReadEpoch::Committed(_)
482 | HummockReadEpoch::TimeTravel(_)
483 | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
484 HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
485 }
486 }
487}
488pub struct ObjectIdRange {
489 pub start_id: HummockRawObjectId,
491 pub end_id: HummockRawObjectId,
493}
494
495impl ObjectIdRange {
496 pub fn new(
497 start_id: impl Into<HummockRawObjectId>,
498 end_id: impl Into<HummockRawObjectId>,
499 ) -> Self {
500 Self {
501 start_id: start_id.into(),
502 end_id: end_id.into(),
503 }
504 }
505
506 fn peek_next_object_id(&self) -> Option<HummockRawObjectId> {
507 if self.start_id < self.end_id {
508 return Some(self.start_id);
509 }
510 None
511 }
512
513 pub fn get_next_object_id(&mut self) -> Option<HummockRawObjectId> {
515 let next_id = self.peek_next_object_id();
516 self.start_id += 1;
517 next_id
518 }
519}
520
521pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
522 let len = ssts.len();
523 for i in 1..len {
524 if ssts[i - 1]
525 .borrow()
526 .key_range
527 .compare_right_with(&ssts[i].borrow().key_range.left)
528 != Ordering::Less
529 {
530 return false;
531 }
532 }
533 true
534}
535
536pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
537 let len = ssts.len();
538 for i in 1..len {
539 let sst_1 = &ssts[i - 1];
540 let sst_2 = &ssts[i];
541
542 if sst_1.key_range.right_exclusive {
543 if KeyComparator::compare_encoded_full_key(
544 &sst_1.key_range.right,
545 &sst_2.key_range.left,
546 )
547 .is_gt()
548 {
549 return false;
550 }
551 } else if KeyComparator::compare_encoded_full_key(
552 &sst_1.key_range.right,
553 &sst_2.key_range.left,
554 )
555 .is_ge()
556 {
557 return false;
558 }
559 }
560 true
561}
562
/// Directory, relative to the root dir, that holds the version checkpoint.
const CHECKPOINT_DIR: &str = "checkpoint";
/// File name of the version checkpoint inside `CHECKPOINT_DIR`.
const CHECKPOINT_NAME: &str = "0";
/// Directory, relative to the root dir, that holds archived versions.
const ARCHIVE_DIR: &str = "archive";
566
567pub fn version_checkpoint_path(root_dir: &str) -> String {
568 format!("{}/{}/{}", root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME)
569}
570
571pub fn version_archive_dir(root_dir: &str) -> String {
572 format!("{}/{}", root_dir, ARCHIVE_DIR)
573}
574
/// Returns the directory part of `checkpoint_path`: everything up to and
/// including the last `/`, or an empty string when the path has no `/`.
pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    match checkpoint_path.rfind('/') {
        // `/` is a single byte, so the inclusive slice ends on a char boundary.
        Some(last_slash) => checkpoint_path[..=last_slash].to_owned(),
        None => String::new(),
    }
}
578
579#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
585pub struct EpochWithGap(u64);
586
587impl EpochWithGap {
588 #[allow(unused_variables)]
589 pub fn new(epoch: u64, spill_offset: u16) -> Self {
590 if risingwave_common::util::epoch::is_max_epoch(epoch) {
594 EpochWithGap::new_max_epoch()
595 } else {
596 debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
597 EpochWithGap(epoch + spill_offset as u64)
598 }
599 }
600
601 pub fn new_from_epoch(epoch: u64) -> Self {
602 EpochWithGap::new(epoch, 0)
603 }
604
605 pub fn new_min_epoch() -> Self {
606 EpochWithGap(0)
607 }
608
609 pub fn new_max_epoch() -> Self {
610 EpochWithGap(HummockEpoch::MAX)
611 }
612
613 pub(crate) fn as_u64(&self) -> HummockEpoch {
615 self.0
616 }
617
618 pub fn from_u64(epoch_with_gap: u64) -> Self {
620 EpochWithGap(epoch_with_gap)
621 }
622
623 pub fn pure_epoch(&self) -> HummockEpoch {
625 self.0 & !EPOCH_SPILL_TIME_MASK
626 }
627
628 pub fn offset(&self) -> u64 {
629 self.0 & EPOCH_SPILL_TIME_MASK
630 }
631}
632
633pub fn get_object_data_path(
634 obj_prefix: &str,
635 path_prefix: &str,
636 object_id: HummockObjectId,
637) -> String {
638 let suffix = object_id.suffix();
639 let object_id = object_id.as_raw();
640
641 let mut path = String::with_capacity(
642 path_prefix.len()
643 + "/".len()
644 + obj_prefix.len()
645 + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
646 + ".".len()
647 + suffix.len(),
648 );
649 path.push_str(path_prefix);
650 path.push('/');
651 path.push_str(obj_prefix);
652 path.push_str(&object_id.to_string());
653 path.push('.');
654 path.push_str(suffix);
655 path
656}
657
658pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
659 use itertools::Itertools;
660 let split = path.split(&['/', '.']).collect_vec();
661 assert!(split.len() > 2);
662 let suffix = split[split.len() - 1];
663 let id = split[split.len() - 2]
664 .parse::<u64>()
665 .expect("valid object id");
666 HummockObjectId::new(id, suffix)
667 .unwrap_or_else(|| panic!("unknown object id suffix {}", suffix))
668}
669
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    /// Builds an SST whose key range is `[left, right]` with an inclusive right end;
    /// every other field keeps its default value.
    fn sst_with_key_range(left: &[u8], right: &[u8]) -> SstableInfo {
        SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(left.to_vec()),
                right: Bytes::from(right.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        }
        .into()
    }

    /// The capacity constant must match the number of decimal digits of `u64::MAX`.
    #[test]
    fn test_object_id_decimal_max_length() {
        let len = u64::MAX.to_string().len();
        assert_eq!(len, HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH)
    }

    /// Two single-key SSTs are concatenable, while `[key1, key1]` followed by
    /// `[key1, key2]` is not, because the second range starts at the first one's
    /// inclusive right end.
    #[test]
    fn test_full_key_concat() {
        let key1 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        let sst_1 = sst_with_key_range(key1, key1);
        let sst_2 = sst_with_key_range(key2, key2);
        let sst_3 = sst_with_key_range(key1, key2);

        assert!(full_key_can_concat(&[sst_1.clone(), sst_2]));

        assert!(!full_key_can_concat(&[sst_1, sst_3]));
    }
}