1use std::cmp;
16use std::collections::Bound::{Excluded, Included};
17use std::collections::{BTreeMap, HashMap, HashSet};
18
19use itertools::Itertools;
20use risingwave_hummock_sdk::change_log::{EpochNewChangeLog, TableChangeLog, TableChangeLogs};
21use risingwave_hummock_sdk::compaction_group::StateTableId;
22use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
23 BranchedSstInfo, get_compaction_group_ids, get_table_compaction_group_id_mapping,
24};
25use risingwave_hummock_sdk::sstable_info::SstableInfo;
26use risingwave_hummock_sdk::table_stats::{PbTableStatsMap, add_prost_table_stats_map};
27use risingwave_hummock_sdk::version::{
28 HummockVersion, HummockVersionDelta, MAX_HUMMOCK_VERSION_ID,
29};
30use risingwave_hummock_sdk::{
31 CompactionGroupId, HummockContextId, HummockObjectId, HummockSstableId, HummockSstableObjectId,
32 HummockVersionId, get_stale_object_ids,
33};
34use risingwave_meta_model::{Epoch, hummock_table_change_log};
35use risingwave_pb::common::WorkerNode;
36use risingwave_pb::hummock::write_limits::WriteLimit;
37use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};
38use risingwave_pb::id::TableId;
39use risingwave_pb::meta::subscribe_response::{Info, Operation};
40use sea_orm::{EntityTrait, QuerySelect, TransactionTrait};
41
42use super::GroupStateValidator;
43use crate::MetaResult;
44use crate::hummock::HummockManager;
45use crate::hummock::error::Result;
46use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
47use crate::hummock::manager::commit_multi_var;
48use crate::hummock::manager::context::ContextInfo;
49use crate::hummock::manager::transaction::HummockVersionTransaction;
50use crate::hummock::metrics_utils::{LocalTableMetrics, trigger_write_stop_stats};
51use crate::hummock::model::CompactionGroup;
52use crate::hummock::model::ext::to_table_change_log_meta_store_model;
53use crate::model::VarTransaction;
54
#[derive(Default)]
pub struct Versioning {
    /// When set, epoch commits are disabled (checked by the commit path; presumably
    /// toggled during recovery/maintenance — confirm against callers).
    pub disable_commit_epochs: bool,
    /// The latest (uncheckpointed) hummock version held in memory.
    pub current_version: HummockVersion,
    /// Per-table local metrics used by `metrics_utils`.
    pub local_metrics: HashMap<TableId, LocalTableMetrics>,
    /// Counter driving periodic time-travel snapshots; set to `u64::MAX` by
    /// `mark_next_time_travel_version_snapshot` to force a snapshot at the next opportunity.
    pub time_travel_snapshot_interval_counter: u64,
    /// SST ids captured by the most recent time-travel snapshot.
    pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,

    /// Deltas keyed by the version id they produce; ranged over in
    /// `get_tracked_object_ids` from the checkpoint (exclusive) to `current_version` (inclusive).
    pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
    /// Aggregated per-table statistics for the current version.
    pub version_stats: HummockVersionStats,
    /// The latest persisted checkpoint, including its stale-object bookkeeping.
    pub checkpoint: HummockVersionCheckpoint,
    /// Per-table change logs tracked alongside the version.
    pub table_change_log: HashMap<TableId, TableChangeLog>,
}
75
76impl ContextInfo {
77 pub fn min_pinned_version_id(&self) -> HummockVersionId {
78 let mut min_pinned_version_id = MAX_HUMMOCK_VERSION_ID;
79 for id in self
80 .pinned_versions
81 .values()
82 .map(|v| v.min_pinned_id)
83 .chain(self.version_safe_points.iter().cloned())
84 {
85 min_pinned_version_id = cmp::min(id, min_pinned_version_id);
86 }
87 min_pinned_version_id
88 }
89}
90
91impl Versioning {
92 pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
93 self.time_travel_snapshot_interval_counter = u64::MAX;
94 }
95
96 pub fn get_tracked_object_ids(
97 &self,
98 min_pinned_version_id: HummockVersionId,
99 ) -> HashSet<HummockObjectId> {
100 let mut tracked_object_ids = self
102 .checkpoint
103 .version
104 .get_object_ids()
105 .chain(
106 self.table_change_log
107 .values()
108 .flat_map(|c| c.get_object_ids()),
109 )
110 .collect::<HashSet<_>>();
111 for (_, delta) in self.hummock_version_deltas.range((
113 Excluded(self.checkpoint.version.id),
114 Included(self.current_version.id),
115 )) {
116 tracked_object_ids.extend(delta.newly_added_object_ids(false));
117 }
118 tracked_object_ids.extend(
120 self.checkpoint
121 .stale_objects
122 .iter()
123 .filter(|(version_id, _)| **version_id >= min_pinned_version_id)
124 .flat_map(|(_, objects)| get_stale_object_ids(objects)),
125 );
126 tracked_object_ids
127 }
128}
129
impl HummockManager {
    /// Lists all pinned versions currently recorded in `ContextInfo`.
    pub async fn list_pinned_version(&self) -> Vec<HummockPinnedVersion> {
        self.context_info
            .read()
            .await
            .pinned_versions
            .values()
            .cloned()
            .collect_vec()
    }

