1use std::cmp;
16use std::collections::Bound::{Excluded, Included};
17use std::collections::{BTreeMap, HashMap, HashSet};
18
19use itertools::Itertools;
20use risingwave_hummock_sdk::compaction_group::StateTableId;
21use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
22 BranchedSstInfo, get_compaction_group_ids, get_table_compaction_group_id_mapping,
23};
24use risingwave_hummock_sdk::sstable_info::SstableInfo;
25use risingwave_hummock_sdk::table_stats::add_prost_table_stats_map;
26use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
27use risingwave_hummock_sdk::{
28 CompactionGroupId, HummockContextId, HummockObjectId, HummockSstableId, HummockSstableObjectId,
29 HummockVersionId, get_stale_object_ids,
30};
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::hummock::write_limits::WriteLimit;
33use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats, TableStats};
34use risingwave_pb::meta::subscribe_response::{Info, Operation};
35
36use super::GroupStateValidator;
37use crate::MetaResult;
38use crate::hummock::HummockManager;
39use crate::hummock::error::Result;
40use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
41use crate::hummock::manager::commit_multi_var;
42use crate::hummock::manager::context::ContextInfo;
43use crate::hummock::manager::transaction::HummockVersionTransaction;
44use crate::hummock::metrics_utils::{LocalTableMetrics, trigger_write_stop_stats};
45use crate::hummock::model::CompactionGroup;
46use crate::model::VarTransaction;
47
/// In-memory versioning state of the Hummock manager.
///
/// `HummockManager` accesses it through the `self.versioning` lock (e.g. `on_current_version`,
/// `list_version_deltas`, `get_version_stats` take the read lock; `rebuild_table_stats` and
/// `may_fill_backward_state_table_info` take the write lock).
#[derive(Default)]
pub struct Versioning {
    /// NOTE(review): presumably disables epoch commits when set — confirm at the call sites.
    pub disable_commit_epochs: bool,
    /// The latest `HummockVersion` held by the manager.
    pub current_version: HummockVersion,
    /// Local table metrics keyed by a `u32` id (presumably the state table id — TODO confirm).
    pub local_metrics: HashMap<u32, LocalTableMetrics>,
    /// Counter consulted when deciding whether to take a time-travel version snapshot;
    /// `mark_next_time_travel_version_snapshot` sets it to `u64::MAX`.
    pub time_travel_snapshot_interval_counter: u64,
    /// NOTE(review): presumably the SST ids captured by the most recent time-travel snapshot — confirm.
    pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,

    /// Version deltas keyed by version id. `get_tracked_object_ids` scans the range
    /// `(checkpoint.version.id, current_version.id]`.
    pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
    /// Per-table statistics; `HummockManager::rebuild_table_stats` recomputes `table_stats`
    /// from `current_version`.
    pub version_stats: HummockVersionStats,
    /// Checkpoint state: the checkpointed `version` and its `stale_objects` keyed by version id.
    pub checkpoint: HummockVersionCheckpoint,
}
67
68impl ContextInfo {
69 pub fn min_pinned_version_id(&self) -> HummockVersionId {
70 let mut min_pinned_version_id = HummockVersionId::MAX;
71 for id in self
72 .pinned_versions
73 .values()
74 .map(|v| HummockVersionId::new(v.min_pinned_id))
75 .chain(self.version_safe_points.iter().cloned())
76 {
77 min_pinned_version_id = cmp::min(id, min_pinned_version_id);
78 }
79 min_pinned_version_id
80 }
81}
82
83impl Versioning {
84 pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
85 self.time_travel_snapshot_interval_counter = u64::MAX;
86 }
87
88 pub fn get_tracked_object_ids(
89 &self,
90 min_pinned_version_id: HummockVersionId,
91 ) -> HashSet<HummockObjectId> {
92 let mut tracked_object_ids = self
94 .checkpoint
95 .version
96 .get_object_ids(false)
97 .collect::<HashSet<_>>();
98 for (_, delta) in self.hummock_version_deltas.range((
100 Excluded(self.checkpoint.version.id),
101 Included(self.current_version.id),
102 )) {
103 tracked_object_ids.extend(delta.newly_added_object_ids(false));
104 }
105 tracked_object_ids.extend(
107 self.checkpoint
108 .stale_objects
109 .iter()
110 .filter(|(version_id, _)| **version_id >= min_pinned_version_id)
111 .flat_map(|(_, objects)| get_stale_object_ids(objects)),
112 );
113 tracked_object_ids
114 }
115}
116
impl HummockManager {
    /// Returns a clone of every pinned-version record held in `context_info`.
    pub async fn list_pinned_version(&self) -> Vec<HummockPinnedVersion> {
        self.context_info
            .read()
            .await
            .pinned_versions
            .values()
            .cloned()
            .collect_vec()
    }

