use std::cmp;
use std::collections::Bound::{Excluded, Included};
use std::collections::{BTreeMap, HashMap, HashSet};

use itertools::Itertools;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
    BranchedSstInfo, get_compaction_group_ids, get_table_compaction_group_id_mapping,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{PbTableStatsMap, add_prost_table_stats_map};
use risingwave_hummock_sdk::version::{
    HummockVersion, HummockVersionDelta, MAX_HUMMOCK_VERSION_ID,
};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockContextId, HummockObjectId, HummockSstableId, HummockSstableObjectId,
    HummockVersionId, get_stale_object_ids,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};
use risingwave_pb::id::TableId;
use risingwave_pb::meta::subscribe_response::{Info, Operation};

use super::GroupStateValidator;
use crate::MetaResult;
use crate::hummock::HummockManager;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::commit_multi_var;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::transaction::HummockVersionTransaction;
use crate::hummock::metrics_utils::{LocalTableMetrics, trigger_write_stop_stats};
use crate::hummock::model::CompactionGroup;
use crate::model::VarTransaction;

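/// In-memory versioning state kept by the meta node: the current [`HummockVersion`],
/// the [`HummockVersionDelta`]s accumulated since the last checkpoint, version statistics,
/// the latest [`HummockVersionCheckpoint`], and counters used for time-travel snapshots.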
#[derive(Default)]
pub struct Versioning {
    pub disable_commit_epochs: bool,
    pub current_version: HummockVersion,
    pub local_metrics: HashMap<TableId, LocalTableMetrics>,
    pub time_travel_snapshot_interval_counter: u64,
    pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,

    pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
    pub version_stats: HummockVersionStats,
    pub checkpoint: HummockVersionCheckpoint,
}

impl ContextInfo {
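    /// Returns the smallest version id still pinned by any context or protected by a
    /// version safe point, or `MAX_HUMMOCK_VERSION_ID` if there is none.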
    pub fn min_pinned_version_id(&self) -> HummockVersionId {
        let mut min_pinned_version_id = MAX_HUMMOCK_VERSION_ID;
        for id in self
            .pinned_versions
            .values()
            .map(|v| v.min_pinned_id)
            .chain(self.version_safe_points.iter().cloned())
        {
            min_pinned_version_id = cmp::min(id, min_pinned_version_id);
        }
        min_pinned_version_id
    }
}

impl Versioning {
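    /// Saturates `time_travel_snapshot_interval_counter` so the next commit is treated as
    /// due for a time-travel version snapshot.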
    pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
        self.time_travel_snapshot_interval_counter = u64::MAX;
    }

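    /// Collects the object ids that are still tracked: objects referenced by the checkpoint
    /// version, objects newly added by deltas up to the current version, and stale objects
    /// retained for versions at or above `min_pinned_version_id`.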
    pub fn get_tracked_object_ids(
        &self,
        min_pinned_version_id: HummockVersionId,
    ) -> HashSet<HummockObjectId> {
        let mut tracked_object_ids = self
            .checkpoint
            .version
            .get_object_ids(false)
            .collect::<HashSet<_>>();
        for (_, delta) in self.hummock_version_deltas.range((
            Excluded(self.checkpoint.version.id),
            Included(self.current_version.id),
        )) {
            tracked_object_ids.extend(delta.newly_added_object_ids(false));
        }
        tracked_object_ids.extend(
            self.checkpoint
                .stale_objects
                .iter()
                .filter(|(version_id, _)| **version_id >= min_pinned_version_id)
                .flat_map(|(_, objects)| get_stale_object_ids(objects)),
        );
        tracked_object_ids
    }
}

impl HummockManager {
    pub async fn list_pinned_version(&self) -> Vec<HummockPinnedVersion> {
        self.context_info
            .read()
            .await
            .pinned_versions
            .values()
            .cloned()
            .collect_vec()
    }

    pub async fn list_workers(
        &self,
        context_ids: &[HummockContextId],
    ) -> MetaResult<HashMap<HummockContextId, WorkerNode>> {
        let mut workers = HashMap::new();
        for context_id in context_ids {
            if let Some(worker_node) = self
                .metadata_manager()
                .get_worker_by_id(*context_id as _)
                .await?
            {
                workers.insert(*context_id, worker_node);
            }
        }
        Ok(workers)
    }

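    /// Returns a clone of the current version. Only compiled for tests, since cloning a full
    /// version can be expensive; use [`Self::on_current_version`] to read it in place.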
    #[cfg(any(test, feature = "test"))]
    pub async fn get_current_version(&self) -> HummockVersion {
        self.on_current_version(|version| version.clone()).await
    }

    pub async fn on_current_version<T>(&self, mut f: impl FnMut(&HummockVersion) -> T) -> T {
        f(&self.versioning.read().await.current_version)
    }

    pub async fn get_version_id(&self) -> HummockVersionId {
        self.on_current_version(|version| version.id).await
    }

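    /// Returns the mapping from state table id to compaction group id in the current version.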
    pub async fn get_table_compaction_group_id_mapping(
        &self,
    ) -> HashMap<StateTableId, CompactionGroupId> {
        get_table_compaction_group_id_mapping(&self.versioning.read().await.current_version)
    }

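    /// Lists at most `num_limit` version deltas starting from `start_id`, in ascending
    /// version id order.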
    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
    ) -> Result<Vec<HummockVersionDelta>> {
        let versioning = self.versioning.read().await;
        let version_deltas = versioning
            .hummock_version_deltas
            .range(start_id..)
            .map(|(_id, delta)| delta)
            .take(num_limit as _)
            .cloned()
            .collect();
        Ok(version_deltas)
    }

    pub async fn get_version_stats(&self) -> HummockVersionStats {
        self.versioning.read().await.version_stats.clone()
    }

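    /// Recomputes write limits for `target_group_ids` and notifies subscribers when the
    /// overall write-limit map changes. Returns `true` iff the limits were updated.
    /// Acquires the `versioning` read lock and the `compaction_group_manager` write lock.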
    pub(super) async fn try_update_write_limits(
        &self,
        target_group_ids: &[CompactionGroupId],
    ) -> bool {
        let versioning = self.versioning.read().await;
        let mut cg_manager = self.compaction_group_manager.write().await;
        let target_group_configs = target_group_ids
            .iter()
            .filter_map(|id| {
                cg_manager
                    .try_get_compaction_group_config(*id)
                    .map(|config| (*id, config))
            })
            .collect();
        let mut new_write_limits = calc_new_write_limits(
            target_group_configs,
            cg_manager.write_limit.clone(),
            &versioning.current_version,
        );
        let all_group_ids: HashSet<_> =
            HashSet::from_iter(get_compaction_group_ids(&versioning.current_version));
        new_write_limits.retain(|group_id, _| all_group_ids.contains(group_id));
        if new_write_limits == cg_manager.write_limit {
            return false;
        }
        tracing::debug!("Hummock stopped write is updated: {:#?}", new_write_limits);
        trigger_write_stop_stats(&self.metrics, &new_write_limits);
        cg_manager.write_limit = new_write_limits;
        self.env
            .notification_manager()
            .notify_hummock_without_version(
                Operation::Add,
                Info::HummockWriteLimits(risingwave_pb::hummock::WriteLimits {
                    write_limits: cg_manager.write_limit.clone(),
                }),
            );
        true
    }

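    /// Returns a snapshot of the current per-group write limits.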
    pub async fn write_limits(&self) -> HashMap<CompactionGroupId, WriteLimit> {
        let guard = self.compaction_group_manager.read().await;
        guard.write_limit.clone()
    }

    pub async fn list_branched_objects(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
        let guard = self.versioning.read().await;
        guard.current_version.build_branched_sst_info()
    }

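    /// Rebuilds per-table statistics from the current version and persists them to the
    /// meta store.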
    pub async fn rebuild_table_stats(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        let new_stats = rebuild_table_stats(&versioning.current_version);
        let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
        version_stats.table_stats = new_stats.table_stats;
        commit_multi_var!(self.meta_store_ref(), version_stats)?;
        Ok(())
    }

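    /// Commits a version delta that backfills state table info for backward compatibility,
    /// if the current version still needs it.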
    pub async fn may_fill_backward_state_table_info(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        if versioning
            .current_version
            .need_fill_backward_compatible_state_table_info_delta()
        {
            let versioning: &mut Versioning = &mut versioning;
            let mut version = HummockVersionTransaction::new(
                &mut versioning.current_version,
                &mut versioning.hummock_version_deltas,
                self.env.notification_manager(),
                None,
                &self.metrics,
            );
            let mut new_version_delta = version.new_delta();
            new_version_delta.with_latest_version(|version, delta| {
                version.may_fill_backward_compatible_state_table_info_delta(delta)
            });
            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version)?;
        }
        Ok(())
    }
}

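/// Computes a complete write-limit snapshot for `target_groups`, starting from
/// `origin_snapshot`: a target group whose state triggers a write stop gets a [`WriteLimit`]
/// entry; other target groups (including those absent from `version`) have their entries
/// removed. Entries for groups outside `target_groups` are carried over unchanged.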
pub(super) fn calc_new_write_limits(
    target_groups: HashMap<CompactionGroupId, CompactionGroup>,
    origin_snapshot: HashMap<CompactionGroupId, WriteLimit>,
    version: &HummockVersion,
) -> HashMap<CompactionGroupId, WriteLimit> {
    let mut new_write_limits = origin_snapshot;
    for (id, config) in &target_groups {
        let levels = match version.levels.get(id) {
            None => {
                new_write_limits.remove(id);
                continue;
            }
            Some(levels) => levels,
        };

        let group_state = GroupStateValidator::check_single_group_write_stop(
            levels,
            config.compaction_config.as_ref(),
        );

        if group_state.is_write_stop() {
            new_write_limits.insert(
                *id,
                WriteLimit {
                    table_ids: version
                        .state_table_info
                        .compaction_group_member_table_ids(*id)
                        .iter()
                        .copied()
                        .collect(),
                    reason: group_state.reason().unwrap().to_owned(),
                },
            );
            continue;
        }
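        // The group is not in a write-stop state: clear any existing limit for it.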
        new_write_limits.remove(id);
    }
    new_write_limits
}

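/// Rebuilds [`HummockVersionStats`] by accumulating [`estimate_table_stats`] over every SST in
/// every level of `version`. The result is an estimate rather than an exact measurement.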
fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats {
    let mut stats = HummockVersionStats {
        hummock_version_id: version.id,
        table_stats: Default::default(),
    };
    for level in version.get_combined_levels() {
        for sst in &level.table_infos {
            let changes = estimate_table_stats(sst);
            add_prost_table_stats_map(&mut stats.table_stats, &changes);
        }
    }
    stats
}

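/// Estimates the per-table stats contribution of a single SST. Counts and sizes are split
/// evenly across the SST's `table_ids`; key size is approximated from the key range bounds
/// (capped at half the uncompressed file size), and value size is the uncompressed file size
/// minus the estimated total key size.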
fn estimate_table_stats(sst: &SstableInfo) -> PbTableStatsMap {
    let mut changes: PbTableStatsMap = HashMap::default();
    let weighted_value =
        |value: i64| -> i64 { (value as f64 / sst.table_ids.len() as f64).ceil() as i64 };
    let key_range = &sst.key_range;
    let estimated_key_size: u64 = (key_range.left.len() + key_range.right.len()) as u64 / 2;
    let mut estimated_total_key_size = estimated_key_size * sst.total_key_count;
    if estimated_total_key_size > sst.uncompressed_file_size {
        estimated_total_key_size = sst.uncompressed_file_size / 2;
        tracing::warn!(
            %sst.sst_id,
            "Calculated estimated_total_key_size {} > uncompressed_file_size {}. Use uncompressed_file_size/2 as estimated_total_key_size instead.",
            estimated_total_key_size,
            sst.uncompressed_file_size
        );
    }
    let estimated_total_value_size = sst.uncompressed_file_size - estimated_total_key_size;
    for table_id in &sst.table_ids {
        let e = changes.entry(*table_id).or_default();
        e.total_key_count += weighted_value(sst.total_key_count as i64);
        e.total_key_size += weighted_value(estimated_total_key_size as i64);
        e.total_value_size += weighted_value(estimated_total_value_size as i64);
    }
    changes
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels};
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::{HummockVersion, MAX_HUMMOCK_VERSION_ID};
    use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
    use risingwave_pb::hummock::write_limits::WriteLimit;
    use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};

    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::manager::context::ContextInfo;
    use crate::hummock::manager::versioning::{
        calc_new_write_limits, estimate_table_stats, rebuild_table_stats,
    };
    use crate::hummock::model::CompactionGroup;

    #[test]
    fn test_min_pinned_version_id() {
        let mut context_info = ContextInfo::default();
        assert_eq!(context_info.min_pinned_version_id(), MAX_HUMMOCK_VERSION_ID);
        context_info.pinned_versions.insert(
            1.into(),
            HummockPinnedVersion {
                context_id: 1.into(),
                min_pinned_id: 10.into(),
            },
        );
        assert_eq!(context_info.min_pinned_version_id(), 10);
        context_info
            .version_safe_points
            .push(HummockVersionId::new(5));
        assert_eq!(context_info.min_pinned_version_id(), 5);
        context_info.version_safe_points.clear();
        assert_eq!(context_info.min_pinned_version_id(), 10);
        context_info.pinned_versions.clear();
        assert_eq!(context_info.min_pinned_version_id(), MAX_HUMMOCK_VERSION_ID);
    }

    #[test]
    fn test_calc_new_write_limits() {
        let add_level_to_l0 = |levels: &mut Levels| {
            levels.l0.sub_levels.push(Level::default());
        };
        let set_sub_level_number_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             sub_level_number_threshold: u64| {
                target_groups.insert(
                    1.into(),
                    CompactionGroup {
                        group_id: 1.into(),
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_sub_level_number(
                                    sub_level_number_threshold,
                                )
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_sst_count_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_sst_count_threshold: u32| {
                target_groups.insert(
                    1.into(),
                    CompactionGroup {
                        group_id: 1.into(),
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_sst_count(Some(
                                    max_sst_count_threshold,
                                ))
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_size_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_size_threshold: u64| {
                target_groups.insert(
                    1.into(),
                    CompactionGroup {
                        group_id: 1.into(),
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_size(Some(max_size_threshold))
                                .build(),
                        ),
                    },
                );
            };

        let mut target_groups: HashMap<CompactionGroupId, CompactionGroup> = Default::default();
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 10);
        let origin_snapshot: HashMap<CompactionGroupId, WriteLimit> = [(
            2.into(),
            WriteLimit {
                table_ids: [1, 2, 3].into_iter().map_into().collect(),
                reason: "for test".to_owned(),
            },
        )]
        .into_iter()
        .collect();
        let mut version: HummockVersion = Default::default();
        for group_id in 1..=3 {
            version.levels.insert(group_id.into(), Levels::default());
        }
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
        assert_eq!(new_write_limits.len(), 1);
        for _ in 1..=10 {
            add_level_to_l0(version.levels.get_mut(&1).unwrap());
            let new_write_limits =
                calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
            assert_eq!(
                new_write_limits, origin_snapshot,
                "write limit should not be triggered for group 1"
            );
        }
        add_level_to_l0(version.levels.get_mut(&1).unwrap());
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels"
        );
        assert_eq!(new_write_limits.len(), 2);

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 5);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels"
        );

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let last_level = version
            .levels
            .get_mut(&1)
            .unwrap()
            .l0
            .sub_levels
            .last_mut()
            .unwrap();
        last_level.table_infos.extend(vec![
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1.into(), 2.into(), 3.into()],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1.into(), 2.into(), 3.into()],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
        ]);
        version.levels.get_mut(&1).unwrap().l0.total_file_size += 200;
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_size: 200, threshold: 10) too large L0 size"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10000);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 1);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_sst_count: 2, threshold: 1) too many L0 sst files"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups, origin_snapshot.clone(), &version);

        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
    }

    #[test]
    fn test_estimate_table_stats() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 10].into(),
                right: vec![1; 20].into(),
                ..Default::default()
            },
            table_ids: vec![1.into(), 2.into(), 3.into()],
            total_key_count: 6000,
            uncompressed_file_size: 6_000_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for stats in changes.values() {
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, (10 + 20) / 2 * 6000 / 3);
            assert_eq!(
                stats.total_value_size,
                (6_000_000 - (10 + 20) / 2 * 6000) / 3
            );
        }

        let mut version = HummockVersion::default();
        version.id = HummockVersionId::new(123);

        for cg in 1..3 {
            version.levels.insert(
                cg.into(),
                Levels {
                    levels: vec![Level {
                        table_infos: vec![sst.clone()],
                        ..Default::default()
                    }],
                    ..Default::default()
                },
            );
        }
        let HummockVersionStats {
            hummock_version_id,
            table_stats,
        } = rebuild_table_stats(&version);
        assert_eq!(hummock_version_id, version.id);
        assert_eq!(table_stats.len(), 3);
        for (tid, stats) in table_stats {
            assert_eq!(
                stats.total_key_count,
                changes.get(&tid).unwrap().total_key_count * 2
            );
            assert_eq!(
                stats.total_key_size,
                changes.get(&tid).unwrap().total_key_size * 2
            );
            assert_eq!(
                stats.total_value_size,
                changes.get(&tid).unwrap().total_value_size * 2
            );
        }
    }

    #[test]
    fn test_estimate_table_stats_large_key_range() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 1000].into(),
                right: vec![1; 2000].into(),
                ..Default::default()
            },
            table_ids: vec![1.into(), 2.into(), 3.into()],
            total_key_count: 6000,
            uncompressed_file_size: 60_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for t in &sst.table_ids {
            let stats = changes.get(t).unwrap();
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, 60_000 / 2 / 3);
            assert_eq!(stats.total_value_size, (60_000 - 60_000 / 2) / 3);
        }
    }
}