risingwave_meta/hummock/manager/
versioning.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp;
16use std::collections::Bound::{Excluded, Included};
17use std::collections::{BTreeMap, HashMap, HashSet};
18
19use itertools::Itertools;
20use risingwave_hummock_sdk::compaction_group::StateTableId;
21use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
22    BranchedSstInfo, get_compaction_group_ids, get_table_compaction_group_id_mapping,
23};
24use risingwave_hummock_sdk::sstable_info::SstableInfo;
25use risingwave_hummock_sdk::table_stats::{PbTableStatsMap, add_prost_table_stats_map};
26use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
27use risingwave_hummock_sdk::{
28    CompactionGroupId, HummockContextId, HummockObjectId, HummockSstableId, HummockSstableObjectId,
29    HummockVersionId, get_stale_object_ids,
30};
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::hummock::write_limits::WriteLimit;
33use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};
34use risingwave_pb::id::TableId;
35use risingwave_pb::meta::subscribe_response::{Info, Operation};
36
37use super::GroupStateValidator;
38use crate::MetaResult;
39use crate::hummock::HummockManager;
40use crate::hummock::error::Result;
41use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
42use crate::hummock::manager::commit_multi_var;
43use crate::hummock::manager::context::ContextInfo;
44use crate::hummock::manager::transaction::HummockVersionTransaction;
45use crate::hummock::metrics_utils::{LocalTableMetrics, trigger_write_stop_stats};
46use crate::hummock::model::CompactionGroup;
47use crate::model::VarTransaction;
48
#[derive(Default)]
pub struct Versioning {
    // Volatile states below
    /// Avoid committing epochs.
    /// Don't persist compaction version delta to meta store.
    /// NOTE(review): the code that reads this flag lives outside this file — confirm its exact
    /// effect there.
    pub disable_commit_epochs: bool,
    /// Latest hummock version
    pub current_version: HummockVersion,
    /// Local metrics, keyed by table id.
    pub local_metrics: HashMap<TableId, LocalTableMetrics>,
    /// Counter related to time travel version snapshots.
    /// `mark_next_time_travel_version_snapshot` sets it to `u64::MAX`; presumably it is compared
    /// against a snapshot interval elsewhere — TODO confirm.
    pub time_travel_snapshot_interval_counter: u64,
    /// Used to avoid the attempts to rewrite the same SST to meta store
    pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,

