1#![feature(type_alias_impl_trait)]
16#![feature(impl_trait_in_assoc_type)]
17#![feature(btree_cursors)]
18#![feature(map_try_insert)]
19
20mod key_cmp;
21
22use std::borrow::Borrow;
23use std::cmp::Ordering;
24use std::collections::HashMap;
25
26pub use key_cmp::*;
27use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
28use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
29use sstable_info::SstableInfo;
30
31use crate::key_range::KeyRangeCommon;
32use crate::table_stats::TableStatsMap;
33
34pub mod change_log;
35pub mod compact;
36pub mod compact_task;
37pub mod compaction_group;
38pub mod filter_utils;
39pub mod key;
40pub mod key_range;
41pub mod level;
42pub mod prost_key_range;
43pub mod sstable_info;
44pub mod state_table_info;
45pub mod table_stats;
46pub mod table_watermark;
47pub mod time_travel;
48pub mod version;
49pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
50mod frontend_version;
51pub mod vector_index;
52
53pub use compact::*;
54use risingwave_common::catalog::TableId;
55use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;
56use risingwave_pb::hummock::{PbVectorIndexObjectType, VectorIndexObjectType};
57pub use risingwave_pb::id::{
58 CompactionGroupId, HummockHnswGraphFileId, HummockRawObjectId, HummockSstableId,
59 HummockSstableObjectId, HummockVectorFileId, HummockVersionId,
60};
61
62use crate::table_watermark::TableWatermarks;
63use crate::vector_index::VectorIndexAdd;
64
65pub type HummockRefCount = u64;
66pub type HummockContextId = risingwave_common::id::WorkerId;
67pub type HummockEpoch = u64;
68pub type HummockCompactionTaskId = u64;
69
70pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId::new(0);
71pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId::new(1);
72pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
73pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
74pub const SST_OBJECT_SUFFIX: &str = "data";
75pub const VECTOR_FILE_OBJECT_SUFFIX: &str = "vector";
76pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;
77
/// Generates [`HummockObjectId`] and its suffix-based helpers from one list of
/// `{Variant, IdType, suffix}` entries, so every object kind is declared in a single place.
///
/// The first arm performs the generation; the empty arm `()` supplies the canonical entry list
/// and re-invokes the first arm with it.
macro_rules! for_all_object_suffix {
    ($({$name:ident, $type_name:ty, $suffix:expr},)+) => {
        /// Identifier of a Hummock object, tagged with the kind of object it refers to.
        #[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)]
        pub enum HummockObjectId {
            $(
                $name($type_name),
            )+
        }

        /// File-name suffixes of all object kinds, in declaration order.
        // NOTE(review): the array length `3` is hard-coded and must equal the number of entries
        // passed to this macro; adding an object kind requires bumping it.
        pub const VALID_OBJECT_ID_SUFFIXES: [&str; 3] = [$(
            $suffix
        ),+];

        impl HummockObjectId {
            /// Builds the variant whose suffix equals `suffix`, wrapping `id` in that variant's
            /// id type. Returns `None` when `suffix` matches no declared object kind.
            fn new(id: u64, suffix: &str) -> Option<Self> {
                match suffix {
                    $(
                        suffix if suffix == $suffix => Some(HummockObjectId::$name(<$type_name>::new(id))),
                    )+
                    _ => None,
                }
            }

            /// Returns the file-name suffix of this object's kind.
            pub fn suffix(&self) -> &str {
                match self {
                    $(
                        HummockObjectId::$name(_) => $suffix,
                    )+
                }
            }

            /// Returns the untyped raw object id, regardless of the object kind.
            pub fn as_raw(&self) -> HummockRawObjectId {
                let raw = match self {
                    $(
                        HummockObjectId::$name(id) => id.as_raw_id(),
                    )+
                };
                HummockRawObjectId::new(raw)
            }
        }

        /// Parses an object id from `path`, whose last two `/`- or `.`-separated segments are
        /// expected to be `<decimal id>` and `<suffix>`.
        ///
        /// Returns `None` when the path has fewer than three segments or when the suffix is not
        /// a known object suffix.
        ///
        /// # Panics
        ///
        /// Panics if the suffix is known but the id segment does not parse as a `u64`.
        pub fn try_get_object_id_from_path(path: &str) -> Option<HummockObjectId> {
            let split: Vec<_> = path.split(&['/', '.']).collect();
            if split.len() <= 2 {
                return None;
            }
            let suffix = split[split.len() - 1];
            let id_str = split[split.len() - 2];
            match suffix {
                $(
                    suffix if suffix == $suffix => {
                        let id = id_str
                            .parse::<u64>()
                            .unwrap_or_else(|_| panic!("expect valid object id, got {}", id_str));
                        Some(HummockObjectId::$name(<$type_name>::new(id)))
                    },
                )+
                _ => None,
            }
        }
    };
    () => {
        // Canonical list of object kinds: {variant, typed id, file-name suffix}.
        for_all_object_suffix! {
            {Sstable, HummockSstableObjectId, SST_OBJECT_SUFFIX},
            {VectorFile, HummockVectorFileId, VECTOR_FILE_OBJECT_SUFFIX},
            {HnswGraphFile, HummockHnswGraphFileId, "hnsw_graph"},
        }
    };
}