    /// Resolves each context id to a worker node through the metadata manager.
    ///
    /// Context ids for which `get_worker_by_id` returns `None` are omitted from the result.
    ///
    /// # Errors
    /// Propagates any error returned by `get_worker_by_id`.
    pub async fn list_workers(
        &self,
        context_ids: &[HummockContextId],
    ) -> MetaResult<HashMap<HummockContextId, WorkerNode>> {
        let mut workers = HashMap::new();
        for context_id in context_ids {
            // The context id is used directly as the worker id.
            if let Some(worker_node) = self
                .metadata_manager()
                .get_worker_by_id(*context_id as _)
                .await?
            {
                workers.insert(*context_id, worker_node);
            }
        }
        Ok(workers)
    }

    /// Returns a clone of the current version. Only compiled for tests or the `test` feature.
    #[cfg(any(test, feature = "test"))]
    pub async fn get_current_version(&self) -> HummockVersion {
        self.on_current_version(|version| version.clone()).await
    }

    /// Runs `f` on the current version while holding the `versioning` read lock and returns
    /// its result.
    pub async fn on_current_version<T>(&self, mut f: impl FnMut(&HummockVersion) -> T) -> T {
        f(&self.versioning.read().await.current_version)
    }

    /// Returns the id of the current version.
    pub async fn get_version_id(&self) -> HummockVersionId {
        self.on_current_version(|version| version.id).await
    }

    /// Returns the state-table-to-compaction-group mapping derived from the current version.
    pub async fn get_table_compaction_group_id_mapping(
        &self,
    ) -> HashMap<StateTableId, CompactionGroupId> {
        get_table_compaction_group_id_mapping(&self.versioning.read().await.current_version)
    }

