1use std::cmp;
16use std::collections::Bound::{Excluded, Included};
17use std::collections::{BTreeMap, HashMap, HashSet};
18
19use itertools::Itertools;
20use risingwave_hummock_sdk::compaction_group::StateTableId;
21use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
22 BranchedSstInfo, get_compaction_group_ids, get_table_compaction_group_id_mapping,
23};
24use risingwave_hummock_sdk::sstable_info::SstableInfo;
25use risingwave_hummock_sdk::table_stats::{PbTableStatsMap, add_prost_table_stats_map};
26use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
27use risingwave_hummock_sdk::{
28 CompactionGroupId, HummockContextId, HummockObjectId, HummockSstableId, HummockSstableObjectId,
29 HummockVersionId, get_stale_object_ids,
30};
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::hummock::write_limits::WriteLimit;
33use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};
34use risingwave_pb::id::TableId;
35use risingwave_pb::meta::subscribe_response::{Info, Operation};
36
37use super::GroupStateValidator;
38use crate::MetaResult;
39use crate::hummock::HummockManager;
40use crate::hummock::error::Result;
41use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
42use crate::hummock::manager::commit_multi_var;
43use crate::hummock::manager::context::ContextInfo;
44use crate::hummock::manager::transaction::HummockVersionTransaction;
45use crate::hummock::metrics_utils::{LocalTableMetrics, trigger_write_stop_stats};
46use crate::hummock::model::CompactionGroup;
47use crate::model::VarTransaction;
48
/// In-memory versioning state owned by the Hummock manager.
///
/// Holds the current version, the version deltas, the version statistics and the last
/// checkpoint, plus some bookkeeping for metrics and time-travel snapshots.
#[derive(Default)]
pub struct Versioning {
    /// Flag controlling whether epoch commits are disabled.
    /// NOTE(review): it is only declared here; its readers live outside this file — confirm.
    pub disable_commit_epochs: bool,
    /// The latest Hummock version known to the manager.
    pub current_version: HummockVersion,
    /// Per-table local metric handles, keyed by table id.
    pub local_metrics: HashMap<TableId, LocalTableMetrics>,
    /// Counter used to pace time-travel version snapshots.
    /// `Versioning::mark_next_time_travel_version_snapshot` sets it to `u64::MAX`.
    pub time_travel_snapshot_interval_counter: u64,
    /// SST ids associated with the most recent time-travel snapshot (per the field name;
    /// presumably written by the time-travel code elsewhere — TODO confirm).
    pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,