// Instantiate `HummockObjectId`, `VALID_OBJECT_ID_SUFFIXES` and `try_get_object_id_from_path`
// for all supported object kinds.
for_all_object_suffix!();
149
150pub fn get_stale_object_ids(
151 stale_objects: &PbStaleObjects,
152) -> impl Iterator<Item = HummockObjectId> + '_ {
153 match HummockObjectId::Sstable(0.into()) {
157 HummockObjectId::Sstable(_) => {}
158 HummockObjectId::VectorFile(_) => {}
159 HummockObjectId::HnswGraphFile(_) => {}
160 };
161 stale_objects
162 .id
163 .iter()
164 .map(|sst_id| HummockObjectId::Sstable(*sst_id))
165 .chain(stale_objects.vector_files.iter().map(
166 |file| match file.get_object_type().unwrap() {
167 PbVectorIndexObjectType::VectorIndexObjectUnspecified => {
168 unreachable!()
169 }
170 VectorIndexObjectType::VectorIndexObjectVector => {
171 HummockObjectId::VectorFile(file.id.into())
172 }
173 VectorIndexObjectType::VectorIndexObjectHnswGraph => {
174 HummockObjectId::HnswGraphFile(file.id.into())
175 }
176 },
177 ))
178}
179
/// Logs through `tracing` at `info` level in builds without `debug_assertions` (typically
/// release) and at `debug` level when `debug_assertions` are enabled.
///
/// All arguments are forwarded unchanged to the selected `tracing` macro. The `tracing::` path is
/// resolved at the invocation site, so the calling crate must depend on `tracing`.
#[macro_export]
macro_rules! info_in_release {
    ($($arg:tt)*) => {{
        #[cfg(not(debug_assertions))]
        {
            tracing::info!($($arg)*);
        }
        #[cfg(debug_assertions)]
        {
            tracing::debug!($($arg)*);
        }
    }};
}
205
/// Result of a sync, presumably of locally buffered writes; field meanings below are inferred
/// from names and types only — confirm against producers/consumers.
#[derive(Default, Debug)]
pub struct SyncResult {
    /// Size reported for this sync; unit not visible here (presumably bytes — TODO confirm).
    pub sync_size: usize,
    /// SSTs produced by the sync that have not been committed yet.
    pub uncommitted_ssts: Vec<LocalSstableInfo>,
    /// Table watermarks collected during the sync, keyed by table id.
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    /// SSTs holding old values (presumably for change-log consumers — confirm).
    pub old_value_ssts: Vec<LocalSstableInfo>,
    /// Vector-index additions collected during the sync, keyed by table id.
    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
}
218
/// An [`SstableInfo`] bundled with per-table statistics and a creation time.
///
/// Equality (`PartialEq`) compares `sst_info` only.
#[derive(Debug, Clone)]
pub struct LocalSstableInfo {
    /// Metadata of the SST itself.
    pub sst_info: SstableInfo,
    /// Per-table statistics for the data in this SST.
    pub table_stats: TableStatsMap,
    /// Creation time; unit/clock not visible here — TODO confirm. `for_test` sets `u64::MAX`.
    pub created_at: u64,
}
225
226impl LocalSstableInfo {
227 pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
228 Self {
229 sst_info,
230 table_stats,
231 created_at,
232 }
233 }
234
235 pub fn for_test(sst_info: SstableInfo) -> Self {
236 Self {
237 sst_info,
238 table_stats: Default::default(),
239 created_at: u64::MAX,
240 }
241 }
242
243 pub fn file_size(&self) -> u64 {
244 assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
245 self.sst_info.file_size
246 }
247}
248
249impl PartialEq for LocalSstableInfo {
250 fn eq(&self, other: &Self) -> bool {
251 self.sst_info == other.sst_info
252 }
253}
254
/// The epoch a read is performed at, tagged with how that epoch should be interpreted.
///
/// `is_read_committed` returns `true` for `Committed`, `BatchQueryCommitted` and `TimeTravel`,
/// and `false` for `NoWait` and `Backup`.
#[derive(Debug, Clone, Copy)]
pub enum HummockReadEpoch {
    /// Read at a committed epoch.
    Committed(HummockEpoch),
    /// Committed epoch of a batch query together with a Hummock version id; built from
    /// `batch_query_epoch::Epoch::Committed` by the `From<BatchQueryEpoch>` impl.
    BatchQueryCommitted(HummockEpoch, HummockVersionId),
    /// Built from `batch_query_epoch::Epoch::Current`; the name suggests the read does not wait
    /// for the epoch to be committed — confirm.
    NoWait(HummockEpoch),
    /// Built from `batch_query_epoch::Epoch::Backup`; presumably a read against a backup.
    Backup(HummockEpoch),
    /// Built from `batch_query_epoch::Epoch::TimeTravel`; presumably a time-travel read.
    TimeTravel(HummockEpoch),
}
268
269impl From<BatchQueryEpoch> for HummockReadEpoch {
270 fn from(e: BatchQueryEpoch) -> Self {
271 match e.epoch.unwrap() {
272 batch_query_epoch::Epoch::Committed(epoch) => {
273 HummockReadEpoch::BatchQueryCommitted(epoch.epoch, epoch.hummock_version_id)
274 }
275 batch_query_epoch::Epoch::Current(epoch) => HummockReadEpoch::NoWait(epoch),
276 batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
277 batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
278 }
279 }
280}
281
282pub fn test_batch_query_epoch() -> BatchQueryEpoch {
283 BatchQueryEpoch {
284 epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
285 }
286}
287
288impl HummockReadEpoch {
289 pub fn get_epoch(&self) -> HummockEpoch {
290 *match self {
291 HummockReadEpoch::Committed(epoch)
292 | HummockReadEpoch::BatchQueryCommitted(epoch, _)
293 | HummockReadEpoch::NoWait(epoch)
294 | HummockReadEpoch::Backup(epoch)
295 | HummockReadEpoch::TimeTravel(epoch) => epoch,
296 }
297 }
298
299 pub fn is_read_committed(&self) -> bool {
300 match self {
301 HummockReadEpoch::Committed(_)
302 | HummockReadEpoch::TimeTravel(_)
303 | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
304 HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
305 }
306 }
307}
308pub struct ObjectIdRange {
309 pub start_id: HummockRawObjectId,
311 pub end_id: HummockRawObjectId,
313}
314
315impl ObjectIdRange {
316 pub fn new(
317 start_id: impl Into<HummockRawObjectId>,
318 end_id: impl Into<HummockRawObjectId>,
319 ) -> Self {
320 Self {
321 start_id: start_id.into(),
322 end_id: end_id.into(),
323 }
324 }
325
326 fn peek_next_object_id(&self) -> Option<HummockRawObjectId> {
327 if self.start_id < self.end_id {
328 return Some(self.start_id);
329 }
330 None
331 }
332
333 pub fn get_next_object_id(&mut self) -> Option<HummockRawObjectId> {
335 let next_id = self.peek_next_object_id();
336 self.start_id += 1;
337 next_id
338 }
339}
340
341pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
342 let len = ssts.len();
343 for i in 1..len {
344 if ssts[i - 1]
345 .borrow()
346 .key_range
347 .compare_right_with(&ssts[i].borrow().key_range.left)
348 != Ordering::Less
349 {
350 return false;
351 }
352 }
353 true
354}
355
356pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
357 let len = ssts.len();
358 for i in 1..len {
359 let sst_1 = &ssts[i - 1];
360 let sst_2 = &ssts[i];
361
362 if sst_1.key_range.right_exclusive {
363 if KeyComparator::compare_encoded_full_key(
364 &sst_1.key_range.right,
365 &sst_2.key_range.left,
366 )
367 .is_gt()
368 {
369 return false;
370 }
371 } else if KeyComparator::compare_encoded_full_key(
372 &sst_1.key_range.right,
373 &sst_2.key_range.left,
374 )
375 .is_ge()
376 {
377 return false;
378 }
379 }
380 true
381}
382
const CHECKPOINT_DIR: &str = "checkpoint";
const CHECKPOINT_NAME: &str = "0";
const ARCHIVE_DIR: &str = "archive";

