1#![feature(type_alias_impl_trait)]
16#![feature(impl_trait_in_assoc_type)]
17#![feature(btree_cursors)]
18#![feature(map_try_insert)]
19#![feature(stmt_expr_attributes)]
20
21mod key_cmp;
22
23use std::borrow::Borrow;
24use std::cmp::Ordering;
25use std::collections::HashMap;
26
27pub use key_cmp::*;
28use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
29use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
30use sstable_info::SstableInfo;
31
32use crate::key_range::KeyRangeCommon;
33use crate::table_stats::TableStatsMap;
34
35pub mod change_log;
36pub mod compact;
37pub mod compact_task;
38pub mod compaction_group;
39pub mod filter_utils;
40pub mod key;
41pub mod key_range;
42pub mod level;
43pub mod prost_key_range;
44pub mod sstable_info;
45pub mod state_table_info;
46pub mod table_stats;
47pub mod table_watermark;
48pub mod time_travel;
49pub mod version;
50pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
51mod frontend_version;
52pub mod vector_index;
53
54pub use compact::*;
55use risingwave_common::catalog::TableId;
56use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;
57use risingwave_pb::hummock::{PbVectorIndexObjectType, VectorIndexObjectType};
58pub use risingwave_pb::id::{
59 CompactionGroupId, HummockHnswGraphFileId, HummockRawObjectId, HummockSstableId,
60 HummockSstableObjectId, HummockVectorFileId, HummockVersionId,
61};
62
63use crate::table_watermark::TableWatermarks;
64use crate::vector_index::VectorIndexAdd;
65
/// Reference-count type used by Hummock.
pub type HummockRefCount = u64;
/// Identifier of a Hummock context; an alias of the cluster `WorkerId`.
pub type HummockContextId = risingwave_common::id::WorkerId;
/// Epoch value as used throughout Hummock.
pub type HummockEpoch = u64;
/// Identifier of a compaction task.
pub type HummockCompactionTaskId = u64;