    /// Resolves each context id to its worker node, silently skipping ids with
    /// no matching worker.
    pub async fn list_workers(
        &self,
        context_ids: &[HummockContextId],
    ) -> MetaResult<HashMap<HummockContextId, WorkerNode>> {
        let mut workers = HashMap::new();
        for context_id in context_ids {
            if let Some(worker_node) = self
                .metadata_manager()
                .get_worker_by_id(*context_id as _)
                .await?
            {
                workers.insert(*context_id, worker_node);
            }
        }
        Ok(workers)
    }

    /// Test-only convenience: clones the whole current version.
    #[cfg(any(test, feature = "test"))]
    pub async fn get_current_version(&self) -> HummockVersion {
        self.on_current_version(|version| version.clone()).await
    }

    /// Runs `f` against the current version while holding the versioning read lock.
    pub async fn on_current_version<T>(&self, mut f: impl FnMut(&HummockVersion) -> T) -> T {
        f(&self.versioning.read().await.current_version)
    }

    /// Runs `f` against the current version and the table change logs under a
    /// single versioning read lock, so both views are consistent with each other.
    pub async fn on_current_version_and_table_change_log<T>(
        &self,
        mut f: impl FnMut(&HummockVersion, &TableChangeLogs) -> T,
    ) -> T {
        let guard = self.versioning.read().await;
        f(&guard.current_version, &guard.table_change_log)
    }

    /// Returns the id of the current version.
    pub async fn get_version_id(&self) -> HummockVersionId {
        self.on_current_version(|version| version.id).await
    }

    /// Returns the state-table-id -> compaction-group-id mapping of the current version.
    pub async fn get_table_compaction_group_id_mapping(
        &self,
    ) -> HashMap<StateTableId, CompactionGroupId> {
        get_table_compaction_group_id_mapping(&self.versioning.read().await.current_version)
    }