    /// Version deltas ordered by version id.
    pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
    /// Aggregated version statistics (including per-table stats).
    pub version_stats: HummockVersionStats,
    /// The last checkpointed version together with the stale objects recorded against it.
    pub checkpoint: HummockVersionCheckpoint,
}
68
69impl ContextInfo {
70 pub fn min_pinned_version_id(&self) -> HummockVersionId {
71 let mut min_pinned_version_id = HummockVersionId::MAX;
72 for id in self
73 .pinned_versions
74 .values()
75 .map(|v| HummockVersionId::new(v.min_pinned_id))
76 .chain(self.version_safe_points.iter().cloned())
77 {
78 min_pinned_version_id = cmp::min(id, min_pinned_version_id);
79 }
80 min_pinned_version_id
81 }
82}
83
84impl Versioning {
85 pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
86 self.time_travel_snapshot_interval_counter = u64::MAX;
87 }
88
89 pub fn get_tracked_object_ids(
90 &self,
91 min_pinned_version_id: HummockVersionId,
92 ) -> HashSet<HummockObjectId> {
93 let mut tracked_object_ids = self
95 .checkpoint
96 .version
97 .get_object_ids(false)
98 .collect::<HashSet<_>>();
99 for (_, delta) in self.hummock_version_deltas.range((
101 Excluded(self.checkpoint.version.id),
102 Included(self.current_version.id),
103 )) {
104 tracked_object_ids.extend(delta.newly_added_object_ids(false));
105 }
106 tracked_object_ids.extend(
108 self.checkpoint
109 .stale_objects
110 .iter()
111 .filter(|(version_id, _)| **version_id >= min_pinned_version_id)
112 .flat_map(|(_, objects)| get_stale_object_ids(objects)),
113 );
114 tracked_object_ids
115 }
116}
117
118impl HummockManager {
119 pub async fn list_pinned_version(&self) -> Vec<HummockPinnedVersion> {
120 self.context_info
121 .read()
122 .await
123 .pinned_versions
124 .values()
125 .cloned()
126 .collect_vec()
127 }
128
129 pub async fn list_workers(
130 &self,
131 context_ids: &[HummockContextId],
132 ) -> MetaResult<HashMap<HummockContextId, WorkerNode>> {
133 let mut workers = HashMap::new();
134 for context_id in context_ids {
135 if let Some(worker_node) = self
136 .metadata_manager()
137 .get_worker_by_id(*context_id as _)
138 .await?
139 {
140 workers.insert(*context_id, worker_node);
141 }
142 }
143 Ok(workers)
144 }
145
146 #[cfg(any(test, feature = "test"))]
151 pub async fn get_current_version(&self) -> HummockVersion {
152 self.on_current_version(|version| version.clone()).await
153 }
154
155 pub async fn on_current_version<T>(&self, mut f: impl FnMut(&HummockVersion) -> T) -> T {
156 f(&self.versioning.read().await.current_version)
157 }
158
159 pub async fn get_version_id(&self) -> HummockVersionId {
160 self.on_current_version(|version| version.id).await
161 }
162
163 pub async fn get_table_compaction_group_id_mapping(
165 &self,
166 ) -> HashMap<StateTableId, CompactionGroupId> {
167 get_table_compaction_group_id_mapping(&self.versioning.read().await.current_version)
168 }
169
170 pub async fn list_version_deltas(
172 &self,
173 start_id: HummockVersionId,
174 num_limit: u32,
175 ) -> Result<Vec<HummockVersionDelta>> {
176 let versioning = self.versioning.read().await;
177 let version_deltas = versioning
178 .hummock_version_deltas
179 .range(start_id..)
180 .map(|(_id, delta)| delta)
181 .take(num_limit as _)
182 .cloned()
183 .collect();
184 Ok(version_deltas)
185 }
186
187 pub async fn get_version_stats(&self) -> HummockVersionStats {
188 self.versioning.read().await.version_stats.clone()
189 }
190
191 pub(super) async fn try_update_write_limits(
195 &self,
196 target_group_ids: &[CompactionGroupId],
197 ) -> bool {
198 let versioning = self.versioning.read().await;
199 let mut cg_manager = self.compaction_group_manager.write().await;
200 let target_group_configs = target_group_ids
201 .iter()
202 .filter_map(|id| {
203 cg_manager
204 .try_get_compaction_group_config(*id)
205 .map(|config| (*id, config))
206 })
207 .collect();
208 let mut new_write_limits = calc_new_write_limits(
209 target_group_configs,
210 cg_manager.write_limit.clone(),
211 &versioning.current_version,
212 );
213 let all_group_ids: HashSet<_> =
214 HashSet::from_iter(get_compaction_group_ids(&versioning.current_version));
215 new_write_limits.retain(|group_id, _| all_group_ids.contains(group_id));
216 if new_write_limits == cg_manager.write_limit {
217 return false;
218 }
219 tracing::debug!("Hummock stopped write is updated: {:#?}", new_write_limits);
220 trigger_write_stop_stats(&self.metrics, &new_write_limits);
221 cg_manager.write_limit = new_write_limits;
222 self.env
223 .notification_manager()
224 .notify_hummock_without_version(
225 Operation::Add,
226 Info::HummockWriteLimits(risingwave_pb::hummock::WriteLimits {
227 write_limits: cg_manager.write_limit.clone(),
228 }),
229 );
230 true
231 }
232
233 pub async fn write_limits(&self) -> HashMap<CompactionGroupId, WriteLimit> {
236 let guard = self.compaction_group_manager.read().await;
237 guard.write_limit.clone()
238 }
239
240 pub async fn list_branched_objects(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
241 let guard = self.versioning.read().await;
242 guard.current_version.build_branched_sst_info()
243 }
244
245 pub async fn rebuild_table_stats(&self) -> Result<()> {
246 let mut versioning = self.versioning.write().await;
247 let new_stats = rebuild_table_stats(&versioning.current_version);
248 let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
249 version_stats.table_stats = new_stats.table_stats;
251 commit_multi_var!(self.meta_store_ref(), version_stats)?;
252 Ok(())
253 }
254
255 pub async fn may_fill_backward_state_table_info(&self) -> Result<()> {
256 let mut versioning = self.versioning.write().await;
257 if versioning
258 .current_version
259 .need_fill_backward_compatible_state_table_info_delta()
260 {
261 let versioning: &mut Versioning = &mut versioning;
262 let mut version = HummockVersionTransaction::new(
263 &mut versioning.current_version,
264 &mut versioning.hummock_version_deltas,
265 self.env.notification_manager(),
266 None,
267 &self.metrics,
268 );
269 let mut new_version_delta = version.new_delta();
270 new_version_delta.with_latest_version(|version, delta| {
271 version.may_fill_backward_compatible_state_table_info_delta(delta)
272 });
273 new_version_delta.pre_apply();
274 commit_multi_var!(self.meta_store_ref(), version)?;
275 }
276 Ok(())
277 }
278}
279
280pub(super) fn calc_new_write_limits(
283 target_groups: HashMap<CompactionGroupId, CompactionGroup>,
284 origin_snapshot: HashMap<CompactionGroupId, WriteLimit>,
285 version: &HummockVersion,
286) -> HashMap<CompactionGroupId, WriteLimit> {
287 let mut new_write_limits = origin_snapshot;
288 for (id, config) in &target_groups {
289 let levels = match version.levels.get(id) {
290 None => {
291 new_write_limits.remove(id);
292 continue;
293 }
294 Some(levels) => levels,
295 };
296
297 let group_state = GroupStateValidator::check_single_group_write_stop(
298 levels,
299 config.compaction_config.as_ref(),
300 );
301
302 if group_state.is_write_stop() {
303 new_write_limits.insert(
304 *id,
305 WriteLimit {
306 table_ids: version
307 .state_table_info
308 .compaction_group_member_table_ids(*id)
309 .iter()
310 .copied()
311 .collect(),
312 reason: group_state.reason().unwrap().to_owned(),
313 },
314 );
315 continue;
316 }
317 new_write_limits.remove(id);
319 }
320 new_write_limits
321}
322
323fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats {
326 let mut stats = HummockVersionStats {
327 hummock_version_id: version.id.to_u64(),
328 table_stats: Default::default(),
329 };
330 for level in version.get_combined_levels() {
331 for sst in &level.table_infos {
332 let changes = estimate_table_stats(sst);
333 add_prost_table_stats_map(&mut stats.table_stats, &changes);
334 }
335 }
336 stats
337}
338
339fn estimate_table_stats(sst: &SstableInfo) -> PbTableStatsMap {
344 let mut changes: PbTableStatsMap = HashMap::default();
345 let weighted_value =
346 |value: i64| -> i64 { (value as f64 / sst.table_ids.len() as f64).ceil() as i64 };
347 let key_range = &sst.key_range;
348 let estimated_key_size: u64 = (key_range.left.len() + key_range.right.len()) as u64 / 2;
349 let mut estimated_total_key_size = estimated_key_size * sst.total_key_count;
350 if estimated_total_key_size > sst.uncompressed_file_size {
351 estimated_total_key_size = sst.uncompressed_file_size / 2;
352 tracing::warn!(
353 %sst.sst_id,
354 "Calculated estimated_total_key_size {} > uncompressed_file_size {}. Use uncompressed_file_size/2 as estimated_total_key_size instead.",
355 estimated_total_key_size,
356 sst.uncompressed_file_size
357 );
358 }
359 let estimated_total_value_size = sst.uncompressed_file_size - estimated_total_key_size;
360 for table_id in &sst.table_ids {
361 let e = changes.entry(*table_id).or_default();
362 e.total_key_count += weighted_value(sst.total_key_count as i64);
363 e.total_key_size += weighted_value(estimated_total_key_size as i64);
364 e.total_value_size += weighted_value(estimated_total_value_size as i64);
365 }
366 changes
367}
368
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels};
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
    use risingwave_pb::hummock::write_limits::WriteLimit;
    use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};

    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::manager::context::ContextInfo;
    use crate::hummock::manager::versioning::{
        calc_new_write_limits, estimate_table_stats, rebuild_table_stats,
    };
    use crate::hummock::model::CompactionGroup;

    /// The minimum must consider both pinned versions and safe points. When both are
    /// empty, it falls back to `HummockVersionId::MAX`.
    #[test]
    fn test_min_pinned_version_id() {
        let mut context_info = ContextInfo::default();
        assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
        context_info.pinned_versions.insert(
            1.into(),
            HummockPinnedVersion {
                context_id: 1.into(),
                min_pinned_id: 10,
            },
        );
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 10);
        // A smaller safe point takes precedence over the pinned version.
        context_info
            .version_safe_points
            .push(HummockVersionId::new(5));
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 5);
        context_info.version_safe_points.clear();
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 10);
        context_info.pinned_versions.clear();
        assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
    }

    /// Exercises `calc_new_write_limits` for group 1 against the sub-level count, L0 size and
    /// L0 SST count thresholds. The steps below mutate `version` and `target_groups`
    /// cumulatively, so their order matters.
    #[test]
    fn test_calc_new_write_limits() {
        let add_level_to_l0 = |levels: &mut Levels| {
            levels.l0.sub_levels.push(Level::default());
        };
        // Each setter below replaces group 1's config with a fresh `CompactionConfigBuilder`
        // config. Only the threshold it sets differs from the builder defaults.
        let set_sub_level_number_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             sub_level_number_threshold: u64| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_sub_level_number(
                                    sub_level_number_threshold,
                                )
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_sst_count_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_sst_count_threshold: u32| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_sst_count(Some(
                                    max_sst_count_threshold,
                                ))
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_size_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_size_threshold: u64| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_size(Some(max_size_threshold))
                                .build(),
                        ),
                    },
                );
            };

        let mut target_groups: HashMap<CompactionGroupId, CompactionGroup> = Default::default();
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 10);
        // Group 2 already has a limit. It is not a target, so it must always be preserved.
        let origin_snapshot: HashMap<CompactionGroupId, WriteLimit> = [(
            2,
            WriteLimit {
                table_ids: [1, 2, 3].into_iter().map_into().collect(),
                reason: "for test".to_owned(),
            },
        )]
        .into_iter()
        .collect();
        let mut version: HummockVersion = Default::default();
        for group_id in 1..=3 {
            version.levels.insert(group_id, Levels::default());
        }
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
        assert_eq!(new_write_limits.len(), 1);
        // Up to 10 sub-levels must not trip a threshold of 10.
        for _ in 1..=10 {
            add_level_to_l0(version.levels.get_mut(&1).unwrap());
            let new_write_limits =
                calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
            assert_eq!(
                new_write_limits, origin_snapshot,
                "write limit should not be triggered for group 1"
            );
        }
        // The 11th sub-level trips it.
        add_level_to_l0(version.levels.get_mut(&1).unwrap());
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels"
        );
        assert_eq!(new_write_limits.len(), 2);

        // Raising the threshold clears the limit; lowering it re-triggers.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 5);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels"
        );

        // Add two 100-byte SSTs to the last L0 sub-level (L0 total size += 200) to exercise
        // the size and SST-count thresholds.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let last_level = version
            .levels
            .get_mut(&1)
            .unwrap()
            .l0
            .sub_levels
            .last_mut()
            .unwrap();
        last_level.table_infos.extend(vec![
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1.into(), 2.into(), 3.into()],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1.into(), 2.into(), 3.into()],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
        ]);
        version.levels.get_mut(&1).unwrap().l0.total_file_size += 200;
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // L0 size (200) exceeds a max-size threshold of 10.
        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_size: 200, threshold: 10) too large L0 size"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10000);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // L0 SST count (2) exceeds a max-SST-count threshold of 1.
        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 1);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_sst_count: 2, threshold: 1) too many L0 sst files"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups, origin_snapshot.clone(), &version);

        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
    }

    /// Checks the even per-table split done by `estimate_table_stats`.
    ///
    /// Also checks that `rebuild_table_stats` over two compaction groups, each holding the
    /// same SST, yields exactly twice the single-SST estimate.
    #[test]
    fn test_estimate_table_stats() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 10].into(),
                right: vec![1; 20].into(),
                ..Default::default()
            },
            table_ids: vec![1.into(), 2.into(), 3.into()],
            total_key_count: 6000,
            uncompressed_file_size: 6_000_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for stats in changes.values() {
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, (10 + 20) / 2 * 6000 / 3);
            assert_eq!(
                stats.total_value_size,
                (6_000_000 - (10 + 20) / 2 * 6000) / 3
            );
        }

        let mut version = HummockVersion::default();
        version.id = HummockVersionId::new(123);

        // Two groups (1 and 2), each with one level containing the same SST.
        for cg in 1..3 {
            version.levels.insert(
                cg,
                Levels {
                    levels: vec![Level {
                        table_infos: vec![sst.clone()],
                        ..Default::default()
                    }],
                    ..Default::default()
                },
            );
        }
        let HummockVersionStats {
            hummock_version_id,
            table_stats,
        } = rebuild_table_stats(&version);
        assert_eq!(hummock_version_id, version.id.to_u64());
        assert_eq!(table_stats.len(), 3);
        for (tid, stats) in table_stats {
            assert_eq!(
                stats.total_key_count,
                changes.get(&tid).unwrap().total_key_count * 2
            );
            assert_eq!(
                stats.total_key_size,
                changes.get(&tid).unwrap().total_key_size * 2
            );
            assert_eq!(
                stats.total_value_size,
                changes.get(&tid).unwrap().total_value_size * 2
            );
        }
    }

    /// When the key-range-based estimate (1500 * 6000) exceeds the uncompressed file size
    /// (60_000), half of the file size must be used as the total key size.
    #[test]
    fn test_estimate_table_stats_large_key_range() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 1000].into(),
                right: vec![1; 2000].into(),
                ..Default::default()
            },
            table_ids: vec![1.into(), 2.into(), 3.into()],
            total_key_count: 6000,
            uncompressed_file_size: 60_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for t in &sst.table_ids {
            let stats = changes.get(t).unwrap();
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, 60_000 / 2 / 3);
            assert_eq!(stats.total_value_size, (60_000 - 60_000 / 2) / 3);
        }
    }
}