1#![feature(type_alias_impl_trait)]
16#![feature(impl_trait_in_assoc_type)]
17#![feature(let_chains)]
18#![feature(btree_cursors)]
19#![feature(strict_overflow_ops)]
20
21mod key_cmp;
22
23use std::borrow::Borrow;
24use std::cmp::Ordering;
25use std::collections::HashMap;
26use std::fmt::{Display, Formatter};
27use std::ops::{Add, Sub};
28
29pub use key_cmp::*;
30use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
31use risingwave_pb::common::{BatchQueryEpoch, batch_query_epoch};
32use serde::{Deserialize, Deserializer, Serialize, Serializer};
33use sstable_info::SstableInfo;
34use tracing::warn;
35
36use crate::key_range::KeyRangeCommon;
37use crate::table_stats::TableStatsMap;
38
39pub mod change_log;
40pub mod compact;
41pub mod compact_task;
42pub mod compaction_group;
43pub mod key;
44pub mod key_range;
45pub mod level;
46pub mod prost_key_range;
47pub mod sstable_info;
48pub mod state_table_info;
49pub mod table_stats;
50pub mod table_watermark;
51pub mod time_travel;
52pub mod version;
53pub use frontend_version::{FrontendHummockVersion, FrontendHummockVersionDelta};
54mod frontend_version;
55
56pub use compact::*;
57use risingwave_common::catalog::TableId;
58
59use crate::table_watermark::TableWatermarks;
60
/// Id of an SST object; it is the numeric component of the paths built by
/// `get_sst_data_path` and parsed back by `get_object_id_from_path`.
pub type HummockSstableObjectId = u64;
/// Id of an SST.
/// NOTE(review): presumably distinct from the object id above — confirm against users.
pub type HummockSstableId = u64;
/// Reference counter value (unsigned 64-bit).
pub type HummockRefCount = u64;
/// Context id; the only alias in this group that is 32-bit.
pub type HummockContextId = u32;
/// Epoch value, as carried by `HummockReadEpoch` and returned by `EpochWithGap::as_u64`.
pub type HummockEpoch = u64;
/// Id of a compaction task.
pub type HummockCompactionTaskId = u64;
/// Id of a compaction group.
pub type CompactionGroupId = u64;
68
/// Newtype around the raw `u64` id of a Hummock version.
#[derive(Debug, Clone, PartialEq, Copy, Ord, PartialOrd, Eq, Hash)]
pub struct HummockVersionId(u64);