    /// Lists at most `num_limit` version deltas whose id is >= `start_id`,
    /// in ascending id order.
    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
    ) -> Result<Vec<HummockVersionDelta>> {
        let versioning = self.versioning.read().await;
        let version_deltas = versioning
            .hummock_version_deltas
            .range(start_id..)
            .map(|(_id, delta)| delta)
            .take(num_limit as _)
            .cloned()
            .collect();
        Ok(version_deltas)
    }

    /// Returns a snapshot of the aggregated version statistics.
    pub async fn get_version_stats(&self) -> HummockVersionStats {
        self.versioning.read().await.version_stats.clone()
    }

    /// Recomputes write limits for `target_group_ids` against the current
    /// version, drops limits for groups that no longer exist, and — if the
    /// resulting map differs from the stored one — updates metrics/state and
    /// broadcasts the new limits. Returns whether an update was made.
    /// Lock order: versioning (read) then compaction_group_manager (write).
    pub(super) async fn try_update_write_limits(
        &self,
        target_group_ids: &[CompactionGroupId],
    ) -> bool {
        let versioning = self.versioning.read().await;
        let mut cg_manager = self.compaction_group_manager.write().await;
        let target_group_configs = target_group_ids
            .iter()
            .filter_map(|id| {
                cg_manager
                    .try_get_compaction_group_config(*id)
                    .map(|config| (*id, config))
            })
            .collect();
        let mut new_write_limits = calc_new_write_limits(
            target_group_configs,
            cg_manager.write_limit.clone(),
            &versioning.current_version,
        );
        // Prune limits for groups that have been removed from the version.
        let all_group_ids: HashSet<_> =
            HashSet::from_iter(get_compaction_group_ids(&versioning.current_version));
        new_write_limits.retain(|group_id, _| all_group_ids.contains(group_id));
        if new_write_limits == cg_manager.write_limit {
            return false;
        }
        tracing::debug!("Hummock stopped write is updated: {:#?}", new_write_limits);
        trigger_write_stop_stats(&self.metrics, &new_write_limits);
        cg_manager.write_limit = new_write_limits;
        self.env
            .notification_manager()
            .notify_hummock_without_version(
                Operation::Add,
                Info::HummockWriteLimits(risingwave_pb::hummock::WriteLimits {
                    write_limits: cg_manager.write_limit.clone(),
                }),
            );
        true
    }

    /// Returns a snapshot of the current write limits per compaction group.
    pub async fn write_limits(&self) -> HashMap<CompactionGroupId, WriteLimit> {
        let guard = self.compaction_group_manager.read().await;
        guard.write_limit.clone()
    }

    /// Builds the branched SST info map from the current version.
    pub async fn list_branched_objects(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
        let guard = self.versioning.read().await;
        let branched_objects = guard.current_version.build_branched_sst_info();
        branched_objects
    }

    /// Rebuilds table statistics from the current version's SSTs and persists
    /// them via a meta-store transaction.
    pub async fn rebuild_table_stats(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        let new_stats = rebuild_table_stats(&versioning.current_version);
        let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
        version_stats.table_stats = new_stats.table_stats;
        commit_multi_var!(self.meta_store_ref(), version_stats)?;
        Ok(())
    }

    /// If the current version predates the state-table-info format, emits and
    /// commits a delta that backfills it (backward-compatibility migration).
    pub async fn may_fill_backward_state_table_info(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        if versioning
            .current_version
            .need_fill_backward_compatible_state_table_info_delta()
        {
            let versioning: &mut Versioning = &mut versioning;
            let mut version = HummockVersionTransaction::new(
                &mut versioning.current_version,
                &mut versioning.hummock_version_deltas,
                &mut versioning.table_change_log,
                self.env.notification_manager(),
                None,
                &self.metrics,
                &self.env.opts,
            );
            let mut new_version_delta = version.new_delta();
            new_version_delta.with_latest_version(|version, delta| {
                version.may_fill_backward_compatible_state_table_info_delta(delta)
            });
            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version)?;
        }
        Ok(())
    }

    /// One-shot migration: moves change logs stored in the (deprecated)
    /// in-version `table_change_log` field into the dedicated meta-store table.
    /// No-ops if the in-version logs are empty or the meta-store table already
    /// has rows. Inserts are batched inside a single transaction.
    pub async fn may_fill_backward_table_change_logs(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        let version = &mut versioning.current_version;

        // Probe for any existing row; a non-empty table means migration already ran.
        let is_nonempty_meta_store =
            risingwave_meta_model::hummock_table_change_log::Entity::find()
                .select_only()
                .columns([
                    hummock_table_change_log::Column::TableId,
                    hummock_table_change_log::Column::CheckpointEpoch,
                ])
                .into_tuple::<(TableId, Epoch)>()
                .one(&self.env.meta_store_ref().conn)
                .await?
                .is_some();
        #[expect(deprecated)]
        if version.table_change_log.is_empty() || is_nonempty_meta_store {
            return Ok(());
        }

        #[expect(deprecated)]
        let table_change_logs = {
            // Drain the deprecated field; flatten to (table_id, change_log) pairs.
            let table_change_logs = std::mem::take(&mut version.table_change_log);
            if table_change_logs.values().all(|t| t.is_empty()) {
                return Ok(());
            }
            table_change_logs
                .into_iter()
                .flat_map(|(table_id, change_logs)| {
                    change_logs
                        .into_iter()
                        .map(move |change_log| (table_id, change_log))
                })
        };

        let insert_batch_size = self.env.opts.table_change_log_insert_batch_size as usize;
        use futures::stream::{self, StreamExt};
        let mut stream = stream::iter(table_change_logs).chunks(insert_batch_size);
        let txn = self.env.meta_store_ref().conn.begin().await?;
        while let Some(change_log_batch) = stream.next().await {
            if change_log_batch.is_empty() {
                break;
            }
            let insert_many = change_log_batch
                .into_iter()
                .map(|(table_id, change_log)| {
                    to_table_change_log_meta_store_model(table_id, &change_log)
                })
                .collect::<Vec<_>>();
            risingwave_meta_model::hummock_table_change_log::Entity::insert_many(insert_many)
                .exec(&txn)
                .await?;
        }
        txn.commit().await?;
        Ok(())
    }

    /// Returns a filtered snapshot of the table change logs.
    ///
    /// Filters applied per table: optional `table_ids` allow-list, inclusive
    /// epoch range, `exclude_empty` (drops entries with no old/new values), and
    /// `limit` — note `limit` caps entries PER TABLE, not globally. When
    /// `epoch_only` is set, value payloads are stripped and only epoch metadata
    /// is returned.
    pub async fn get_table_change_logs(
        &self,
        epoch_only: bool,
        start_epoch_inclusive: Option<u64>,
        end_epoch_inclusive: Option<u64>,
        table_ids: Option<HashSet<TableId>>,
        exclude_empty: bool,
        limit: Option<u32>,
    ) -> TableChangeLogs {
        self.on_current_version_and_table_change_log(|_, table_change_logs| {
            table_change_logs
                .iter()
                .filter_map(|(id, change_log)| {
                    if let Some(table_filter) = &table_ids
                        && !table_filter.contains(id)
                    {
                        return None;
                    }
                    let filtered_change_logs = change_log
                        .filter_epoch((
                            start_epoch_inclusive.unwrap_or(0),
                            end_epoch_inclusive.unwrap_or(u64::MAX),
                        ))
                        .filter(|change_log| {
                            if exclude_empty
                                && change_log.new_value.is_empty()
                                && change_log.old_value.is_empty()
                            {
                                return false;
                            }
                            true
                        })
                        .take(limit.map(|l| l as usize).unwrap_or(usize::MAX))
                        .map(|change_log| {
                            if epoch_only {
                                // Strip payloads, keep epoch bookkeeping only.
                                EpochNewChangeLog {
                                    new_value: vec![],
                                    old_value: vec![],
                                    non_checkpoint_epochs: change_log.non_checkpoint_epochs.clone(),
                                    checkpoint_epoch: change_log.checkpoint_epoch,
                                }
                            } else {
                                change_log.clone()
                            }
                        });
                    Some((id.to_owned(), TableChangeLog::new(filtered_change_logs)))
                })
                .collect()
        })
        .await
    }
}
413
414pub(super) fn calc_new_write_limits(
417 target_groups: HashMap<CompactionGroupId, CompactionGroup>,
418 origin_snapshot: HashMap<CompactionGroupId, WriteLimit>,
419 version: &HummockVersion,
420) -> HashMap<CompactionGroupId, WriteLimit> {
421 let mut new_write_limits = origin_snapshot;
422 for (id, config) in &target_groups {
423 let levels = match version.levels.get(id) {
424 None => {
425 new_write_limits.remove(id);
426 continue;
427 }
428 Some(levels) => levels,
429 };
430
431 let group_state = GroupStateValidator::check_single_group_write_stop(
432 levels,
433 config.compaction_config.as_ref(),
434 );
435
436 if group_state.is_write_stop() {
437 new_write_limits.insert(
438 *id,
439 WriteLimit {
440 table_ids: version
441 .state_table_info
442 .compaction_group_member_table_ids(*id)
443 .iter()
444 .copied()
445 .collect(),
446 reason: group_state.reason().unwrap().to_owned(),
447 },
448 );
449 continue;
450 }
451 new_write_limits.remove(id);
453 }
454 new_write_limits
455}
456
457fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats {
460 let mut stats = HummockVersionStats {
461 hummock_version_id: version.id,
462 table_stats: Default::default(),
463 };
464 for level in version.get_combined_levels() {
465 for sst in &level.table_infos {
466 let changes = estimate_table_stats(sst);
467 add_prost_table_stats_map(&mut stats.table_stats, &changes);
468 }
469 }
470 stats
471}
472
473fn estimate_table_stats(sst: &SstableInfo) -> PbTableStatsMap {
478 let mut changes: PbTableStatsMap = HashMap::default();
479 let weighted_value =
480 |value: i64| -> i64 { (value as f64 / sst.table_ids.len() as f64).ceil() as i64 };
481 let key_range = &sst.key_range;
482 let estimated_key_size: u64 = (key_range.left.len() + key_range.right.len()) as u64 / 2;
483 let mut estimated_total_key_size = estimated_key_size * sst.total_key_count;
484 if estimated_total_key_size > sst.uncompressed_file_size {
485 estimated_total_key_size = sst.uncompressed_file_size / 2;
486 tracing::warn!(
487 %sst.sst_id,
488 "Calculated estimated_total_key_size {} > uncompressed_file_size {}. Use uncompressed_file_size/2 as estimated_total_key_size instead.",
489 estimated_total_key_size,
490 sst.uncompressed_file_size
491 );
492 }
493 let estimated_total_value_size = sst.uncompressed_file_size - estimated_total_key_size;
494 for table_id in &sst.table_ids {
495 let e = changes.entry(*table_id).or_default();
496 e.total_key_count += weighted_value(sst.total_key_count as i64);
497 e.total_key_size += weighted_value(estimated_total_key_size as i64);
498 e.total_value_size += weighted_value(estimated_total_value_size as i64);
499 }
500 changes
501}
502
// Unit tests for pinned-version tracking, write-limit calculation, and
// table-stats estimation.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels};
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::{HummockVersion, MAX_HUMMOCK_VERSION_ID};
    use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
    use risingwave_pb::hummock::write_limits::WriteLimit;
    use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};

    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::manager::context::ContextInfo;
    use crate::hummock::manager::versioning::{
        calc_new_write_limits, estimate_table_stats, rebuild_table_stats,
    };
    use crate::hummock::model::CompactionGroup;

    // min_pinned_version_id: min over pinned versions and safe points;
    // MAX_HUMMOCK_VERSION_ID when both are empty.
    #[test]
    fn test_min_pinned_version_id() {
        let mut context_info = ContextInfo::default();
        assert_eq!(context_info.min_pinned_version_id(), MAX_HUMMOCK_VERSION_ID);
        context_info.pinned_versions.insert(
            1.into(),
            HummockPinnedVersion {
                context_id: 1.into(),
                min_pinned_id: 10.into(),
            },
        );
        assert_eq!(context_info.min_pinned_version_id(), 10);
        context_info
            .version_safe_points
            .push(HummockVersionId::new(5));
        assert_eq!(context_info.min_pinned_version_id(), 5);
        context_info.version_safe_points.clear();
        assert_eq!(context_info.min_pinned_version_id(), 10);
        context_info.pinned_versions.clear();
        assert_eq!(context_info.min_pinned_version_id(), MAX_HUMMOCK_VERSION_ID);
    }

    // Exercises each write-stop trigger (sub-level count, L0 size, L0 sst
    // count) for group 1, checking both activation and release of the limit.
    #[test]
    fn test_calc_new_write_limits() {
        let add_level_to_l0 = |levels: &mut Levels| {
            levels.l0.sub_levels.push(Level::default());
        };
        let set_sub_level_number_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             sub_level_number_threshold: u64| {
                target_groups.insert(
                    1.into(),
                    CompactionGroup {
                        group_id: 1.into(),
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_sub_level_number(
                                    sub_level_number_threshold,
                                )
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_sst_count_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_sst_count_threshold: u32| {
                target_groups.insert(
                    1.into(),
                    CompactionGroup {
                        group_id: 1.into(),
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_sst_count(Some(
                                    max_sst_count_threshold,
                                ))
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_size_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_size_threshold: u64| {
                target_groups.insert(
                    1.into(),
                    CompactionGroup {
                        group_id: 1.into(),
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_size(Some(max_size_threshold))
                                .build(),
                        ),
                    },
                );
            };

        let mut target_groups: HashMap<CompactionGroupId, CompactionGroup> = Default::default();
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 10);
        // Pre-existing limit on group 2; it should persist untouched throughout.
        let origin_snapshot: HashMap<CompactionGroupId, WriteLimit> = [(
            2.into(),
            WriteLimit {
                table_ids: [1, 2, 3].into_iter().map_into().collect(),
                reason: "for test".to_owned(),
            },
        )]
        .into_iter()
        .collect();
        let mut version: HummockVersion = Default::default();
        for group_id in 1..=3 {
            version.levels.insert(group_id.into(), Levels::default());
        }
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
        assert_eq!(new_write_limits.len(), 1);
        // Up to exactly the threshold (10 sub-levels): no limit yet.
        for _ in 1..=10 {
            add_level_to_l0(version.levels.get_mut(&1).unwrap());
            let new_write_limits =
                calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
            assert_eq!(
                new_write_limits, origin_snapshot,
                "write limit should not be triggered for group 1"
            );
        }
        // 11th sub-level exceeds the threshold and triggers write stop.
        add_level_to_l0(version.levels.get_mut(&1).unwrap());
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels"
        );
        assert_eq!(new_write_limits.len(), 2);

        // Raising the threshold releases the limit.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 5);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels"
        );

        // Switch to size/count based triggers: seed two SSTs totalling 200 bytes.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let last_level = version
            .levels
            .get_mut(&1)
            .unwrap()
            .l0
            .sub_levels
            .last_mut()
            .unwrap();
        last_level.table_infos.extend(vec![
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1.into(), 2.into(), 3.into()],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1.into(), 2.into(), 3.into()],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
        ]);
        version.levels.get_mut(&1).unwrap().l0.total_file_size += 200;
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // L0 size threshold below the 200-byte total triggers write stop.
        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_size: 200, threshold: 10) too large L0 size"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10000);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // SST count threshold below the 2 seeded SSTs triggers write stop.
        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 1);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_sst_count: 2, threshold: 1) too many L0 sst files"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups, origin_snapshot.clone(), &version);

        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
    }

    // Stats split evenly across 3 tables; rebuild over 2 groups doubles totals.
    #[test]
    fn test_estimate_table_stats() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 10].into(),
                right: vec![1; 20].into(),
                ..Default::default()
            },
            table_ids: vec![1.into(), 2.into(), 3.into()],
            total_key_count: 6000,
            uncompressed_file_size: 6_000_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for stats in changes.values() {
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, (10 + 20) / 2 * 6000 / 3);
            assert_eq!(
                stats.total_value_size,
                (6_000_000 - (10 + 20) / 2 * 6000) / 3
            );
        }

        let mut version = HummockVersion::default();
        version.id = HummockVersionId::new(123);

        for cg in 1..3 {
            version.levels.insert(
                cg.into(),
                Levels {
                    levels: vec![Level {
                        table_infos: vec![sst.clone()],
                        ..Default::default()
                    }],
                    ..Default::default()
                },
            );
        }
        let HummockVersionStats {
            hummock_version_id,
            table_stats,
        } = rebuild_table_stats(&version);
        assert_eq!(hummock_version_id, version.id);
        assert_eq!(table_stats.len(), 3);
        for (tid, stats) in table_stats {
            assert_eq!(
                stats.total_key_count,
                changes.get(&tid).unwrap().total_key_count * 2
            );
            assert_eq!(
                stats.total_key_size,
                changes.get(&tid).unwrap().total_key_size * 2
            );
            assert_eq!(
                stats.total_value_size,
                changes.get(&tid).unwrap().total_value_size * 2
            );
        }
    }

    // Implausibly large key range: key size falls back to half the file size.
    #[test]
    fn test_estimate_table_stats_large_key_range() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 1000].into(),
                right: vec![1; 2000].into(),
                ..Default::default()
            },
            table_ids: vec![1.into(), 2.into(), 3.into()],
            total_key_count: 6000,
            uncompressed_file_size: 60_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for t in &sst.table_ids {
            let stats = changes.get(t).unwrap();
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, 60_000 / 2 / 3);
            assert_eq!(stats.total_value_size, (60_000 - 60_000 / 2) / 3);
        }
    }
}