    // Persistent states below
    /// Version deltas ordered by version id.
    pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
    /// Stats for latest hummock version.
    pub version_stats: HummockVersionStats,
    /// Version checkpoint; `get_tracked_object_ids` reads its `version` and `stale_objects`.
    pub checkpoint: HummockVersionCheckpoint,
}
68
69impl ContextInfo {
70    pub fn min_pinned_version_id(&self) -> HummockVersionId {
71        let mut min_pinned_version_id = HummockVersionId::MAX;
72        for id in self
73            .pinned_versions
74            .values()
75            .map(|v| HummockVersionId::new(v.min_pinned_id))
76            .chain(self.version_safe_points.iter().cloned())
77        {
78            min_pinned_version_id = cmp::min(id, min_pinned_version_id);
79        }
80        min_pinned_version_id
81    }
82}
83
84impl Versioning {
85    pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
86        self.time_travel_snapshot_interval_counter = u64::MAX;
87    }
88
89    pub fn get_tracked_object_ids(
90        &self,
91        min_pinned_version_id: HummockVersionId,
92    ) -> HashSet<HummockObjectId> {
93        // object ids in checkpoint version
94        let mut tracked_object_ids = self
95            .checkpoint
96            .version
97            .get_object_ids(false)
98            .collect::<HashSet<_>>();
99        // add object ids added between checkpoint version and current version
100        for (_, delta) in self.hummock_version_deltas.range((
101            Excluded(self.checkpoint.version.id),
102            Included(self.current_version.id),
103        )) {
104            tracked_object_ids.extend(delta.newly_added_object_ids(false));
105        }
106        // add stale object ids before the checkpoint version
107        tracked_object_ids.extend(
108            self.checkpoint
109                .stale_objects
110                .iter()
111                .filter(|(version_id, _)| **version_id >= min_pinned_version_id)
112                .flat_map(|(_, objects)| get_stale_object_ids(objects)),
113        );
114        tracked_object_ids
115    }
116}
117
118impl HummockManager {
119    pub async fn list_pinned_version(&self) -> Vec<HummockPinnedVersion> {
120        self.context_info
121            .read()
122            .await
123            .pinned_versions
124            .values()
125            .cloned()
126            .collect_vec()
127    }
128
129    pub async fn list_workers(
130        &self,
131        context_ids: &[HummockContextId],
132    ) -> MetaResult<HashMap<HummockContextId, WorkerNode>> {
133        let mut workers = HashMap::new();
134        for context_id in context_ids {
135            if let Some(worker_node) = self
136                .metadata_manager()
137                .get_worker_by_id(*context_id as _)
138                .await?
139            {
140                workers.insert(*context_id, worker_node);
141            }
142        }
143        Ok(workers)
144    }
145
146    /// Gets current version without pinning it.
147    /// Should not be called inside [`HummockManager`], because it requests locks internally.
148    ///
149    /// Note: this method can hurt performance because it will clone a large object.
150    #[cfg(any(test, feature = "test"))]
151    pub async fn get_current_version(&self) -> HummockVersion {
152        self.on_current_version(|version| version.clone()).await
153    }
154
155    pub async fn on_current_version<T>(&self, mut f: impl FnMut(&HummockVersion) -> T) -> T {
156        f(&self.versioning.read().await.current_version)
157    }
158
159    pub async fn get_version_id(&self) -> HummockVersionId {
160        self.on_current_version(|version| version.id).await
161    }
162
163    /// Gets the mapping from table id to compaction group id
164    pub async fn get_table_compaction_group_id_mapping(
165        &self,
166    ) -> HashMap<StateTableId, CompactionGroupId> {
167        get_table_compaction_group_id_mapping(&self.versioning.read().await.current_version)
168    }
169
170    /// Get version deltas from meta store
171    pub async fn list_version_deltas(
172        &self,
173        start_id: HummockVersionId,
174        num_limit: u32,
175    ) -> Result<Vec<HummockVersionDelta>> {
176        let versioning = self.versioning.read().await;
177        let version_deltas = versioning
178            .hummock_version_deltas
179            .range(start_id..)
180            .map(|(_id, delta)| delta)
181            .take(num_limit as _)
182            .cloned()
183            .collect();
184        Ok(version_deltas)
185    }
186
187    pub async fn get_version_stats(&self) -> HummockVersionStats {
188        self.versioning.read().await.version_stats.clone()
189    }
190
191    /// Updates write limits for `target_groups` and sends notification.
192    /// Returns true if `write_limit` has been modified.
193    /// The implementation acquires `versioning` lock and `compaction_group_manager` lock.
194    pub(super) async fn try_update_write_limits(
195        &self,
196        target_group_ids: &[CompactionGroupId],
197    ) -> bool {
198        let versioning = self.versioning.read().await;
199        let mut cg_manager = self.compaction_group_manager.write().await;
200        let target_group_configs = target_group_ids
201            .iter()
202            .filter_map(|id| {
203                cg_manager
204                    .try_get_compaction_group_config(*id)
205                    .map(|config| (*id, config))
206            })
207            .collect();
208        let mut new_write_limits = calc_new_write_limits(
209            target_group_configs,
210            cg_manager.write_limit.clone(),
211            &versioning.current_version,
212        );
213        let all_group_ids: HashSet<_> =
214            HashSet::from_iter(get_compaction_group_ids(&versioning.current_version));
215        new_write_limits.retain(|group_id, _| all_group_ids.contains(group_id));
216        if new_write_limits == cg_manager.write_limit {
217            return false;
218        }
219        tracing::debug!("Hummock stopped write is updated: {:#?}", new_write_limits);
220        trigger_write_stop_stats(&self.metrics, &new_write_limits);
221        cg_manager.write_limit = new_write_limits;
222        self.env
223            .notification_manager()
224            .notify_hummock_without_version(
225                Operation::Add,
226                Info::HummockWriteLimits(risingwave_pb::hummock::WriteLimits {
227                    write_limits: cg_manager.write_limit.clone(),
228                }),
229            );
230        true
231    }
232
233    /// Gets write limits.
234    /// The implementation acquires `versioning` lock.
235    pub async fn write_limits(&self) -> HashMap<CompactionGroupId, WriteLimit> {
236        let guard = self.compaction_group_manager.read().await;
237        guard.write_limit.clone()
238    }
239
240    pub async fn list_branched_objects(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
241        let guard = self.versioning.read().await;
242        guard.current_version.build_branched_sst_info()
243    }
244
245    pub async fn rebuild_table_stats(&self) -> Result<()> {
246        let mut versioning = self.versioning.write().await;
247        let new_stats = rebuild_table_stats(&versioning.current_version);
248        let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
249        // version_stats.hummock_version_id is always 0 in meta store.
250        version_stats.table_stats = new_stats.table_stats;
251        commit_multi_var!(self.meta_store_ref(), version_stats)?;
252        Ok(())
253    }
254
255    pub async fn may_fill_backward_state_table_info(&self) -> Result<()> {
256        let mut versioning = self.versioning.write().await;
257        if versioning
258            .current_version
259            .need_fill_backward_compatible_state_table_info_delta()
260        {
261            let versioning: &mut Versioning = &mut versioning;
262            let mut version = HummockVersionTransaction::new(
263                &mut versioning.current_version,
264                &mut versioning.hummock_version_deltas,
265                self.env.notification_manager(),
266                None,
267                &self.metrics,
268            );
269            let mut new_version_delta = version.new_delta();
270            new_version_delta.with_latest_version(|version, delta| {
271                version.may_fill_backward_compatible_state_table_info_delta(delta)
272            });
273            new_version_delta.pre_apply();
274            commit_multi_var!(self.meta_store_ref(), version)?;
275        }
276        Ok(())
277    }
278}
279
280/// Calculates write limits for `target_groups`.
281/// Returns a new complete write limits snapshot based on `origin_snapshot` and `version`.
282pub(super) fn calc_new_write_limits(
283    target_groups: HashMap<CompactionGroupId, CompactionGroup>,
284    origin_snapshot: HashMap<CompactionGroupId, WriteLimit>,
285    version: &HummockVersion,
286) -> HashMap<CompactionGroupId, WriteLimit> {
287    let mut new_write_limits = origin_snapshot;
288    for (id, config) in &target_groups {
289        let levels = match version.levels.get(id) {
290            None => {
291                new_write_limits.remove(id);
292                continue;
293            }
294            Some(levels) => levels,
295        };
296
297        let group_state = GroupStateValidator::check_single_group_write_stop(
298            levels,
299            config.compaction_config.as_ref(),
300        );
301
302        if group_state.is_write_stop() {
303            new_write_limits.insert(
304                *id,
305                WriteLimit {
306                    table_ids: version
307                        .state_table_info
308                        .compaction_group_member_table_ids(*id)
309                        .iter()
310                        .copied()
311                        .collect(),
312                    reason: group_state.reason().unwrap().to_owned(),
313                },
314            );
315            continue;
316        }
317        // No condition is met.
318        new_write_limits.remove(id);
319    }
320    new_write_limits
321}
322
323/// Rebuilds table stats from the given version.
324/// Note that the result is approximate value. See `estimate_table_stats`.
325fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats {
326    let mut stats = HummockVersionStats {
327        hummock_version_id: version.id.to_u64(),
328        table_stats: Default::default(),
329    };
330    for level in version.get_combined_levels() {
331        for sst in &level.table_infos {
332            let changes = estimate_table_stats(sst);
333            add_prost_table_stats_map(&mut stats.table_stats, &changes);
334        }
335    }
336    stats
337}
338
339/// Estimates table stats change from the given file.
340/// - The file stats is evenly distributed among multiple tables within the file.
341/// - The total key size and total value size are estimated based on key range and file size.
342/// - Branched files may lead to an overestimation.
343fn estimate_table_stats(sst: &SstableInfo) -> PbTableStatsMap {
344    let mut changes: PbTableStatsMap = HashMap::default();
345    let weighted_value =
346        |value: i64| -> i64 { (value as f64 / sst.table_ids.len() as f64).ceil() as i64 };
347    let key_range = &sst.key_range;
348    let estimated_key_size: u64 = (key_range.left.len() + key_range.right.len()) as u64 / 2;
349    let mut estimated_total_key_size = estimated_key_size * sst.total_key_count;
350    if estimated_total_key_size > sst.uncompressed_file_size {
351        estimated_total_key_size = sst.uncompressed_file_size / 2;
352        tracing::warn!(
353            %sst.sst_id,
354            "Calculated estimated_total_key_size {} > uncompressed_file_size {}. Use uncompressed_file_size/2 as estimated_total_key_size instead.",
355            estimated_total_key_size,
356            sst.uncompressed_file_size
357        );
358    }
359    let estimated_total_value_size = sst.uncompressed_file_size - estimated_total_key_size;
360    for table_id in &sst.table_ids {
361        let e = changes.entry(*table_id).or_default();
362        e.total_key_count += weighted_value(sst.total_key_count as i64);
363        e.total_key_size += weighted_value(estimated_total_key_size as i64);
364        e.total_value_size += weighted_value(estimated_total_value_size as i64);
365    }
366    changes
367}
368
// Unit tests for the pure helpers of this file: `ContextInfo::min_pinned_version_id`,
// `calc_new_write_limits`, `estimate_table_stats` and `rebuild_table_stats`.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels};
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
    use risingwave_pb::hummock::write_limits::WriteLimit;
    use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};

    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::manager::context::ContextInfo;
    use crate::hummock::manager::versioning::{
        calc_new_write_limits, estimate_table_stats, rebuild_table_stats,
    };
    use crate::hummock::model::CompactionGroup;

    /// `min_pinned_version_id` is `HummockVersionId::MAX` when nothing is held, and otherwise
    /// the minimum over pinned versions and version safe points.
    #[test]
    fn test_min_pinned_version_id() {
        let mut context_info = ContextInfo::default();
        assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
        context_info.pinned_versions.insert(
            1.into(),
            HummockPinnedVersion {
                context_id: 1.into(),
                min_pinned_id: 10,
            },
        );
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 10);
        // A safe point below the pinned version becomes the minimum.
        context_info
            .version_safe_points
            .push(HummockVersionId::new(5));
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 5);
        context_info.version_safe_points.clear();
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 10);
        context_info.pinned_versions.clear();
        assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
    }

    /// Exercises each L0 stop-write trigger (sub-level count, total size, SST count) for
    /// group 1, while group 2's pre-existing limit in `origin_snapshot` must be preserved.
    #[test]
    fn test_calc_new_write_limits() {
        // Appends an empty sub-level to L0.
        let add_level_to_l0 = |levels: &mut Levels| {
            levels.l0.sub_levels.push(Level::default());
        };
        // Each setter below replaces group 1's whole config with a fresh builder, customizing
        // only the one threshold it is named after.
        let set_sub_level_number_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             sub_level_number_threshold: u64| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_sub_level_number(
                                    sub_level_number_threshold,
                                )
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_sst_count_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_sst_count_threshold: u32| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_sst_count(Some(
                                    max_sst_count_threshold,
                                ))
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_size_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_size_threshold: u64| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_size(Some(max_size_threshold))
                                .build(),
                        ),
                    },
                );
            };

        // Only group 1 is a target; group 2's existing limit must pass through untouched.
        let mut target_groups: HashMap<CompactionGroupId, CompactionGroup> = Default::default();
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 10);
        let origin_snapshot: HashMap<CompactionGroupId, WriteLimit> = [(
            2,
            WriteLimit {
                table_ids: [1, 2, 3].into_iter().map_into().collect(),
                reason: "for test".to_owned(),
            },
        )]
        .into_iter()
        .collect();
        // Groups 1..=3 all exist in the version, each starting with default (empty) levels.
        let mut version: HummockVersion = Default::default();
        for group_id in 1..=3 {
            version.levels.insert(group_id, Levels::default());
        }
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
        assert_eq!(new_write_limits.len(), 1);
        // Up to 10 sub-levels (the threshold itself) must not trigger a write stop.
        for _ in 1..=10 {
            add_level_to_l0(version.levels.get_mut(&1).unwrap());
            let new_write_limits =
                calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
            assert_eq!(
                new_write_limits, origin_snapshot,
                "write limit should not be triggered for group 1"
            );
        }
        // The 11th sub-level exceeds the threshold of 10.
        add_level_to_l0(version.levels.get_mut(&1).unwrap());
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels"
        );
        assert_eq!(new_write_limits.len(), 2);

        // Raising the threshold lifts the limit; lowering it triggers it again.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 5);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels"
        );

        // Put two 100-byte SSTs into the last L0 sub-level for the size and count checks.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let last_level = version
            .levels
            .get_mut(&1)
            .unwrap()
            .l0
            .sub_levels
            .last_mut()
            .unwrap();
        last_level.table_infos.extend(vec![
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1.into(), 2.into(), 3.into()],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1.into(), 2.into(), 3.into()],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
        ]);
        // `total_file_size` is kept in sync by hand with the two SSTs added above.
        version.levels.get_mut(&1).unwrap().l0.total_file_size += 200;
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // L0 total size (200) vs. max size threshold.
        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_size: 200, threshold: 10) too large L0 size"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10000);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // L0 SST count (2) vs. max SST count threshold.
        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 1);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_sst_count: 2, threshold: 1) too many L0 sst files"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups, origin_snapshot.clone(), &version);

        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
    }

    /// Checks the even per-table split done by `estimate_table_stats`. It also checks that
    /// `rebuild_table_stats` sums those estimates across compaction groups.
    #[test]
    fn test_estimate_table_stats() {
        // Mean key length is (10 + 20) / 2 = 15 bytes; 3 tables share the file evenly.
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 10].into(),
                right: vec![1; 20].into(),
                ..Default::default()
            },
            table_ids: vec![1.into(), 2.into(), 3.into()],
            total_key_count: 6000,
            uncompressed_file_size: 6_000_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for stats in changes.values() {
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, (10 + 20) / 2 * 6000 / 3);
            assert_eq!(
                stats.total_value_size,
                (6_000_000 - (10 + 20) / 2 * 6000) / 3
            );
        }

        // The same SST is placed in two compaction groups (1 and 2), so every rebuilt figure
        // must be exactly twice the single-SST estimate.
        let mut version = HummockVersion::default();
        version.id = HummockVersionId::new(123);

        for cg in 1..3 {
            version.levels.insert(
                cg,
                Levels {
                    levels: vec![Level {
                        table_infos: vec![sst.clone()],
                        ..Default::default()
                    }],
                    ..Default::default()
                },
            );
        }
        let HummockVersionStats {
            hummock_version_id,
            table_stats,
        } = rebuild_table_stats(&version);
        assert_eq!(hummock_version_id, version.id.to_u64());
        assert_eq!(table_stats.len(), 3);
        for (tid, stats) in table_stats {
            assert_eq!(
                stats.total_key_count,
                changes.get(&tid).unwrap().total_key_count * 2
            );
            assert_eq!(
                stats.total_key_size,
                changes.get(&tid).unwrap().total_key_size * 2
            );
            assert_eq!(
                stats.total_value_size,
                changes.get(&tid).unwrap().total_value_size * 2
            );
        }
    }

    /// The key-size estimate here is 1500 * 6000 = 9_000_000, which exceeds the uncompressed
    /// file size of 60_000. The estimate must therefore fall back to half of the file size.
    #[test]
    fn test_estimate_table_stats_large_key_range() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 1000].into(),
                right: vec![1; 2000].into(),
                ..Default::default()
            },
            table_ids: vec![1.into(), 2.into(), 3.into()],
            total_key_count: 6000,
            uncompressed_file_size: 60_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for t in &sst.table_ids {
            let stats = changes.get(t).unwrap();
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, 60_000 / 2 / 3);
            assert_eq!(stats.total_value_size, (60_000 - 60_000 / 2) / 3);
        }
    }
}