/// Prints the wrapped id as a plain decimal number.
///
/// Formatter flags such as width or fill given by the caller are not applied,
/// because the value is rendered through a fresh `{}` specification.
impl Display for HummockVersionId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        f.write_fmt(format_args!("{}", raw))
    }
}
77
78impl Serialize for HummockVersionId {
79 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
80 where
81 S: Serializer,
82 {
83 serializer.serialize_u64(self.0)
84 }
85}
86
87impl<'de> Deserialize<'de> for HummockVersionId {
88 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
89 where
90 D: Deserializer<'de>,
91 {
92 Ok(Self(<u64 as Deserialize>::deserialize(deserializer)?))
93 }
94}
95
96impl HummockVersionId {
97 pub const MAX: Self = Self(i64::MAX as _);
98
99 pub const fn new(id: u64) -> Self {
100 Self(id)
101 }
102
103 pub fn next(&self) -> Self {
104 Self(self.0 + 1)
105 }
106
107 pub fn to_u64(self) -> u64 {
108 self.0
109 }
110}
111
112impl Add<u64> for HummockVersionId {
113 type Output = Self;
114
115 fn add(self, rhs: u64) -> Self::Output {
116 Self(self.0 + rhs)
117 }
118}
119
120impl Sub for HummockVersionId {
121 type Output = u64;
122
123 fn sub(self, rhs: Self) -> Self::Output {
124 self.0 - rhs.0
125 }
126}
127
/// Version id `0`, named as the invalid id.
pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0);
/// Version id `1`, named as the first version id.
pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1);
/// `1 << 56`.
/// NOTE(review): presumably the base of the id space for split-table compaction groups — confirm.
pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
/// `2 << 56` (i.e. `1 << 57`).
/// NOTE(review): presumably the base of the id space for single-table compaction groups — confirm.
pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
/// Extension (without the dot) that `get_sst_data_path` appends to SST object paths.
pub const OBJECT_SUFFIX: &str = "data";
/// Number of decimal digits in `HummockSstableObjectId::MAX`; used to pre-size the buffer in
/// `get_sst_data_path` and checked by `test_object_id_decimal_max_length`.
pub const HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH: usize = 20;
134
/// Logs through `tracing` at a level that depends on the build profile of the calling crate:
/// `debug!` when `debug_assertions` is enabled, `info!` otherwise.
///
/// Accepts exactly the same arguments as the underlying `tracing` macros.
#[macro_export]
macro_rules! info_in_release {
    ($($arg:tt)*) => {{
        // Exactly one of the two blocks survives `cfg` evaluation.
        #[cfg(debug_assertions)]
        {
            tracing::debug!($($arg)*);
        }
        #[cfg(not(debug_assertions))]
        {
            tracing::info!($($arg)*);
        }
    }};
}
160
/// Outcome of a sync.
///
/// NOTE(review): the field descriptions below are inferred from names and types only —
/// confirm against the code that produces this struct.
#[derive(Default, Debug)]
pub struct SyncResult {
    /// Size reported for the sync (presumably in bytes — TODO confirm).
    pub sync_size: usize,
    /// SSTs produced by the sync that are not yet committed.
    pub uncommitted_ssts: Vec<LocalSstableInfo>,
    /// Table watermarks, keyed by table id.
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    /// SSTs holding old values (presumably for the change log — TODO confirm).
    pub old_value_ssts: Vec<LocalSstableInfo>,
}
172
/// An `SstableInfo` bundled with its table stats and a `created_at` value.
///
/// Equality (see the `PartialEq` impl) compares `sst_info` only.
#[derive(Debug, Clone)]
pub struct LocalSstableInfo {
    /// The SST this entry describes.
    pub sst_info: SstableInfo,
    /// Stats map associated with the SST.
    pub table_stats: TableStatsMap,
    /// Creation time of the SST; `for_test` fills it with `u64::MAX`.
    /// NOTE(review): unit (seconds vs. millis) is not visible here — confirm.
    pub created_at: u64,
}
179
180impl LocalSstableInfo {
181 pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
182 Self {
183 sst_info,
184 table_stats,
185 created_at,
186 }
187 }
188
189 pub fn for_test(sst_info: SstableInfo) -> Self {
190 Self {
191 sst_info,
192 table_stats: Default::default(),
193 created_at: u64::MAX,
194 }
195 }
196
197 pub fn file_size(&self) -> u64 {
198 assert_eq!(self.sst_info.file_size, self.sst_info.sst_size);
199 self.sst_info.file_size
200 }
201}
202
203impl PartialEq for LocalSstableInfo {
204 fn eq(&self, other: &Self) -> bool {
205 self.sst_info == other.sst_info
206 }
207}
208
/// The epoch a read is performed at, tagged with the kind of read.
///
/// `is_read_committed` reports `true` for `Committed`, `BatchQueryCommitted` and
/// `TimeTravel`, and `false` for `NoWait` and `Backup`.
#[derive(Debug, Clone, Copy)]
pub enum HummockReadEpoch {
    /// Read at a committed epoch.
    Committed(HummockEpoch),
    /// Read at a committed epoch together with a Hummock version id; this is what a
    /// `BatchQueryEpoch` of kind `Committed` converts into.
    BatchQueryCommitted(HummockEpoch, HummockVersionId),
    /// Read without waiting; a `BatchQueryEpoch` of kind `Current` converts into
    /// `NoWait(HummockEpoch::MAX)`.
    NoWait(HummockEpoch),
    /// Read against a backup.
    Backup(HummockEpoch),
    /// Time-travel read at the given epoch.
    TimeTravel(HummockEpoch),
}
222
223impl From<BatchQueryEpoch> for HummockReadEpoch {
224 fn from(e: BatchQueryEpoch) -> Self {
225 match e.epoch.unwrap() {
226 batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted(
227 epoch.epoch,
228 HummockVersionId::new(epoch.hummock_version_id),
229 ),
230 batch_query_epoch::Epoch::Current(epoch) => {
231 if epoch != HummockEpoch::MAX {
232 warn!(
233 epoch,
234 "ignore specified current epoch and set it to u64::MAX"
235 );
236 }
237 HummockReadEpoch::NoWait(HummockEpoch::MAX)
238 }
239 batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch),
240 batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch),
241 }
242 }
243}
244
245pub fn test_batch_query_epoch() -> BatchQueryEpoch {
246 BatchQueryEpoch {
247 epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
248 }
249}
250
251impl HummockReadEpoch {
252 pub fn get_epoch(&self) -> HummockEpoch {
253 *match self {
254 HummockReadEpoch::Committed(epoch)
255 | HummockReadEpoch::BatchQueryCommitted(epoch, _)
256 | HummockReadEpoch::NoWait(epoch)
257 | HummockReadEpoch::Backup(epoch)
258 | HummockReadEpoch::TimeTravel(epoch) => epoch,
259 }
260 }
261
262 pub fn is_read_committed(&self) -> bool {
263 match self {
264 HummockReadEpoch::Committed(_)
265 | HummockReadEpoch::TimeTravel(_)
266 | HummockReadEpoch::BatchQueryCommitted(_, _) => true,
267 HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false,
268 }
269 }
270}
271pub struct SstObjectIdRange {
272 pub start_id: HummockSstableObjectId,
274 pub end_id: HummockSstableObjectId,
276}
277
278impl SstObjectIdRange {
279 pub fn new(start_id: HummockSstableObjectId, end_id: HummockSstableObjectId) -> Self {
280 Self { start_id, end_id }
281 }
282
283 pub fn peek_next_sst_object_id(&self) -> Option<HummockSstableObjectId> {
284 if self.start_id < self.end_id {
285 return Some(self.start_id);
286 }
287 None
288 }
289
290 pub fn get_next_sst_object_id(&mut self) -> Option<HummockSstableObjectId> {
292 let next_id = self.peek_next_sst_object_id();
293 self.start_id += 1;
294 next_id
295 }
296}
297
298pub fn can_concat(ssts: &[impl Borrow<SstableInfo>]) -> bool {
299 let len = ssts.len();
300 for i in 1..len {
301 if ssts[i - 1]
302 .borrow()
303 .key_range
304 .compare_right_with(&ssts[i].borrow().key_range.left)
305 != Ordering::Less
306 {
307 return false;
308 }
309 }
310 true
311}
312
313pub fn full_key_can_concat(ssts: &[SstableInfo]) -> bool {
314 let len = ssts.len();
315 for i in 1..len {
316 let sst_1 = &ssts[i - 1];
317 let sst_2 = &ssts[i];
318
319 if sst_1.key_range.right_exclusive {
320 if KeyComparator::compare_encoded_full_key(
321 &sst_1.key_range.right,
322 &sst_2.key_range.left,
323 )
324 .is_gt()
325 {
326 return false;
327 }
328 } else if KeyComparator::compare_encoded_full_key(
329 &sst_1.key_range.right,
330 &sst_2.key_range.left,
331 )
332 .is_ge()
333 {
334 return false;
335 }
336 }
337 true
338}
339
/// Directory component used by `version_checkpoint_path`.
const CHECKPOINT_DIR: &str = "checkpoint";
/// Object name used by `version_checkpoint_path`, placed inside `CHECKPOINT_DIR`.
const CHECKPOINT_NAME: &str = "0";
/// Directory component used by `version_archive_dir`.
const ARCHIVE_DIR: &str = "archive";
343
344pub fn version_checkpoint_path(root_dir: &str) -> String {
345 format!("{}/{}/{}", root_dir, CHECKPOINT_DIR, CHECKPOINT_NAME)
346}
347
348pub fn version_archive_dir(root_dir: &str) -> String {
349 format!("{}/{}", root_dir, ARCHIVE_DIR)
350}
351
/// Returns the directory part of `checkpoint_path`: everything up to and including the
/// last `/`. A path without any `/` yields an empty string.
pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
    match checkpoint_path.rfind('/') {
        // `/` is a single byte, so the inclusive end index is a valid char boundary.
        Some(last_slash) => checkpoint_path[..=last_slash].to_owned(),
        None => String::new(),
    }
}
355
356#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
362pub struct EpochWithGap(u64);
363
364impl EpochWithGap {
365 #[allow(unused_variables)]
366 pub fn new(epoch: u64, spill_offset: u16) -> Self {
367 if risingwave_common::util::epoch::is_max_epoch(epoch) {
371 EpochWithGap::new_max_epoch()
372 } else {
373 debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
374 EpochWithGap(epoch + spill_offset as u64)
375 }
376 }
377
378 pub fn new_from_epoch(epoch: u64) -> Self {
379 EpochWithGap::new(epoch, 0)
380 }
381
382 pub fn new_min_epoch() -> Self {
383 EpochWithGap(0)
384 }
385
386 pub fn new_max_epoch() -> Self {
387 EpochWithGap(HummockEpoch::MAX)
388 }
389
390 pub(crate) fn as_u64(&self) -> HummockEpoch {
392 self.0
393 }
394
395 pub fn from_u64(epoch_with_gap: u64) -> Self {
397 EpochWithGap(epoch_with_gap)
398 }
399
400 pub fn pure_epoch(&self) -> HummockEpoch {
402 self.0 & !EPOCH_SPILL_TIME_MASK
403 }
404
405 pub fn offset(&self) -> u64 {
406 self.0 & EPOCH_SPILL_TIME_MASK
407 }
408}
409
410pub fn get_sst_data_path(
411 obj_prefix: &str,
412 path_prefix: &str,
413 object_id: HummockSstableObjectId,
414) -> String {
415 let mut path = String::with_capacity(
416 path_prefix.len()
417 + "/".len()
418 + obj_prefix.len()
419 + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
420 + ".".len()
421 + OBJECT_SUFFIX.len(),
422 );
423 path.push_str(path_prefix);
424 path.push('/');
425 path.push_str(obj_prefix);
426 path.push_str(&object_id.to_string());
427 path.push('.');
428 path.push_str(OBJECT_SUFFIX);
429 path
430}
431
432pub fn get_object_id_from_path(path: &str) -> HummockSstableObjectId {
433 use itertools::Itertools;
434 let split = path.split(&['/', '.']).collect_vec();
435 assert!(split.len() > 2);
436 assert_eq!(split[split.len() - 1], OBJECT_SUFFIX);
437 split[split.len() - 2]
438 .parse::<HummockSstableObjectId>()
439 .expect("valid sst id")
440}
441
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use sstable_info::SstableInfoInner;

    use super::*;

    /// The pre-sizing constant must match the number of decimal digits of the largest id.
    #[test]
    fn test_object_id_decimal_max_length() {
        let max_id_digits = HummockSstableObjectId::MAX.to_string().len();
        assert_eq!(max_id_digits, HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH);
    }

    /// Two single-key SSTs in key order can be concatenated; an SST whose range starts
    /// at the previous SST's inclusive right bound cannot.
    #[test]
    fn test_full_key_concat() {
        /// Builds an SST whose key range is `[left, right]` (right bound inclusive).
        fn inclusive_sst(left: &[u8], right: &[u8]) -> SstableInfo {
            SstableInfoInner {
                key_range: key_range::KeyRange {
                    left: Bytes::from(left.to_vec()),
                    right: Bytes::from(right.to_vec()),
                    right_exclusive: false,
                },
                ..Default::default()
            }
            .into()
        }

        let key1 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l'\xe2\0\0";
        let key2 = b"\0\0\0\x08\0\0\0\x0112-3\0\0\0\0\x04\0\x1c\x16l \x12\0\0";

        let sst_1 = inclusive_sst(key1, key1);
        let sst_2 = inclusive_sst(key2, key2);
        let sst_3 = inclusive_sst(key1, key2);

        assert!(full_key_can_concat(&[sst_1.clone(), sst_2]));

        assert!(!full_key_can_concat(&[sst_1, sst_3]));
    }
}