1#![feature(type_alias_impl_trait)]
16#![feature(impl_trait_in_assoc_type)]
17#![feature(btree_cursors)]
18#![feature(map_try_insert)]
19
20mod key_cmp;
21
22use std::borrow::Borrow;
23use std::cmp::Ordering;
24use std::collections::HashMap;
25use std::fmt::{Display, Formatter};
26use std::ops::{Add, AddAssign, Sub};
27use std::str::FromStr;
28
29pub use key_cmp::*;
30use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
31use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
32use serde::{Deserialize, Deserializer, Serialize, Serializer};
33use sstable_info::SstableInfo;
34use tracing::warn;
35
36use crate::key_range::KeyRangeCommon;
37use crate::table_stats::TableStatsMap;
38
39pub mod change_log;
40pub mod compact;
41pub mod compact_task;
42pub mod compaction_group;
43pub mod key;
44pub mod key_range;
45pub mod level;
46pub mod prost_key_range;
47pub mod sstable_info;
48pub mod state_table_info;
49pub mod table_stats;
50pub mod table_watermark;
51pub mod time_travel;
52pub mod version;
53pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
54mod frontend_version;
55pub mod vector_index;
56
57pub use compact::*;
58use risingwave_common::catalog::TableId;
59use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;
60use risingwave_pb::hummock::{PbVectorIndexObjectType, VectorIndexObjectType};
61
62use crate::table_watermark::TableWatermarks;
63use crate::vector_index::VectorIndexAdd;
64
/// A thin wrapper around a primitive `P`, tagged with the const parameter `C`.
///
/// Instantiations with different `C` are distinct types even when `P` is the same,
/// so ids built on the same primitive cannot be mixed up by accident.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct TypedPrimitive<const C: usize, P>(P);
68
69impl<const C: usize, P: PartialEq> PartialEq<P> for TypedPrimitive<C, P> {
70 fn eq(&self, other: &P) -> bool {
71 self.0 == *other
72 }
73}
74
75macro_rules! impl_primitive {
76 ($($t:ty)*) => {$(
77 impl<const C: usize> PartialEq<TypedPrimitive<C, $t>> for $t {
78 fn eq(&self, other: &TypedPrimitive<C, $t>) -> bool {
79 *self == other.0
80 }
81 }
82 )*}
83}
84
85impl_primitive!(u64);
86
87impl<const C: usize, P: FromStr> FromStr for TypedPrimitive<C, P> {
88 type Err = P::Err;
89
90 fn from_str(s: &str) -> Result<Self, Self::Err> {
91 P::from_str(s).map(TypedPrimitive)
92 }
93}
94
95impl<const C: usize, P> Borrow<P> for TypedPrimitive<C, P> {
96 fn borrow(&self) -> &P {
97 &self.0
98 }
99}
100
101impl<const C: usize, P: Add<Output = P>> Add<P> for TypedPrimitive<C, P> {
102 type Output = Self;
103
104 fn add(self, rhs: P) -> Self::Output {
105 Self(self.0 + rhs)
106 }
107}
108
109impl<const C: usize, P: AddAssign> AddAssign<P> for TypedPrimitive<C, P> {
110 fn add_assign(&mut self, rhs: P) {
111 self.0 += rhs;
112 }
113}
114
115impl<const C: usize, P: Display> Display for TypedPrimitive<C, P> {
116 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
117 write!(f, "{}", self.0)
118 }
119}
120
121impl<const C: usize, P> From<P> for TypedPrimitive<C, P> {
122 fn from(value: P) -> Self {
123 Self(value)
124 }
125}
126
127impl<const C: usize, P: Serialize> Serialize for TypedPrimitive<C, P> {
128 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
129 where
130 S: Serializer,
131 {
132 self.0.serialize(serializer)
133 }
134}
135
136impl<'de, const C: usize, P: Deserialize<'de>> Deserialize<'de> for TypedPrimitive<C, P> {
137 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
138 where
139 D: Deserializer<'de>,
140 {
141 Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
142 }
143}
144
145impl<const C: usize, P> TypedPrimitive<C, P> {
146 pub const fn new(id: P) -> Self {
147 Self(id)
148 }
149
150 pub fn inner(self) -> P {
151 self.0
152 }
153}
154
155pub type HummockRawObjectId = TypedPrimitive<0, u64>;
156pub type HummockSstableObjectId = TypedPrimitive<1, u64>;
157pub type HummockSstableId = TypedPrimitive<2, u64>;
158pub type HummockVectorFileId = TypedPrimitive<3, u64>;
159pub type HummockHnswGraphFileId = TypedPrimitive<4, u64>;
160
161macro_rules! impl_object_id {
162 ($type_name:ty) => {
163 impl $type_name {
164 pub fn as_raw(&self) -> HummockRawObjectId {
165 HummockRawObjectId::new(self.0)
166 }
167 }
168
169 impl From<HummockRawObjectId> for $type_name {
170 fn from(id: HummockRawObjectId) -> Self {
171 Self(id.0)
172 }
173 }
174 };
175}
176
177impl_object_id!(HummockSstableObjectId);
178impl_object_id!(HummockVectorFileId);
179impl_object_id!(HummockHnswGraphFileId);
180
// Plain primitive aliases; unlike `TypedPrimitive` ids these are interchangeable with
// their underlying integer type.

