1use std::cmp;
16use std::collections::Bound::{Excluded, Included};
17use std::collections::{BTreeMap, HashMap, HashSet};
18
19use itertools::Itertools;
20use risingwave_hummock_sdk::compaction_group::StateTableId;
21use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
22 BranchedSstInfo, get_compaction_group_ids, get_table_compaction_group_id_mapping,
23};
24use risingwave_hummock_sdk::sstable_info::SstableInfo;
25use risingwave_hummock_sdk::table_stats::add_prost_table_stats_map;
26use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
27use risingwave_hummock_sdk::{
28 CompactionGroupId, HummockContextId, HummockSstableId, HummockSstableObjectId, HummockVersionId,
29};
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::write_limits::WriteLimit;
32use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats, TableStats};
33use risingwave_pb::meta::subscribe_response::{Info, Operation};
34
35use super::GroupStateValidator;
36use crate::MetaResult;
37use crate::hummock::HummockManager;
38use crate::hummock::error::Result;
39use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
40use crate::hummock::manager::commit_multi_var;
41use crate::hummock::manager::context::ContextInfo;
42use crate::hummock::manager::transaction::HummockVersionTransaction;
43use crate::hummock::metrics_utils::{LocalTableMetrics, trigger_write_stop_stats};
44use crate::hummock::model::CompactionGroup;
45use crate::model::VarTransaction;
46
47#[derive(Default)]
48pub struct Versioning {
49 pub disable_commit_epochs: bool,
53 pub current_version: HummockVersion,
55 pub local_metrics: HashMap<u32, LocalTableMetrics>,
56 pub time_travel_snapshot_interval_counter: u64,
57 pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,
59
60 pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
62 pub version_stats: HummockVersionStats,
64 pub checkpoint: HummockVersionCheckpoint,
65}
66
67impl ContextInfo {
68 pub fn min_pinned_version_id(&self) -> HummockVersionId {
69 let mut min_pinned_version_id = HummockVersionId::MAX;
70 for id in self
71 .pinned_versions
72 .values()
73 .map(|v| HummockVersionId::new(v.min_pinned_id))
74 .chain(self.version_safe_points.iter().cloned())
75 {
76 min_pinned_version_id = cmp::min(id, min_pinned_version_id);
77 }
78 min_pinned_version_id
79 }
80}
81
82impl Versioning {
83 pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
84 self.time_travel_snapshot_interval_counter = u64::MAX;
85 }
86
87 pub fn get_tracked_object_ids(
88 &self,
89 min_pinned_version_id: HummockVersionId,
90 ) -> HashSet<HummockSstableObjectId> {
91 let mut tracked_object_ids = self.checkpoint.version.get_object_ids(false);
93 for (_, delta) in self.hummock_version_deltas.range((
95 Excluded(self.checkpoint.version.id),
96 Included(self.current_version.id),
97 )) {
98 tracked_object_ids.extend(delta.newly_added_object_ids(false));
99 }
100 tracked_object_ids.extend(
102 self.checkpoint
103 .stale_objects
104 .iter()
105 .filter(|(version_id, _)| **version_id >= min_pinned_version_id)
106 .flat_map(|(_, objects)| objects.id.iter())
107 .cloned(),
108 );
109 tracked_object_ids
110 }
111}
112
113impl HummockManager {
114 pub async fn list_pinned_version(&self) -> Vec<HummockPinnedVersion> {
115 self.context_info
116 .read()
117 .await
118 .pinned_versions
119 .values()
120 .cloned()
121 .collect_vec()
122 }
123
124 pub async fn list_workers(
125 &self,
126 context_ids: &[HummockContextId],
127 ) -> MetaResult<HashMap<HummockContextId, WorkerNode>> {
128 let mut workers = HashMap::new();
129 for context_id in context_ids {
130 if let Some(worker_node) = self
131 .metadata_manager()
132 .get_worker_by_id(*context_id as _)
133 .await?
134 {
135 workers.insert(*context_id, worker_node);
136 }
137 }
138 Ok(workers)
139 }
140
141 #[cfg(any(test, feature = "test"))]
146 pub async fn get_current_version(&self) -> HummockVersion {
147 self.on_current_version(|version| version.clone()).await
148 }
149
150 pub async fn on_current_version<T>(&self, mut f: impl FnMut(&HummockVersion) -> T) -> T {
151 f(&self.versioning.read().await.current_version)
152 }
153
154 pub async fn get_version_id(&self) -> HummockVersionId {
155 self.on_current_version(|version| version.id).await
156 }
157
158 pub async fn get_table_compaction_group_id_mapping(
160 &self,
161 ) -> HashMap<StateTableId, CompactionGroupId> {
162 get_table_compaction_group_id_mapping(&self.versioning.read().await.current_version)
163 }
164
165 #[cfg_attr(coverage, coverage(off))]
167 pub async fn list_version_deltas(
168 &self,
169 start_id: HummockVersionId,
170 num_limit: u32,
171 ) -> Result<Vec<HummockVersionDelta>> {
172 let versioning = self.versioning.read().await;
173 let version_deltas = versioning
174 .hummock_version_deltas
175 .range(start_id..)
176 .map(|(_id, delta)| delta)
177 .take(num_limit as _)
178 .cloned()
179 .collect();
180 Ok(version_deltas)
181 }
182
183 pub async fn get_version_stats(&self) -> HummockVersionStats {
184 self.versioning.read().await.version_stats.clone()
185 }
186
187 pub(super) async fn try_update_write_limits(
191 &self,
192 target_group_ids: &[CompactionGroupId],
193 ) -> bool {
194 let versioning = self.versioning.read().await;
195 let mut cg_manager = self.compaction_group_manager.write().await;
196 let target_group_configs = target_group_ids
197 .iter()
198 .filter_map(|id| {
199 cg_manager
200 .try_get_compaction_group_config(*id)
201 .map(|config| (*id, config))
202 })
203 .collect();
204 let mut new_write_limits = calc_new_write_limits(
205 target_group_configs,
206 cg_manager.write_limit.clone(),
207 &versioning.current_version,
208 );
209 let all_group_ids: HashSet<_> =
210 HashSet::from_iter(get_compaction_group_ids(&versioning.current_version));
211 new_write_limits.retain(|group_id, _| all_group_ids.contains(group_id));
212 if new_write_limits == cg_manager.write_limit {
213 return false;
214 }
215 tracing::debug!("Hummock stopped write is updated: {:#?}", new_write_limits);
216 trigger_write_stop_stats(&self.metrics, &new_write_limits);
217 cg_manager.write_limit = new_write_limits;
218 self.env
219 .notification_manager()
220 .notify_hummock_without_version(
221 Operation::Add,
222 Info::HummockWriteLimits(risingwave_pb::hummock::WriteLimits {
223 write_limits: cg_manager.write_limit.clone(),
224 }),
225 );
226 true
227 }
228
229 pub async fn write_limits(&self) -> HashMap<CompactionGroupId, WriteLimit> {
232 let guard = self.compaction_group_manager.read().await;
233 guard.write_limit.clone()
234 }
235
236 pub async fn list_branched_objects(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
237 let guard = self.versioning.read().await;
238 guard.current_version.build_branched_sst_info()
239 }
240
241 pub async fn rebuild_table_stats(&self) -> Result<()> {
242 let mut versioning = self.versioning.write().await;
243 let new_stats = rebuild_table_stats(&versioning.current_version);
244 let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
245 version_stats.table_stats = new_stats.table_stats;
247 commit_multi_var!(self.meta_store_ref(), version_stats)?;
248 Ok(())
249 }
250
251 pub async fn may_fill_backward_state_table_info(&self) -> Result<()> {
252 let mut versioning = self.versioning.write().await;
253 if versioning
254 .current_version
255 .need_fill_backward_compatible_state_table_info_delta()
256 {
257 let versioning: &mut Versioning = &mut versioning;
258 let mut version = HummockVersionTransaction::new(
259 &mut versioning.current_version,
260 &mut versioning.hummock_version_deltas,
261 self.env.notification_manager(),
262 None,
263 &self.metrics,
264 );
265 let mut new_version_delta = version.new_delta();
266 new_version_delta.with_latest_version(|version, delta| {
267 version.may_fill_backward_compatible_state_table_info_delta(delta)
268 });
269 new_version_delta.pre_apply();
270 commit_multi_var!(self.meta_store_ref(), version)?;
271 }
272 Ok(())
273 }
274}
275
276pub(super) fn calc_new_write_limits(
279 target_groups: HashMap<CompactionGroupId, CompactionGroup>,
280 origin_snapshot: HashMap<CompactionGroupId, WriteLimit>,
281 version: &HummockVersion,
282) -> HashMap<CompactionGroupId, WriteLimit> {
283 let mut new_write_limits = origin_snapshot;
284 for (id, config) in &target_groups {
285 let levels = match version.levels.get(id) {
286 None => {
287 new_write_limits.remove(id);
288 continue;
289 }
290 Some(levels) => levels,
291 };
292
293 let group_state = GroupStateValidator::check_single_group_write_stop(
294 levels,
295 config.compaction_config.as_ref(),
296 );
297
298 if group_state.is_write_stop() {
299 new_write_limits.insert(
300 *id,
301 WriteLimit {
302 table_ids: version
303 .state_table_info
304 .compaction_group_member_table_ids(*id)
305 .iter()
306 .map(|table_id| table_id.table_id)
307 .collect(),
308 reason: group_state.reason().unwrap().to_owned(),
309 },
310 );
311 continue;
312 }
313 new_write_limits.remove(id);
315 }
316 new_write_limits
317}
318
319fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats {
322 let mut stats = HummockVersionStats {
323 hummock_version_id: version.id.to_u64(),
324 table_stats: Default::default(),
325 };
326 for level in version.get_combined_levels() {
327 for sst in &level.table_infos {
328 let changes = estimate_table_stats(sst);
329 add_prost_table_stats_map(&mut stats.table_stats, &changes);
330 }
331 }
332 stats
333}
334
335fn estimate_table_stats(sst: &SstableInfo) -> HashMap<u32, TableStats> {
340 let mut changes: HashMap<u32, TableStats> = HashMap::default();
341 let weighted_value =
342 |value: i64| -> i64 { (value as f64 / sst.table_ids.len() as f64).ceil() as i64 };
343 let key_range = &sst.key_range;
344 let estimated_key_size: u64 = (key_range.left.len() + key_range.right.len()) as u64 / 2;
345 let mut estimated_total_key_size = estimated_key_size * sst.total_key_count;
346 if estimated_total_key_size > sst.uncompressed_file_size {
347 estimated_total_key_size = sst.uncompressed_file_size / 2;
348 tracing::warn!(
349 sst.sst_id,
350 "Calculated estimated_total_key_size {} > uncompressed_file_size {}. Use uncompressed_file_size/2 as estimated_total_key_size instead.",
351 estimated_total_key_size,
352 sst.uncompressed_file_size
353 );
354 }
355 let estimated_total_value_size = sst.uncompressed_file_size - estimated_total_key_size;
356 for table_id in &sst.table_ids {
357 let e = changes.entry(*table_id).or_default();
358 e.total_key_count += weighted_value(sst.total_key_count as i64);
359 e.total_key_size += weighted_value(estimated_total_key_size as i64);
360 e.total_value_size += weighted_value(estimated_total_value_size as i64);
361 }
362 changes
363}
364
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels};
    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
    use risingwave_pb::hummock::write_limits::WriteLimit;
    use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};

    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::manager::context::ContextInfo;
    use crate::hummock::manager::versioning::{
        calc_new_write_limits, estimate_table_stats, rebuild_table_stats,
    };
    use crate::hummock::model::CompactionGroup;

    type TargetGroups = HashMap<CompactionGroupId, CompactionGroup>;

    /// Appends an empty sub-level to L0 of `levels`.
    fn push_empty_l0_sub_level(levels: &mut Levels) {
        levels.l0.sub_levels.push(Level::default());
    }

    /// Configures group 1 with only an L0 sub-level-count write-stop threshold.
    fn set_sub_level_number_threshold_for_group_1(
        target_groups: &mut TargetGroups,
        sub_level_number_threshold: u64,
    ) {
        target_groups.insert(
            1,
            CompactionGroup {
                group_id: 1,
                compaction_config: Arc::new(
                    CompactionConfigBuilder::new()
                        .level0_stop_write_threshold_sub_level_number(sub_level_number_threshold)
                        .build(),
                ),
            },
        );
    }

    /// Configures group 1 with only an L0 SST-count write-stop threshold.
    fn set_level_0_max_sst_count_threshold_for_group_1(
        target_groups: &mut TargetGroups,
        max_sst_count_threshold: u32,
    ) {
        target_groups.insert(
            1,
            CompactionGroup {
                group_id: 1,
                compaction_config: Arc::new(
                    CompactionConfigBuilder::new()
                        .level0_stop_write_threshold_max_sst_count(Some(max_sst_count_threshold))
                        .build(),
                ),
            },
        );
    }

    /// Configures group 1 with only an L0 total-size write-stop threshold.
    fn set_level_0_max_size_threshold_for_group_1(
        target_groups: &mut TargetGroups,
        max_size_threshold: u64,
    ) {
        target_groups.insert(
            1,
            CompactionGroup {
                group_id: 1,
                compaction_config: Arc::new(
                    CompactionConfigBuilder::new()
                        .level0_stop_write_threshold_max_size(Some(max_size_threshold))
                        .build(),
                ),
            },
        );
    }

    #[test]
    fn test_min_pinned_version_id() {
        let mut context_info = ContextInfo::default();
        // Nothing pinned and no safe points: the MAX sentinel is reported.
        assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);

        context_info.pinned_versions.insert(
            1,
            HummockPinnedVersion {
                context_id: 1,
                min_pinned_id: 10,
            },
        );
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 10);

        // A lower safe point wins over the pinned version.
        context_info
            .version_safe_points
            .push(HummockVersionId::new(5));
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 5);

        context_info.version_safe_points.clear();
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 10);

        context_info.pinned_versions.clear();
        assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
    }

    #[test]
    fn test_calc_new_write_limits() {
        let mut target_groups = TargetGroups::default();
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 10);
        let origin_snapshot: HashMap<CompactionGroupId, WriteLimit> = HashMap::from([(
            2,
            WriteLimit {
                table_ids: vec![1, 2, 3],
                reason: "for test".to_owned(),
            },
        )]);
        let mut version = HummockVersion::default();
        for group_id in 1..=3 {
            version.levels.insert(group_id, Levels::default());
        }
        // Every evaluation starts from a fresh copy of the snapshot and the current groups.
        let compute = |groups: &TargetGroups, version: &HummockVersion| {
            calc_new_write_limits(groups.clone(), origin_snapshot.clone(), version)
        };

        let limits = compute(&target_groups, &version);
        assert_eq!(
            limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
        assert_eq!(limits.len(), 1);

        // Up to the threshold (10 sub-levels) nothing changes.
        for _ in 0..10 {
            push_empty_l0_sub_level(version.levels.get_mut(&1).unwrap());
            assert_eq!(
                compute(&target_groups, &version),
                origin_snapshot,
                "write limit should not be triggered for group 1"
            );
        }

        // The 11th sub-level crosses the threshold.
        push_empty_l0_sub_level(version.levels.get_mut(&1).unwrap());
        let limits = compute(&target_groups, &version);
        assert_ne!(
            limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            limits[&1].reason,
            "WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels"
        );
        assert_eq!(limits.len(), 2);

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        assert_eq!(
            compute(&target_groups, &version),
            origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 5);
        let limits = compute(&target_groups, &version);
        assert_ne!(
            limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            limits[&1].reason,
            "WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels"
        );

        // Add two 100-byte SSTs to the newest L0 sub-level of group 1.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let make_sst = || -> SstableInfo {
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1, 2, 3],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into()
        };
        {
            let l0 = &mut version.levels.get_mut(&1).unwrap().l0;
            l0.sub_levels
                .last_mut()
                .unwrap()
                .table_infos
                .extend([make_sst(), make_sst()]);
            l0.total_file_size += 200;
        }
        assert_eq!(
            compute(&target_groups, &version),
            origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10);
        let limits = compute(&target_groups, &version);
        assert_ne!(
            limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            limits[&1].reason,
            "WriteStop(l0_size: 200, threshold: 10) too large L0 size"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10000);
        assert_eq!(
            compute(&target_groups, &version),
            origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 1);
        let limits = compute(&target_groups, &version);
        assert_ne!(
            limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            limits[&1].reason,
            "WriteStop(l0_sst_count: 2, threshold: 1) too many L0 sst files"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 100);
        assert_eq!(
            compute(&target_groups, &version),
            origin_snapshot,
            "write limit should not be triggered for group 1"
        );
    }

    #[test]
    fn test_estimate_table_stats() {
        let sst: SstableInfo = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 10].into(),
                right: vec![1; 20].into(),
                ..Default::default()
            },
            table_ids: vec![1, 2, 3],
            total_key_count: 6000,
            uncompressed_file_size: 6_000_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        // Mean bound length (10 + 20) / 2 bytes per key, times 6000 keys.
        let expected_key_size = (10 + 20) / 2 * 6000;
        for stats in changes.values() {
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, expected_key_size / 3);
            assert_eq!(stats.total_value_size, (6_000_000 - expected_key_size) / 3);
        }

        let mut version = HummockVersion::default();
        version.id = HummockVersionId::new(123);
        for group_id in [1, 2] {
            version.levels.insert(
                group_id,
                Levels {
                    levels: vec![Level {
                        table_infos: vec![sst.clone()],
                        ..Default::default()
                    }],
                    ..Default::default()
                },
            );
        }
        let HummockVersionStats {
            hummock_version_id,
            table_stats,
        } = rebuild_table_stats(&version);
        assert_eq!(hummock_version_id, version.id.to_u64());
        assert_eq!(table_stats.len(), 3);
        // The same SST lives in two groups, so every rebuilt stat is twice the single estimate.
        for (table_id, stats) in table_stats {
            let single = &changes[&table_id];
            assert_eq!(stats.total_key_count, single.total_key_count * 2);
            assert_eq!(stats.total_key_size, single.total_key_size * 2);
            assert_eq!(stats.total_value_size, single.total_value_size * 2);
        }
    }

    #[test]
    fn test_estimate_table_stats_large_key_range() {
        let sst: SstableInfo = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 1000].into(),
                right: vec![1; 2000].into(),
                ..Default::default()
            },
            table_ids: vec![1, 2, 3],
            total_key_count: 6000,
            uncompressed_file_size: 60_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        // 1500 bytes/key * 6000 keys overshoots the 60_000-byte file, so half of the file is
        // attributed to keys instead.
        let clamped_key_size = 60_000 / 2;
        for table_id in &sst.table_ids {
            let stats = &changes[table_id];
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, clamped_key_size / 3);
            assert_eq!(stats.total_value_size, (60_000 - clamped_key_size) / 3);
        }
    }
}