/// Path of the version checkpoint object under `root_dir`: `<root_dir>/checkpoint/0`.
pub fn version_checkpoint_path(root_dir: &str) -> String {
    format!("{root_dir}/{CHECKPOINT_DIR}/{CHECKPOINT_NAME}")
}

/// Version archive directory under `root_dir`: `<root_dir>/archive`.
pub fn version_archive_dir(root_dir: &str) -> String {
    format!("{root_dir}/{ARCHIVE_DIR}")
}

/// Directory part of `checkpoint_path`: everything up to and including the last `/`.
///
/// Returns an empty string when `checkpoint_path` contains no `/`.
pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    checkpoint_path
        .rfind('/')
        .map_or_else(String::new, |last_slash| {
            checkpoint_path[..=last_slash].to_owned()
        })
}
398
399#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
405pub struct EpochWithGap(u64);
406
407impl EpochWithGap {
408 #[allow(unused_variables)]
409 pub fn new(epoch: u64, spill_offset: u16) -> Self {
410 if risingwave_common::util::epoch::is_max_epoch(epoch) {
414 EpochWithGap::new_max_epoch()
415 } else {
416 debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
417 EpochWithGap(epoch + spill_offset as u64)
418 }
419 }
420
421 pub fn new_from_epoch(epoch: u64) -> Self {
422 EpochWithGap::new(epoch, 0)
423 }
424
425 pub fn new_min_epoch() -> Self {
426 EpochWithGap(0)
427 }
428
429 pub fn new_max_epoch() -> Self {
430 EpochWithGap(HummockEpoch::MAX)
431 }
432
433 pub(crate) fn as_u64(&self) -> HummockEpoch {
435 self.0
436 }
437
438 pub fn from_u64(epoch_with_gap: u64) -> Self {
440 EpochWithGap(epoch_with_gap)
441 }
442
443 pub fn pure_epoch(&self) -> HummockEpoch {
445 self.0 & !EPOCH_SPILL_TIME_MASK
446 }
447
448 pub fn offset(&self) -> u64 {
449 self.0 & EPOCH_SPILL_TIME_MASK
450 }
451}
452
453pub fn get_object_data_path(
454 obj_prefix: &str,
455 path_prefix: &str,
456 object_id: HummockObjectId,
457) -> String {
458 let suffix = object_id.suffix();
459 let object_id = object_id.as_raw();
460
461 let mut path = String::with_capacity(
462 path_prefix.len()
463 + "/".len()
464 + obj_prefix.len()
465 + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
466 + ".".len()
467 + suffix.len(),
468 );
469 path.push_str(path_prefix);
470 path.push('/');
471 path.push_str(obj_prefix);
472 path.push_str(&object_id.to_string());
473 path.push('.');
474 path.push_str(suffix);
475 path
476}
477
478pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
479 use itertools::Itertools;
480 let split = path.split(&['/', '.']).collect_vec();
481 assert!(split.len() > 2);
482 let suffix = split[split.len() - 1];
483 let id = split[split.len() - 2]
484 .parse::<u64>()
485 .expect("valid object id");
486 HummockObjectId::new(id, suffix)
487 .unwrap_or_else(|| panic!("unknown object id suffix {}", suffix))
488}
489
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    /// Builds an SST whose key range is `[left, right]` (right bound inclusive); every other
    /// field is defaulted.
    fn sst_with_key_range(left: &[u8], right: &[u8]) -> SstableInfoInner {
        SstableInfoInner {
            key_range: key_range::KeyRange {
                left: Bytes::from(left.to_vec()),
                right: Bytes::from(right.to_vec()),
                right_exclusive: false,
            },
            ..Default::default()
        }
    }

    #[test]
    fn test_object_id_decimal_max_length() {
        // The widest possible object id is `u64::MAX` printed in decimal.
        let widest = u64::MAX.to_string();
        assert_eq!(widest.len(), HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH)
    }

    #[test]
    fn test_full_key_concat() {
        let key1 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        // Two single-key SSTs, plus one SST spanning both keys.
        let point_1 = sst_with_key_range(key1, key1);
        let point_2 = sst_with_key_range(key2, key2);
        let spanning = sst_with_key_range(key1, key2);

        // `[key1]` followed by `[key2]` is expected to be concatenable.
        assert!(full_key_can_concat(&[point_1.clone().into(), point_2.into()]));

        // `[key1]` followed by `[key1, key2]` shares `key1` with an inclusive right bound.
        assert!(!full_key_can_concat(&[point_1.into(), spanning.into()]));
    }
}