    /// Returns up to `num_limit` version deltas whose id is `>= start_id`, in ascending id
    /// order (the deltas live in a `BTreeMap`). Currently never returns an error.
    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
    ) -> Result<Vec<HummockVersionDelta>> {
        let versioning = self.versioning.read().await;
        let version_deltas = versioning
            .hummock_version_deltas
            .range(start_id..)
            .map(|(_id, delta)| delta)
            .take(num_limit as _)
            .cloned()
            .collect();
        Ok(version_deltas)
    }

    /// Returns a clone of the current version statistics.
    pub async fn get_version_stats(&self) -> HummockVersionStats {
        self.versioning.read().await.version_stats.clone()
    }

    /// Recomputes the write limits of `target_group_ids` against the current version.
    ///
    /// Target ids without a config in the compaction group manager are skipped, and limits for
    /// groups that are absent from the current version are dropped. If the result differs from
    /// the stored limits, this updates the write-stop metrics, stores the new limits and sends
    /// them through `notify_hummock_without_version`.
    ///
    /// Returns `true` iff the stored write limits changed.
    pub(super) async fn try_update_write_limits(
        &self,
        target_group_ids: &[CompactionGroupId],
    ) -> bool {
        // Lock order: `versioning` (read) first, then `compaction_group_manager` (write).
        let versioning = self.versioning.read().await;
        let mut cg_manager = self.compaction_group_manager.write().await;
        let target_group_configs = target_group_ids
            .iter()
            .filter_map(|id| {
                cg_manager
                    .try_get_compaction_group_config(*id)
                    .map(|config| (*id, config))
            })
            .collect();
        let mut new_write_limits = calc_new_write_limits(
            target_group_configs,
            cg_manager.write_limit.clone(),
            &versioning.current_version,
        );
        // Drop limits for groups that no longer exist in the current version.
        let all_group_ids: HashSet<_> =
            HashSet::from_iter(get_compaction_group_ids(&versioning.current_version));
        new_write_limits.retain(|group_id, _| all_group_ids.contains(group_id));
        if new_write_limits == cg_manager.write_limit {
            return false;
        }
        tracing::debug!("Hummock stopped write is updated: {:#?}", new_write_limits);
        trigger_write_stop_stats(&self.metrics, &new_write_limits);
        cg_manager.write_limit = new_write_limits;
        self.env
            .notification_manager()
            .notify_hummock_without_version(
                Operation::Add,
                Info::HummockWriteLimits(risingwave_pb::hummock::WriteLimits {
                    write_limits: cg_manager.write_limit.clone(),
                }),
            );
        true
    }

    /// Returns a clone of the stored write limits, keyed by compaction group id.
    pub async fn write_limits(&self) -> HashMap<CompactionGroupId, WriteLimit> {
        let guard = self.compaction_group_manager.read().await;
        guard.write_limit.clone()
    }

    /// Returns the branched SST info built from the current version.
    pub async fn list_branched_objects(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
        let guard = self.versioning.read().await;
        guard.current_version.build_branched_sst_info()
    }

    /// Recomputes per-table statistics from the SSTs of the current version and persists them
    /// to the meta store.
    ///
    /// Only `table_stats` is replaced; the stored `hummock_version_id` is left unchanged.
    pub async fn rebuild_table_stats(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        let new_stats = rebuild_table_stats(&versioning.current_version);
        let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
        version_stats.table_stats = new_stats.table_stats;
        commit_multi_var!(self.meta_store_ref(), version_stats)?;
        Ok(())
    }

    /// If the current version reports that backward-compatible state table info must be filled
    /// in, builds a new version delta that fills it and commits it. Otherwise does nothing.
    pub async fn may_fill_backward_state_table_info(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        if versioning
            .current_version
            .need_fill_backward_compatible_state_table_info_delta()
        {
            // Reborrow through the guard so disjoint fields can be borrowed mutably at once.
            let versioning: &mut Versioning = &mut versioning;
            let mut version = HummockVersionTransaction::new(
                &mut versioning.current_version,
                &mut versioning.hummock_version_deltas,
                self.env.notification_manager(),
                None,
                &self.metrics,
            );
            let mut new_version_delta = version.new_delta();
            new_version_delta.with_latest_version(|version, delta| {
                version.may_fill_backward_compatible_state_table_info_delta(delta)
            });
            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version)?;
        }
        Ok(())
    }
}
278
279pub(super) fn calc_new_write_limits(
282 target_groups: HashMap<CompactionGroupId, CompactionGroup>,
283 origin_snapshot: HashMap<CompactionGroupId, WriteLimit>,
284 version: &HummockVersion,
285) -> HashMap<CompactionGroupId, WriteLimit> {
286 let mut new_write_limits = origin_snapshot;
287 for (id, config) in &target_groups {
288 let levels = match version.levels.get(id) {
289 None => {
290 new_write_limits.remove(id);
291 continue;
292 }
293 Some(levels) => levels,
294 };
295
296 let group_state = GroupStateValidator::check_single_group_write_stop(
297 levels,
298 config.compaction_config.as_ref(),
299 );
300
301 if group_state.is_write_stop() {
302 new_write_limits.insert(
303 *id,
304 WriteLimit {
305 table_ids: version
306 .state_table_info
307 .compaction_group_member_table_ids(*id)
308 .iter()
309 .map(|table_id| table_id.table_id)
310 .collect(),
311 reason: group_state.reason().unwrap().to_owned(),
312 },
313 );
314 continue;
315 }
316 new_write_limits.remove(id);
318 }
319 new_write_limits
320}
321
322fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats {
325 let mut stats = HummockVersionStats {
326 hummock_version_id: version.id.to_u64(),
327 table_stats: Default::default(),
328 };
329 for level in version.get_combined_levels() {
330 for sst in &level.table_infos {
331 let changes = estimate_table_stats(sst);
332 add_prost_table_stats_map(&mut stats.table_stats, &changes);
333 }
334 }
335 stats
336}
337
338fn estimate_table_stats(sst: &SstableInfo) -> HashMap<u32, TableStats> {
343 let mut changes: HashMap<u32, TableStats> = HashMap::default();
344 let weighted_value =
345 |value: i64| -> i64 { (value as f64 / sst.table_ids.len() as f64).ceil() as i64 };
346 let key_range = &sst.key_range;
347 let estimated_key_size: u64 = (key_range.left.len() + key_range.right.len()) as u64 / 2;
348 let mut estimated_total_key_size = estimated_key_size * sst.total_key_count;
349 if estimated_total_key_size > sst.uncompressed_file_size {
350 estimated_total_key_size = sst.uncompressed_file_size / 2;
351 tracing::warn!(
352 %sst.sst_id,
353 "Calculated estimated_total_key_size {} > uncompressed_file_size {}. Use uncompressed_file_size/2 as estimated_total_key_size instead.",
354 estimated_total_key_size,
355 sst.uncompressed_file_size
356 );
357 }
358 let estimated_total_value_size = sst.uncompressed_file_size - estimated_total_key_size;
359 for table_id in &sst.table_ids {
360 let e = changes.entry(*table_id).or_default();
361 e.total_key_count += weighted_value(sst.total_key_count as i64);
362 e.total_key_size += weighted_value(estimated_total_key_size as i64);
363 e.total_value_size += weighted_value(estimated_total_value_size as i64);
364 }
365 changes
366}
367
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels};
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
    use risingwave_pb::hummock::write_limits::WriteLimit;
    use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};

    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::manager::context::ContextInfo;
    use crate::hummock::manager::versioning::{
        calc_new_write_limits, estimate_table_stats, rebuild_table_stats,
    };
    use crate::hummock::model::CompactionGroup;

    /// `min_pinned_version_id` is the minimum over pinned versions and safe points, and
    /// `HummockVersionId::MAX` when both are empty.
    #[test]
    fn test_min_pinned_version_id() {
        let mut context_info = ContextInfo::default();
        assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
        context_info.pinned_versions.insert(
            1,
            HummockPinnedVersion {
                context_id: 1,
                min_pinned_id: 10,
            },
        );
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 10);
        // A lower safe point takes precedence over the pinned version.
        context_info
            .version_safe_points
            .push(HummockVersionId::new(5));
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 5);
        context_info.version_safe_points.clear();
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 10);
        context_info.pinned_versions.clear();
        assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
    }

    /// Exercises the sub-level-count, L0-size and L0-SST-count write-stop thresholds for
    /// group 1. Group 2's entry in the origin snapshot is never a target, so it must be
    /// carried over unchanged by every call.
    #[test]
    fn test_calc_new_write_limits() {
        let add_level_to_l0 = |levels: &mut Levels| {
            levels.l0.sub_levels.push(Level::default());
        };
        // Each setter below replaces group 1's whole config with a freshly built one that
        // sets a single stop-write threshold.
        let set_sub_level_number_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             sub_level_number_threshold: u64| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_sub_level_number(
                                    sub_level_number_threshold,
                                )
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_sst_count_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_sst_count_threshold: u32| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_sst_count(Some(
                                    max_sst_count_threshold,
                                ))
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_size_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_size_threshold: u64| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_size(Some(max_size_threshold))
                                .build(),
                        ),
                    },
                );
            };

        let mut target_groups: HashMap<CompactionGroupId, CompactionGroup> = Default::default();
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 10);
        let origin_snapshot: HashMap<CompactionGroupId, WriteLimit> = [(
            2,
            WriteLimit {
                table_ids: vec![1, 2, 3],
                reason: "for test".to_owned(),
            },
        )]
        .into_iter()
        .collect();
        let mut version: HummockVersion = Default::default();
        for group_id in 1..=3 {
            version.levels.insert(group_id, Levels::default());
        }
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
        assert_eq!(new_write_limits.len(), 1);
        // Up to 10 sub levels stay within the threshold of 10.
        for _ in 1..=10 {
            add_level_to_l0(version.levels.get_mut(&1).unwrap());
            let new_write_limits =
                calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
            assert_eq!(
                new_write_limits, origin_snapshot,
                "write limit should not be triggered for group 1"
            );
        }
        // The 11th sub level exceeds the threshold.
        add_level_to_l0(version.levels.get_mut(&1).unwrap());
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels"
        );
        assert_eq!(new_write_limits.len(), 2);

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 5);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels"
        );

        // Add two 100-byte SSTs to the last L0 sub level (L0 total size becomes 200) to
        // exercise the size and SST-count thresholds.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let last_level = version
            .levels
            .get_mut(&1)
            .unwrap()
            .l0
            .sub_levels
            .last_mut()
            .unwrap();
        last_level.table_infos.extend(vec![
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1, 2, 3],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1, 2, 3],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
        ]);
        version.levels.get_mut(&1).unwrap().l0.total_file_size += 200;
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_size: 200, threshold: 10) too large L0 size"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10000);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 1);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_sst_count: 2, threshold: 1) too many L0 sst files"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);

        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
    }

    /// Checks the per-SST estimate, then that `rebuild_table_stats` sums it over every SST in
    /// the version.
    #[test]
    fn test_estimate_table_stats() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 10].into(),
                right: vec![1; 20].into(),
                ..Default::default()
            },
            table_ids: vec![1, 2, 3],
            total_key_count: 6000,
            uncompressed_file_size: 6_000_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        // Average key size is (10 + 20) / 2 bytes; everything is split evenly over 3 tables.
        for stats in changes.values() {
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, (10 + 20) / 2 * 6000 / 3);
            assert_eq!(
                stats.total_value_size,
                (6_000_000 - (10 + 20) / 2 * 6000) / 3
            );
        }

        let mut version = HummockVersion::default();
        version.id = HummockVersionId::new(123);

        // Groups 1 and 2 each hold the same SST, so rebuilt stats are twice the per-SST estimate.
        for cg in 1..3 {
            version.levels.insert(
                cg,
                Levels {
                    levels: vec![Level {
                        table_infos: vec![sst.clone()],
                        ..Default::default()
                    }],
                    ..Default::default()
                },
            );
        }
        let HummockVersionStats {
            hummock_version_id,
            table_stats,
        } = rebuild_table_stats(&version);
        assert_eq!(hummock_version_id, version.id.to_u64());
        assert_eq!(table_stats.len(), 3);
        for (tid, stats) in table_stats {
            assert_eq!(
                stats.total_key_count,
                changes.get(&tid).unwrap().total_key_count * 2
            );
            assert_eq!(
                stats.total_key_size,
                changes.get(&tid).unwrap().total_key_size * 2
            );
            assert_eq!(
                stats.total_value_size,
                changes.get(&tid).unwrap().total_value_size * 2
            );
        }
    }

    /// When the key-size estimate ((1000 + 2000) / 2 * 6000 bytes) exceeds the 60_000-byte
    /// uncompressed size, half of the uncompressed size is attributed to keys.
    #[test]
    fn test_estimate_table_stats_large_key_range() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 1000].into(),
                right: vec![1; 2000].into(),
                ..Default::default()
            },
            table_ids: vec![1, 2, 3],
            total_key_count: 6000,
            uncompressed_file_size: 60_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for t in &sst.table_ids {
            let stats = changes.get(t).unwrap();
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, 60_000 / 2 / 3);
            assert_eq!(stats.total_value_size, (60_000 - 60_000 / 2) / 3);
        }
    }
}