/// Version id `0`; presumably the "no valid version" sentinel — confirm against users.
pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId::new(0);
/// Version id `1`; presumably the id of the first version ever created — confirm.
pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId::new(1);
/// `1 << 56`; presumably the high-bit prefix of compaction-group ids created by splitting
/// tables out of a group — TODO confirm.
pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
/// `2 << 56`; presumably the high-bit prefix of single-table compaction-group ids — TODO confirm.
pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
/// File-name suffix of SST objects (`HummockObjectId::Sstable`).
pub const SST_OBJECT_SUFFIX: &str = "data";
/// File-name suffix of vector-file objects (`HummockObjectId::VectorFile`).
pub const VECTOR_FILE_OBJECT_SUFFIX: &str = "vector";
/// Number of decimal digits of `u64::MAX`, i.e. the longest decimal rendering of an object id.
/// Used to pre-size object paths.
pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;
78
79macro_rules! for_all_object_suffix {
80 ($({$name:ident, $type_name:ty, $suffix:expr},)+) => {
81 #[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)]
82 pub enum HummockObjectId {
83 $(
84 $name($type_name),
85 )+
86 }
87
88 pub const VALID_OBJECT_ID_SUFFIXES: [&str; 3] = [$(
89 $suffix
90 ),+];
91
92 impl HummockObjectId {
93 fn new(id: u64, suffix: &str) -> Option<Self> {
94 match suffix {
95 $(
96 suffix if suffix == $suffix => Some(HummockObjectId::$name(<$type_name>::new(id))),
97 )+
98 _ => None,
99 }
100 }
101
102 pub fn suffix(&self) -> &str {
103 match self {
104 $(
105 HummockObjectId::$name(_) => $suffix,
106 )+
107 }
108 }
109
110 pub fn as_raw(&self) -> HummockRawObjectId {
111 let raw = match self {
112 $(
113 HummockObjectId::$name(id) => id.as_raw_id(),
114 )+
115 };
116 HummockRawObjectId::new(raw)
117 }
118 }
119
120 pub fn try_get_object_id_from_path(path: &str) -> Option<HummockObjectId> {
121 let split: Vec<_> = path.split(&['/', '.']).collect();
122 if split.len() <= 2 {
123 return None;
124 }
125 let suffix = split[split.len() - 1];
126 let id_str = split[split.len() - 2];
127 match suffix {
128 $(
129 suffix if suffix == $suffix => {
130 let id = id_str
131 .parse::<u64>()
132 .unwrap_or_else(|_| panic!("expect valid object id, got {}", id_str));
133 Some(HummockObjectId::$name(<$type_name>::new(id)))
134 },
135 )+
136 _ => None,
137 }
138 }
139 };
140 () => {
141 for_all_object_suffix! {
142 {Sstable, HummockSstableObjectId, SST_OBJECT_SUFFIX},
143 {VectorFile, HummockVectorFileId, VECTOR_FILE_OBJECT_SUFFIX},
144 {HnswGraphFile, HummockHnswGraphFileId, "hnsw_graph"},
145 }
146 };
147}
148
149for_all_object_suffix!();
150
151pub fn get_stale_object_ids(
152 stale_objects: &PbStaleObjects,
153) -> impl Iterator<Item = HummockObjectId> + '_ {
154 match HummockObjectId::Sstable(0.into()) {
158 HummockObjectId::Sstable(_) => {}
159 HummockObjectId::VectorFile(_) => {}
160 HummockObjectId::HnswGraphFile(_) => {}
161 };
162 stale_objects
163 .id
164 .iter()
165 .map(|sst_id| HummockObjectId::Sstable(*sst_id))
166 .chain(stale_objects.vector_files.iter().map(
167 |file| match file.get_object_type().unwrap() {
168 PbVectorIndexObjectType::VectorIndexObjectUnspecified => {
169 unreachable!()
170 }
171 VectorIndexObjectType::VectorIndexObjectVector => {
172 HummockObjectId::VectorFile(file.id.into())
173 }
174 VectorIndexObjectType::VectorIndexObjectHnswGraph => {
175 HummockObjectId::HnswGraphFile(file.id.into())
176 }
177 },
178 ))
179}
180
/// Logs through `tracing` at `debug` level in debug builds and at `info` level in release
/// builds.
///
/// Accepts the same arguments as `tracing::debug!` / `tracing::info!`. The
/// `#[cfg(debug_assertions)]` attributes are part of the expansion, so the build profile that
/// decides the level is the invoking crate's. That crate must also be able to resolve `tracing`.
#[macro_export]
macro_rules! info_in_release {
    ($($arg:tt)*) => {
        {
            // Debug builds: keep the message at `debug` level.
            #[cfg(debug_assertions)]
            {
                use tracing::debug;
                debug!($($arg)*);
            }
            // Builds without debug assertions: promote the message to `info` level.
            #[cfg(not(debug_assertions))]
            {
                use tracing::info;
                info!($($arg)*);
            }
        }
    }
}
206
/// Data produced by a sync, grouped into the pieces listed below.
#[derive(Default, Debug)]
pub struct SyncResult {
    /// Size associated with the synced data; presumably in bytes — confirm with producers.
    pub sync_size: usize,
    /// SSTs produced by the sync; presumably not yet committed to a Hummock version — confirm.
    pub uncommitted_ssts: Vec<LocalSstableInfo>,
    /// Table watermarks carried by this sync, keyed by table id.
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    /// SSTs holding old values; presumably consumed by the change-log path — TODO confirm.
    pub old_value_ssts: Vec<LocalSstableInfo>,
    /// Vector-index additions carried by this sync, grouped per table id.
    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
}
219
/// An SST together with the per-table statistics and creation time recorded alongside it.
///
/// Equality (the `PartialEq` impl for this type) compares only `sst_info`.
#[derive(Debug, Clone)]
pub struct LocalSstableInfo {
    /// Metadata of the SST itself.
    pub sst_info: SstableInfo,
    /// Per-table statistics attached to this SST.
    pub table_stats: TableStatsMap,
    /// Creation time of the SST; units are not visible here (`for_test` uses `u64::MAX`).
    pub created_at: u64,
}
226
227impl LocalSstableInfo {
228 pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
229 Self {
230 sst_info,
231 table_stats,
232 created_at,
233 }
234 }
235
236 pub fn for_test(sst_info: SstableInfo) -> Self {
237 Self {
238 sst_info,
239 table_stats: Default::default(),
240 created_at: u64::MAX,
241 }
242 }
243
244 pub fn file_size(&self) -> u64 {
245 assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
246 self.sst_info.file_size
247 }
248}
249
250impl PartialEq for LocalSstableInfo {
251 fn eq(&self, other: &Self) -> bool {
252 self.sst_info == other.sst_info
253 }
254}
255
/// The epoch a Hummock read targets, tagged with how the read should treat it.
///
/// `HummockReadEpoch::is_read_committed` returns `true` for `Committed`, `TimeTravel` and
/// `BatchQueryCommitted`, and `false` for `NoWait` and `Backup`.
#[derive(Debug, Clone, Copy)]
pub enum HummockReadEpoch {
    /// Read at a committed epoch.
    Committed(HummockEpoch),
    /// Committed epoch of a batch query plus the Hummock version id it carries.
    /// Built from `batch_query_epoch::Epoch::Committed`.
    BatchQueryCommitted(HummockEpoch, HummockVersionId),
    /// Built from `batch_query_epoch::Epoch::Current`. The name suggests the read does not
    /// wait for the epoch — confirm with the reader implementation.
    NoWait(HummockEpoch),
    /// Built from `batch_query_epoch::Epoch::Backup`; presumably a read served from a backup.
    Backup(HummockEpoch),
    /// Built from `batch_query_epoch::Epoch::TimeTravel`; presumably a read at a past epoch.
    TimeTravel(HummockEpoch),
}
269
270impl From<BatchQueryEpoch> for HummockReadEpoch {
271 fn from(e: BatchQueryEpoch) -> Self {
272 match e.epoch.unwrap() {
273 batch_query_epoch::Epoch::Committed(epoch) => {
274 HummockReadEpoch::BatchQueryCommitted(epoch.epoch, epoch.hummock_version_id)
275 }
276 batch_query_epoch::Epoch::Current(epoch) => HummockReadEpoch::NoWait(epoch),
277 batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
278 batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
279 }
280 }
281}
282
283pub fn test_batch_query_epoch() -> BatchQueryEpoch {
284 BatchQueryEpoch {
285 epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
286 }
287}
288
289impl HummockReadEpoch {
290 pub fn get_epoch(&self) -> HummockEpoch {
291 *match self {
292 HummockReadEpoch::Committed(epoch)
293 | HummockReadEpoch::BatchQueryCommitted(epoch, _)
294 | HummockReadEpoch::NoWait(epoch)
295 | HummockReadEpoch::Backup(epoch)
296 | HummockReadEpoch::TimeTravel(epoch) => epoch,
297 }
298 }
299
300 pub fn is_read_committed(&self) -> bool {
301 match self {
302 HummockReadEpoch::Committed(_)
303 | HummockReadEpoch::TimeTravel(_)
304 | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
305 HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
306 }
307 }
308}
309pub struct ObjectIdRange {
310 pub start_id: HummockRawObjectId,
312 pub end_id: HummockRawObjectId,
314}
315
316impl ObjectIdRange {
317 pub fn new(
318 start_id: impl Into<HummockRawObjectId>,
319 end_id: impl Into<HummockRawObjectId>,
320 ) -> Self {
321 Self {
322 start_id: start_id.into(),
323 end_id: end_id.into(),
324 }
325 }
326
327 fn peek_next_object_id(&self) -> Option<HummockRawObjectId> {
328 if self.start_id < self.end_id {
329 return Some(self.start_id);
330 }
331 None
332 }
333
334 pub fn get_next_object_id(&mut self) -> Option<HummockRawObjectId> {
336 let next_id = self.peek_next_object_id();
337 self.start_id += 1;
338 next_id
339 }
340}
341
342pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
343 let len = ssts.len();
344 for i in 1..len {
345 if ssts[i - 1]
346 .borrow()
347 .key_range
348 .compare_right_with(&ssts[i].borrow().key_range.left)
349 != Ordering::Less
350 {
351 return false;
352 }
353 }
354 true
355}
356
357pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
358 let len = ssts.len();
359 for i in 1..len {
360 let sst_1 = &ssts[i - 1];
361 let sst_2 = &ssts[i];
362
363 if sst_1.key_range.right_exclusive {
364 if KeyComparator::compare_encoded_full_key(
365 &sst_1.key_range.right,
366 &sst_2.key_range.left,
367 )
368 .is_gt()
369 {
370 return false;
371 }
372 } else if KeyComparator::compare_encoded_full_key(
373 &sst_1.key_range.right,
374 &sst_2.key_range.left,
375 )
376 .is_ge()
377 {
378 return false;
379 }
380 }
381 true
382}
383
const CHECKPOINT_DIR: &str = "checkpoint";
const CHECKPOINT_NAME: &str = "0";
const ARCHIVE_DIR: &str = "archive";