/// Reference count.
pub type HummockRefCount = u64;
/// Identifier of a Hummock context.
pub type HummockContextId = u32;
/// Epoch value.
pub type HummockEpoch = u64;
/// Identifier of a compaction task.
pub type HummockCompactionTaskId = u64;
/// Identifier of a compaction group.
pub type CompactionGroupId = u64;
186
/// Identifier of a Hummock version, stored as a `u64` and ordered by that value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct HummockVersionId(u64);
189
190impl Display for HummockVersionId {
191 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
192 write!(f, "{}", self.0)
193 }
194}
195
196impl Serialize for HummockVersionId {
197 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
198 where
199 S: Serializer,
200 {
201 serializer.serialize_u64(self.0)
202 }
203}
204
205impl<'de> Deserialize<'de> for HummockVersionId {
206 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
207 where
208 D: Deserializer<'de>,
209 {
210 Ok(Self(<u64 as Deserialize>::deserialize(deserializer)?))
211 }
212}
213
214impl HummockVersionId {
215 pub const MAX: Self = Self(i64::MAX as _);
216
217 pub const fn new(id: u64) -> Self {
218 Self(id)
219 }
220
221 pub fn next(&self) -> Self {
222 Self(self.0 + 1)
223 }
224
225 pub fn to_u64(self) -> u64 {
226 self.0
227 }
228}
229
230impl Add<u64> for HummockVersionId {
231 type Output = Self;
232
233 fn add(self, rhs: u64) -> Self::Output {
234 Self(self.0 + rhs)
235 }
236}
237
238impl Sub for HummockVersionId {
239 type Output = u64;
240
241 fn sub(self, rhs: Self) -> Self::Output {
242 self.0 - rhs.0
243 }
244}
245
246pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0);
247pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1);
248pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
249pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
250pub const SST_OBJECT_SUFFIX: &str = "data";
251pub const VECTOR_FILE_OBJECT_SUFFIX: &str = "vector";
252pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;
253
254macro_rules! for_all_object_suffix {
255 ($({$name:ident, $type_name:ty, $suffix:expr},)+) => {
256 #[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)]
257 pub enum HummockObjectId {
258 $(
259 $name($type_name),
260 )+
261 }
262
263 pub const VALID_OBJECT_ID_SUFFIXES: [&str; 3] = [$(
264 $suffix
265 ),+];
266
267 impl HummockObjectId {
268 fn new(id: u64, suffix: &str) -> Option<Self> {
269 match suffix {
270 $(
271 suffix if suffix == $suffix => Some(HummockObjectId::$name(<$type_name>::new(id))),
272 )+
273 _ => None,
274 }
275 }
276
277 pub fn suffix(&self) -> &str {
278 match self {
279 $(
280 HummockObjectId::$name(_) => $suffix,
281 )+
282 }
283 }
284
285 pub fn as_raw(&self) -> HummockRawObjectId {
286 let raw = match self {
287 $(
288 HummockObjectId::$name(id) => id.0,
289 )+
290 };
291 HummockRawObjectId::new(raw)
292 }
293 }
294
295 pub fn try_get_object_id_from_path(path: &str) -> Option<HummockObjectId> {
296 let split: Vec<_> = path.split(&['/', '.']).collect();
297 if split.len() <= 2 {
298 return None;
299 }
300 let suffix = split[split.len() - 1];
301 let id_str = split[split.len() - 2];
302 match suffix {
303 $(
304 suffix if suffix == $suffix => {
305 let id = id_str
306 .parse::<u64>()
307 .unwrap_or_else(|_| panic!("expect valid object id, got {}", id_str));
308 Some(HummockObjectId::$name(<$type_name>::new(id)))
309 },
310 )+
311 _ => None,
312 }
313 }
314 };
315 () => {
316 for_all_object_suffix! {
317 {Sstable, HummockSstableObjectId, SST_OBJECT_SUFFIX},
318 {VectorFile, HummockVectorFileId, VECTOR_FILE_OBJECT_SUFFIX},
319 {HnswGraphFile, HummockHnswGraphFileId, "hnsw_graph"},
320 }
321 };
322}
323
324for_all_object_suffix!();
325
326pub fn get_stale_object_ids(
327 stale_objects: &PbStaleObjects,
328) -> impl Iterator<Item = HummockObjectId> + '_ {
329 match HummockObjectId::Sstable(0.into()) {
333 HummockObjectId::Sstable(_) => {}
334 HummockObjectId::VectorFile(_) => {}
335 HummockObjectId::HnswGraphFile(_) => {}
336 };
337 stale_objects
338 .id
339 .iter()
340 .map(|sst_id| HummockObjectId::Sstable((*sst_id).into()))
341 .chain(stale_objects.vector_files.iter().map(
342 |file| match file.get_object_type().unwrap() {
343 PbVectorIndexObjectType::VectorIndexObjectUnspecified => {
344 unreachable!()
345 }
346 VectorIndexObjectType::VectorIndexObjectVector => {
347 HummockObjectId::VectorFile(file.id.into())
348 }
349 VectorIndexObjectType::VectorIndexObjectHnswGraph => {
350 HummockObjectId::HnswGraphFile(file.id.into())
351 }
352 },
353 ))
354}
355
/// Logs through `tracing` at `debug` level when debug assertions are enabled and at `info`
/// level otherwise. Accepts the same arguments as `tracing::info!`.
#[macro_export]
macro_rules! info_in_release {
    ($($arg:tt)*) => {{
        #[cfg(debug_assertions)]
        {
            tracing::debug!($($arg)*);
        }
        #[cfg(not(debug_assertions))]
        {
            tracing::info!($($arg)*);
        }
    }};
}
381
382#[derive(Default, Debug)]
383pub struct SyncResult {
384 pub sync_size: usize,
386 pub uncommitted_ssts: Vec<LocalSstableInfo>,
388 pub table_watermarks: HashMap<TableId, TableWatermarks>,
390 pub old_value_ssts: Vec<LocalSstableInfo>,
392 pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
393}
394
395#[derive(Debug, Clone)]
396pub struct LocalSstableInfo {
397 pub sst_info: SstableInfo,
398 pub table_stats: TableStatsMap,
399 pub created_at: u64,
400}
401
402impl LocalSstableInfo {
403 pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
404 Self {
405 sst_info,
406 table_stats,
407 created_at,
408 }
409 }
410
411 pub fn for_test(sst_info: SstableInfo) -> Self {
412 Self {
413 sst_info,
414 table_stats: Default::default(),
415 created_at: u64::MAX,
416 }
417 }
418
419 pub fn file_size(&self) -> u64 {
420 assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
421 self.sst_info.file_size
422 }
423}
424
425impl PartialEq for LocalSstableInfo {
426 fn eq(&self, other: &Self) -> bool {
427 self.sst_info == other.sst_info
428 }
429}
430
431#[derive(Debug, Clone, Copy)]
433pub enum HummockReadEpoch {
434 Committed(HummockEpoch),
436 BatchQueryCommitted(HummockEpoch, HummockVersionId),
438 NoWait(HummockEpoch),
440 Backup(HummockEpoch),
442 TimeTravel(HummockEpoch),
443}
444
445impl From<BatchQueryEpoch> for HummockReadEpoch {
446 fn from(e: BatchQueryEpoch) -> Self {
447 match e.epoch.unwrap() {
448 batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted(
449 epoch.epoch,
450 HummockVersionId::new(epoch.hummock_version_id),
451 ),
452 batch_query_epoch::Epoch::Current(epoch) => {
453 if epoch != HummockEpoch::MAX {
454 warn!(
455 epoch,
456 "ignore specified current epoch and set it to u64::MAX"
457 );
458 }
459 HummockReadEpoch::NoWait(HummockEpoch::MAX)
460 }
461 batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
462 batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
463 }
464 }
465}
466
467pub fn test_batch_query_epoch() -> BatchQueryEpoch {
468 BatchQueryEpoch {
469 epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
470 }
471}
472
473impl HummockReadEpoch {
474 pub fn get_epoch(&self) -> HummockEpoch {
475 *match self {
476 HummockReadEpoch::Committed(epoch)
477 | HummockReadEpoch::BatchQueryCommitted(epoch, _)
478 | HummockReadEpoch::NoWait(epoch)
479 | HummockReadEpoch::Backup(epoch)
480 | HummockReadEpoch::TimeTravel(epoch) => epoch,
481 }
482 }
483
484 pub fn is_read_committed(&self) -> bool {
485 match self {
486 HummockReadEpoch::Committed(_)
487 | HummockReadEpoch::TimeTravel(_)
488 | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
489 HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
490 }
491 }
492}
493pub struct ObjectIdRange {
494 pub start_id: HummockRawObjectId,
496 pub end_id: HummockRawObjectId,
498}
499
500impl ObjectIdRange {
501 pub fn new(
502 start_id: impl Into<HummockRawObjectId>,
503 end_id: impl Into<HummockRawObjectId>,
504 ) -> Self {
505 Self {
506 start_id: start_id.into(),
507 end_id: end_id.into(),
508 }
509 }
510
511 fn peek_next_object_id(&self) -> Option<HummockRawObjectId> {
512 if self.start_id < self.end_id {
513 return Some(self.start_id);
514 }
515 None
516 }
517
518 pub fn get_next_object_id(&mut self) -> Option<HummockRawObjectId> {
520 let next_id = self.peek_next_object_id();
521 self.start_id += 1;
522 next_id
523 }
524}
525
526pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
527 let len = ssts.len();
528 for i in 1..len {
529 if ssts[i - 1]
530 .borrow()
531 .key_range
532 .compare_right_with(&ssts[i].borrow().key_range.left)
533 != Ordering::Less
534 {
535 return false;
536 }
537 }
538 true
539}
540
541pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
542 let len = ssts.len();
543 for i in 1..len {
544 let sst_1 = &ssts[i - 1];
545 let sst_2 = &ssts[i];
546
547 if sst_1.key_range.right_exclusive {
548 if KeyComparator::compare_encoded_full_key(
549 &sst_1.key_range.right,
550 &sst_2.key_range.left,
551 )
552 .is_gt()
553 {
554 return false;
555 }
556 } else if KeyComparator::compare_encoded_full_key(
557 &sst_1.key_range.right,
558 &sst_2.key_range.left,
559 )
560 .is_ge()
561 {
562 return false;
563 }
564 }
565 true
566}
567
/// Directory (below the root) that holds the version checkpoint.
const CHECKPOINT_DIR: &str = "checkpoint";
/// File name of the version checkpoint inside [`CHECKPOINT_DIR`].
const CHECKPOINT_NAME: &str = "0";
/// Directory (below the root) that holds archived versions.
const ARCHIVE_DIR: &str = "archive";
571
572pub fn version_checkpoint_path(root_dir: &str) -> String {
573 format!("{}/{}/{}", root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME)
574}
575
576pub fn version_archive_dir(root_dir: &str) -> String {
577 format!("{}/{}", root_dir, ARCHIVE_DIR)
578}
579
/// Returns the directory part of `checkpoint_path`: everything up to and including the last
/// `/`, or an empty string when the path contains no `/`.
pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    match checkpoint_path.rfind('/') {
        Some(last_slash) => checkpoint_path[..=last_slash].to_owned(),
        None => String::new(),
    }
}
583
/// An epoch and a spill offset packed into a single `u64`.
///
/// The bits selected by `EPOCH_SPILL_TIME_MASK` hold the offset; the remaining bits hold the
/// epoch itself.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EpochWithGap(u64);
591
592impl EpochWithGap {
593 #[allow(unused_variables)]
594 pub fn new(epoch: u64, spill_offset: u16) -> Self {
595 if risingwave_common::util::epoch::is_max_epoch(epoch) {
599 EpochWithGap::new_max_epoch()
600 } else {
601 debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
602 EpochWithGap(epoch + spill_offset as u64)
603 }
604 }
605
606 pub fn new_from_epoch(epoch: u64) -> Self {
607 EpochWithGap::new(epoch, 0)
608 }
609
610 pub fn new_min_epoch() -> Self {
611 EpochWithGap(0)
612 }
613
614 pub fn new_max_epoch() -> Self {
615 EpochWithGap(HummockEpoch::MAX)
616 }
617
618 pub(crate) fn as_u64(&self) -> HummockEpoch {
620 self.0
621 }
622
623 pub fn from_u64(epoch_with_gap: u64) -> Self {
625 EpochWithGap(epoch_with_gap)
626 }
627
628 pub fn pure_epoch(&self) -> HummockEpoch {
630 self.0 & !EPOCH_SPILL_TIME_MASK
631 }
632
633 pub fn offset(&self) -> u64 {
634 self.0 & EPOCH_SPILL_TIME_MASK
635 }
636}
637
638pub fn get_object_data_path(
639 obj_prefix: &str,
640 path_prefix: &str,
641 object_id: HummockObjectId,
642) -> String {
643 let suffix = object_id.suffix();
644 let object_id = object_id.as_raw();
645
646 let mut path = String::with_capacity(
647 path_prefix.len()
648 + "/".len()
649 + obj_prefix.len()
650 + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
651 + ".".len()
652 + suffix.len(),
653 );
654 path.push_str(path_prefix);
655 path.push('/');
656 path.push_str(obj_prefix);
657 path.push_str(&object_id.to_string());
658 path.push('.');
659 path.push_str(suffix);
660 path
661}
662
663pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
664 use itertools::Itertools;
665 let split = path.split(&['/', '.']).collect_vec();
666 assert!(split.len() > 2);
667 let suffix = split[split.len() - 1];
668 let id = split[split.len() - 2]
669 .parse::<u64>()
670 .expect("valid object id");
671 HummockObjectId::new(id, suffix)
672 .unwrap_or_else(|| panic!("unknown object id suffix {}", suffix))
673}
674
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    /// Builds an SST whose key range is `[left, right]` (right bound inclusive).
    fn sst_with_range(left: &[u8], right: &[u8]) -> SstableInfo {
        SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::copy_from_slice(left),
                right: Bytes::copy_from_slice(right),
                right_exclusive: false,
            },
            ..Default::default()
        }
        .into()
    }

    /// The reserved decimal length must fit the largest possible `u64` id.
    #[test]
    fn test_object_id_decimal_max_length() {
        assert_eq!(
            u64::MAX.to_string().len(),
            HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
        );
    }

    /// Disjoint full-key ranges can be concatenated; ranges sharing a bound cannot.
    #[test]
    fn test_full_key_concat() {
        let key1 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        // [key1, key1] followed by [key2, key2].
        let disjoint = [sst_with_range(key1, key1), sst_with_range(key2, key2)];
        assert!(full_key_can_concat(&disjoint));

        // [key1, key1] followed by [key1, key2]: the inclusive bounds collide on key1.
        let overlapping = [sst_with_range(key1, key1), sst_with_range(key1, key2)];
        assert!(!full_key_can_concat(&overlapping));
    }
}