risingwave_meta/hummock/manager/versioning.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp;
use std::collections::Bound::{Excluded, Included};
use std::collections::{BTreeMap, HashMap, HashSet};

use itertools::Itertools;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
    BranchedSstInfo, get_compaction_group_ids, get_table_compaction_group_id_mapping,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::add_prost_table_stats_map;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockContextId, HummockSstableId, HummockSstableObjectId, HummockVersionId,
};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats, TableStats};
use risingwave_pb::meta::subscribe_response::{Info, Operation};

use super::GroupStateValidator;
use crate::MetaResult;
use crate::hummock::HummockManager;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::commit_multi_var;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::transaction::HummockVersionTransaction;
use crate::hummock::metrics_utils::{LocalTableMetrics, trigger_write_stop_stats};
use crate::hummock::model::CompactionGroup;
use crate::model::VarTransaction;

#[derive(Default)]
pub struct Versioning {
    // Volatile states below
    /// When set, avoid committing epochs and don't persist compaction
    /// version deltas to the meta store.
    pub disable_commit_epochs: bool,
    /// Latest hummock version
    pub current_version: HummockVersion,
    pub local_metrics: HashMap<u32, LocalTableMetrics>,
    pub time_travel_snapshot_interval_counter: u64,
    /// Used to avoid repeated attempts to rewrite the same SSTs to the meta store
    pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,

    // Persistent states below
    pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
    /// Stats for the latest hummock version.
    pub version_stats: HummockVersionStats,
    pub checkpoint: HummockVersionCheckpoint,
}

impl ContextInfo {
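    /// Returns the minimum version id among all per-context pinned versions
    /// and registered safe points, or `HummockVersionId::MAX` when nothing
    /// is pinned.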
    pub fn min_pinned_version_id(&self) -> HummockVersionId {
        let mut min_pinned_version_id = HummockVersionId::MAX;
        for id in self
            .pinned_versions
            .values()
            .map(|v| HummockVersionId::new(v.min_pinned_id))
            .chain(self.version_safe_points.iter().cloned())
        {
            min_pinned_version_id = cmp::min(id, min_pinned_version_id);
        }
        min_pinned_version_id
    }
}

impl Versioning {
    pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
        self.time_travel_snapshot_interval_counter = u64::MAX;
    }

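    /// Collects the object ids tracked by this manager: objects in the
    /// checkpoint version, objects newly added between the checkpoint version
    /// and the current version, and stale objects that became stale no
    /// earlier than `min_pinned_version_id` (and may thus still be referenced
    /// by a pinned version).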
    pub fn get_tracked_object_ids(
        &self,
        min_pinned_version_id: HummockVersionId,
    ) -> HashSet<HummockSstableObjectId> {
        // object ids in checkpoint version
        let mut tracked_object_ids = self.checkpoint.version.get_object_ids(false);
        // add object ids added between checkpoint version and current version
        for (_, delta) in self.hummock_version_deltas.range((
            Excluded(self.checkpoint.version.id),
            Included(self.current_version.id),
        )) {
            tracked_object_ids.extend(delta.newly_added_object_ids(false));
        }
        // add stale object ids before the checkpoint version
        tracked_object_ids.extend(
            self.checkpoint
                .stale_objects
                .iter()
                .filter(|(version_id, _)| **version_id >= min_pinned_version_id)
                .flat_map(|(_, objects)| objects.id.iter())
                .cloned(),
        );
        tracked_object_ids
    }
}

impl HummockManager {
    pub async fn list_pinned_version(&self) -> Vec<HummockPinnedVersion> {
        self.context_info
            .read()
            .await
            .pinned_versions
            .values()
            .cloned()
            .collect_vec()
    }

    pub async fn list_workers(
        &self,
        context_ids: &[HummockContextId],
    ) -> MetaResult<HashMap<HummockContextId, WorkerNode>> {
        let mut workers = HashMap::new();
        for context_id in context_ids {
            if let Some(worker_node) = self
                .metadata_manager()
                .get_worker_by_id(*context_id as _)
                .await?
            {
                workers.insert(*context_id, worker_node);
            }
        }
        Ok(workers)
    }

    /// Gets current version without pinning it.
    /// Should not be called inside [`HummockManager`], because it requests locks internally.
    ///
    /// Note: this method can hurt performance because it will clone a large object.
    #[cfg(any(test, feature = "test"))]
    pub async fn get_current_version(&self) -> HummockVersion {
        self.on_current_version(|version| version.clone()).await
    }

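    /// Runs `f` against the current version under the `versioning` read lock,
    /// so callers can extract what they need without cloning the whole version.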
    pub async fn on_current_version<T>(&self, mut f: impl FnMut(&HummockVersion) -> T) -> T {
        f(&self.versioning.read().await.current_version)
    }

    pub async fn get_version_id(&self) -> HummockVersionId {
        self.on_current_version(|version| version.id).await
    }

    /// Gets the mapping from table id to compaction group id
    pub async fn get_table_compaction_group_id_mapping(
        &self,
    ) -> HashMap<StateTableId, CompactionGroupId> {
        get_table_compaction_group_id_mapping(&self.versioning.read().await.current_version)
    }

    /// Gets version deltas from the meta store.
    #[cfg_attr(coverage, coverage(off))]
    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
    ) -> Result<Vec<HummockVersionDelta>> {
        let versioning = self.versioning.read().await;
        let version_deltas = versioning
            .hummock_version_deltas
            .range(start_id..)
            .map(|(_id, delta)| delta)
            .take(num_limit as _)
            .cloned()
            .collect();
        Ok(version_deltas)
    }

    pub async fn get_version_stats(&self) -> HummockVersionStats {
        self.versioning.read().await.version_stats.clone()
    }

    /// Updates write limits for `target_group_ids` and sends a notification.
    /// Returns true if `write_limit` has been modified.
    /// The implementation acquires the `versioning` lock and the `compaction_group_manager` lock.
    pub(super) async fn try_update_write_limits(
        &self,
        target_group_ids: &[CompactionGroupId],
    ) -> bool {
        let versioning = self.versioning.read().await;
        let mut cg_manager = self.compaction_group_manager.write().await;
        let target_group_configs = target_group_ids
            .iter()
            .filter_map(|id| {
                cg_manager
                    .try_get_compaction_group_config(*id)
                    .map(|config| (*id, config))
            })
            .collect();
        let mut new_write_limits = calc_new_write_limits(
            target_group_configs,
            cg_manager.write_limit.clone(),
            &versioning.current_version,
        );
        let all_group_ids: HashSet<_> =
            HashSet::from_iter(get_compaction_group_ids(&versioning.current_version));
        new_write_limits.retain(|group_id, _| all_group_ids.contains(group_id));
        if new_write_limits == cg_manager.write_limit {
            return false;
        }
        tracing::debug!("Hummock stopped write is updated: {:#?}", new_write_limits);
        trigger_write_stop_stats(&self.metrics, &new_write_limits);
        cg_manager.write_limit = new_write_limits;
        self.env
            .notification_manager()
            .notify_hummock_without_version(
                Operation::Add,
                Info::HummockWriteLimits(risingwave_pb::hummock::WriteLimits {
                    write_limits: cg_manager.write_limit.clone(),
                }),
            );
        true
    }

    /// Gets write limits.
    /// The implementation acquires the `compaction_group_manager` lock.
    pub async fn write_limits(&self) -> HashMap<CompactionGroupId, WriteLimit> {
        let guard = self.compaction_group_manager.read().await;
        guard.write_limit.clone()
    }

    pub async fn list_branched_objects(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
        let guard = self.versioning.read().await;
        guard.current_version.build_branched_sst_info()
    }

    pub async fn rebuild_table_stats(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        let new_stats = rebuild_table_stats(&versioning.current_version);
        let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
        // version_stats.hummock_version_id is always 0 in meta store.
        version_stats.table_stats = new_stats.table_stats;
        commit_multi_var!(self.meta_store_ref(), version_stats)?;
        Ok(())
    }

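    /// Backfills the backward-compatible `state_table_info` for versions that
    /// predate it: if the current version still needs the catch-up delta, one
    /// is generated and committed to the meta store.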
    pub async fn may_fill_backward_state_table_info(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        if versioning
            .current_version
            .need_fill_backward_compatible_state_table_info_delta()
        {
            let versioning: &mut Versioning = &mut versioning;
            let mut version = HummockVersionTransaction::new(
                &mut versioning.current_version,
                &mut versioning.hummock_version_deltas,
                self.env.notification_manager(),
                None,
                &self.metrics,
            );
            let mut new_version_delta = version.new_delta();
            new_version_delta.with_latest_version(|version, delta| {
                version.may_fill_backward_compatible_state_table_info_delta(delta)
            });
            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version)?;
        }
        Ok(())
    }
}

/// Calculates write limits for `target_groups`.
/// Returns a new complete write limits snapshot based on `origin_snapshot` and `version`.
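/// For each target group:
/// - if the group no longer exists in `version`, its entry is removed;
/// - if `GroupStateValidator` reports a write-stop state, a `WriteLimit`
///   listing the group's member table ids is inserted;
/// - otherwise any previously recorded limit for the group is removed.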
pub(super) fn calc_new_write_limits(
    target_groups: HashMap<CompactionGroupId, CompactionGroup>,
    origin_snapshot: HashMap<CompactionGroupId, WriteLimit>,
    version: &HummockVersion,
) -> HashMap<CompactionGroupId, WriteLimit> {
    let mut new_write_limits = origin_snapshot;
    for (id, config) in &target_groups {
        let levels = match version.levels.get(id) {
            None => {
                new_write_limits.remove(id);
                continue;
            }
            Some(levels) => levels,
        };

        let group_state = GroupStateValidator::check_single_group_write_stop(
            levels,
            config.compaction_config.as_ref(),
        );

        if group_state.is_write_stop() {
            new_write_limits.insert(
                *id,
                WriteLimit {
                    table_ids: version
                        .state_table_info
                        .compaction_group_member_table_ids(*id)
                        .iter()
                        .map(|table_id| table_id.table_id)
                        .collect(),
                    reason: group_state.reason().unwrap().to_owned(),
                },
            );
            continue;
        }
        // No condition is met.
        new_write_limits.remove(id);
    }
    new_write_limits
}

/// Rebuilds table stats from the given version.
/// Note that the result is an approximate value. See `estimate_table_stats`.
fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats {
    let mut stats = HummockVersionStats {
        hummock_version_id: version.id.to_u64(),
        table_stats: Default::default(),
    };
    for level in version.get_combined_levels() {
        for sst in &level.table_infos {
            let changes = estimate_table_stats(sst);
            add_prost_table_stats_map(&mut stats.table_stats, &changes);
        }
    }
    stats
}

/// Estimates the table stats change from the given file.
/// - The file stats are evenly distributed among the tables within the file.
/// - The total key size and total value size are estimated based on key range and file size.
/// - Branched files may lead to an overestimation.
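///
/// A worked example (mirroring `test_estimate_table_stats` below): for a file
/// with `table_ids = [1, 2, 3]`, key range bounds of length 10 and 20,
/// `total_key_count = 6000` and `uncompressed_file_size = 6_000_000`, the
/// estimated key size is `(10 + 20) / 2 = 15`, so the total key size is
/// `15 * 6000 = 90_000`, the total value size is `6_000_000 - 90_000 = 5_910_000`,
/// and each of the 3 tables is attributed a third of every total.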
fn estimate_table_stats(sst: &SstableInfo) -> HashMap<u32, TableStats> {
    let mut changes: HashMap<u32, TableStats> = HashMap::default();
    let weighted_value =
        |value: i64| -> i64 { (value as f64 / sst.table_ids.len() as f64).ceil() as i64 };
    let key_range = &sst.key_range;
    let estimated_key_size: u64 = (key_range.left.len() + key_range.right.len()) as u64 / 2;
    let mut estimated_total_key_size = estimated_key_size * sst.total_key_count;
    if estimated_total_key_size > sst.uncompressed_file_size {
        // Log the unclamped estimate before overwriting it, so the message
        // actually shows the value that exceeded the file size.
        tracing::warn!(
            sst.sst_id,
            "Calculated estimated_total_key_size {} > uncompressed_file_size {}. Use uncompressed_file_size/2 as estimated_total_key_size instead.",
            estimated_total_key_size,
            sst.uncompressed_file_size
        );
        estimated_total_key_size = sst.uncompressed_file_size / 2;
    }
    let estimated_total_value_size = sst.uncompressed_file_size - estimated_total_key_size;
    for table_id in &sst.table_ids {
        let e = changes.entry(*table_id).or_default();
        e.total_key_count += weighted_value(sst.total_key_count as i64);
        e.total_key_size += weighted_value(estimated_total_key_size as i64);
        e.total_value_size += weighted_value(estimated_total_value_size as i64);
    }
    changes
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels};
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
    use risingwave_pb::hummock::write_limits::WriteLimit;
    use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};

    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::manager::context::ContextInfo;
    use crate::hummock::manager::versioning::{
        calc_new_write_limits, estimate_table_stats, rebuild_table_stats,
    };
    use crate::hummock::model::CompactionGroup;

    #[test]
    fn test_min_pinned_version_id() {
        let mut context_info = ContextInfo::default();
        assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
        context_info.pinned_versions.insert(
            1,
            HummockPinnedVersion {
                context_id: 1,
                min_pinned_id: 10,
            },
        );
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 10);
        context_info
            .version_safe_points
            .push(HummockVersionId::new(5));
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 5);
        context_info.version_safe_points.clear();
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 10);
        context_info.pinned_versions.clear();
        assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
    }

    #[test]
    fn test_calc_new_write_limits() {
        let add_level_to_l0 = |levels: &mut Levels| {
            levels.l0.sub_levels.push(Level::default());
        };
        let set_sub_level_number_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             sub_level_number_threshold: u64| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_sub_level_number(
                                    sub_level_number_threshold,
                                )
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_sst_count_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_sst_count_threshold: u32| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_sst_count(Some(
                                    max_sst_count_threshold,
                                ))
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_size_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_size_threshold: u64| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_size(Some(max_size_threshold))
                                .build(),
                        ),
                    },
                );
            };

        let mut target_groups: HashMap<CompactionGroupId, CompactionGroup> = Default::default();
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 10);
        let origin_snapshot: HashMap<CompactionGroupId, WriteLimit> = [(
            2,
            WriteLimit {
                table_ids: vec![1, 2, 3],
                reason: "for test".to_owned(),
            },
        )]
        .into_iter()
        .collect();
        let mut version: HummockVersion = Default::default();
        for group_id in 1..=3 {
            version.levels.insert(group_id, Levels::default());
        }
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
        assert_eq!(new_write_limits.len(), 1);
        for _ in 1..=10 {
            add_level_to_l0(version.levels.get_mut(&1).unwrap());
            let new_write_limits =
                calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
            assert_eq!(
                new_write_limits, origin_snapshot,
                "write limit should not be triggered for group 1"
            );
        }
        add_level_to_l0(version.levels.get_mut(&1).unwrap());
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels"
        );
        assert_eq!(new_write_limits.len(), 2);

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 5);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels"
        );

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let last_level = version
            .levels
            .get_mut(&1)
            .unwrap()
            .l0
            .sub_levels
            .last_mut()
            .unwrap();
        last_level.table_infos.extend(vec![
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1, 2, 3],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1, 2, 3],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
        ]);
        version.levels.get_mut(&1).unwrap().l0.total_file_size += 200;
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_size: 200, threshold: 10) too large L0 size"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10000);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 1);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_sst_count: 2, threshold: 1) too many L0 sst files"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);

        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
    }

    #[test]
    fn test_estimate_table_stats() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 10].into(),
                right: vec![1; 20].into(),
                ..Default::default()
            },
            table_ids: vec![1, 2, 3],
            total_key_count: 6000,
            uncompressed_file_size: 6_000_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for stats in changes.values() {
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, (10 + 20) / 2 * 6000 / 3);
            assert_eq!(
                stats.total_value_size,
                (6_000_000 - (10 + 20) / 2 * 6000) / 3
            );
        }

        let mut version = HummockVersion::default();
        version.id = HummockVersionId::new(123);

        for cg in 1..3 {
            version.levels.insert(
                cg,
                Levels {
                    levels: vec![Level {
                        table_infos: vec![sst.clone()],
                        ..Default::default()
                    }],
                    ..Default::default()
                },
            );
        }
        let HummockVersionStats {
            hummock_version_id,
            table_stats,
        } = rebuild_table_stats(&version);
        assert_eq!(hummock_version_id, version.id.to_u64());
        assert_eq!(table_stats.len(), 3);
        for (tid, stats) in table_stats {
            assert_eq!(
                stats.total_key_count,
                changes.get(&tid).unwrap().total_key_count * 2
            );
            assert_eq!(
                stats.total_key_size,
                changes.get(&tid).unwrap().total_key_size * 2
            );
            assert_eq!(
                stats.total_value_size,
                changes.get(&tid).unwrap().total_value_size * 2
            );
        }
    }

    #[test]
    fn test_estimate_table_stats_large_key_range() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 1000].into(),
                right: vec![1; 2000].into(),
                ..Default::default()
            },
            table_ids: vec![1, 2, 3],
            total_key_count: 6000,
            uncompressed_file_size: 60_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for t in &sst.table_ids {
            let stats = changes.get(t).unwrap();
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, 60_000 / 2 / 3);
            assert_eq!(stats.total_value_size, (60_000 - 60_000 / 2) / 3);
        }
    }
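
    // A minimal extra sketch (not part of the original suite): with empty key
    // range bounds the estimated key size is 0, so the entire uncompressed
    // file size is attributed to values, split evenly across the tables.
    #[test]
    fn test_estimate_table_stats_empty_key_range() {
        let sst = SstableInfoInner {
            key_range: KeyRange::default(),
            table_ids: vec![1, 2, 3],
            total_key_count: 6000,
            uncompressed_file_size: 60_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for t in &sst.table_ids {
            let stats = changes.get(t).unwrap();
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, 0);
            assert_eq!(stats.total_value_size, 60_000 / 3);
        }
    }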
}