/// Path of the version checkpoint object: `{root_dir}/checkpoint/0`.
pub fn version_checkpoint_path(root_dir: &str) -> String {
    [root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME].join("/")
}

/// Directory holding archived versions: `{root_dir}/archive`.
pub fn version_archive_dir(root_dir: &str) -> String {
    [root_dir, ARCHIVE_DIR].join("/")
}

/// Directory part of `checkpoint_path`: everything up to and including the last `/`.
///
/// Returns an empty string when the path contains no `/`.
pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    // '/' is a single byte, so `idx + 1` is always a char boundary.
    let dir_len = checkpoint_path.rfind('/').map_or(0, |idx| idx + 1);
    checkpoint_path[..dir_len].to_string()
}
399
400#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
406pub struct EpochWithGap(u64);
407
408impl EpochWithGap {
409 #[allow(unused_variables)]
410 pub fn new(epoch: u64, spill_offset: u16) -> Self {
411 if risingwave_common::util::epoch::is_max_epoch(epoch) {
415 EpochWithGap::new_max_epoch()
416 } else {
417 debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
418 EpochWithGap(epoch + spill_offset as u64)
419 }
420 }
421
422 pub fn new_from_epoch(epoch: u64) -> Self {
423 EpochWithGap::new(epoch, 0)
424 }
425
426 pub fn new_min_epoch() -> Self {
427 EpochWithGap(0)
428 }
429
430 pub fn new_max_epoch() -> Self {
431 EpochWithGap(HummockEpoch::MAX)
432 }
433
434 pub(crate) fn as_u64(&self) -> HummockEpoch {
436 self.0
437 }
438
439 pub fn from_u64(epoch_with_gap: u64) -> Self {
441 EpochWithGap(epoch_with_gap)
442 }
443
444 pub fn pure_epoch(&self) -> HummockEpoch {
446 self.0 & !EPOCH_SPILL_TIME_MASK
447 }
448
449 pub fn offset(&self) -> u64 {
450 self.0 & EPOCH_SPILL_TIME_MASK
451 }
452}
453
454pub fn get_object_data_path(
455 obj_prefix: &str,
456 path_prefix: &str,
457 object_id: HummockObjectId,
458) -> String {
459 let suffix = object_id.suffix();
460 let object_id = object_id.as_raw();
461
462 let mut path = String::with_capacity(
463 path_prefix.len()
464 + "/".len()
465 + obj_prefix.len()
466 + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
467 + ".".len()
468 + suffix.len(),
469 );
470 path.push_str(path_prefix);
471 path.push('/');
472 path.push_str(obj_prefix);
473 path.push_str(&object_id.to_string());
474 path.push('.');
475 path.push_str(suffix);
476 path
477}
478
479pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
480 use itertools::Itertools;
481 let split = path.split(&['/', '.']).collect_vec();
482 assert!(split.len() > 2);
483 let suffix = split[split.len() - 1];
484 let id = split[split.len() - 2]
485 .parse::<u64>()
486 .expect("valid object id");
487 HummockObjectId::new(id, suffix)
488 .unwrap_or_else(|| panic!("unknown object id suffix {}", suffix))
489}
490
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    /// Builds an SST whose key range is the inclusive interval `[left, right]`.
    fn sst_with_inclusive_range(left: &[u8], right: &[u8]) -> SstableInfoInner {
        SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::copy_from_slice(left),
                right: Bytes::copy_from_slice(right),
                right_exclusive: false,
            },
            ..Default::default()
        }
    }

    #[test]
    fn test_object_id_decimal_max_length() {
        // `u64::MAX` has the most decimal digits of any object id.
        let digits = u64::MAX.to_string().len();
        assert_eq!(digits, HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH)
    }

    #[test]
    fn test_full_key_concat() {
        let key1: &[u8] = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2: &[u8] = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        let only_key1 = sst_with_inclusive_range(key1, key1);
        let only_key2 = sst_with_inclusive_range(key2, key2);
        let key1_to_key2 = sst_with_inclusive_range(key1, key2);

        // Two single-key SSTs in full-key order concatenate.
        assert!(full_key_can_concat(&[only_key1.clone().into(), only_key2.into()]));

        // Overlapping inclusive ranges sharing `key1` do not.
        assert!(!full_key_can_concat(&[only_key1.into(), key1_to_key2.into()]));
    }
}