risingwave_meta/hummock/manager/
versioning.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp;
16use std::collections::Bound::{Excluded, Included};
17use std::collections::{BTreeMap, HashMap, HashSet};
18
19use itertools::Itertools;
20use risingwave_hummock_sdk::compaction_group::StateTableId;
21use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
22    BranchedSstInfo, get_compaction_group_ids, get_table_compaction_group_id_mapping,
23};
24use risingwave_hummock_sdk::sstable_info::SstableInfo;
25use risingwave_hummock_sdk::table_stats::{PbTableStatsMap, add_prost_table_stats_map};
26use risingwave_hummock_sdk::version::{
27    HummockVersion, HummockVersionDelta, MAX_HUMMOCK_VERSION_ID,
28};
29use risingwave_hummock_sdk::{
30    CompactionGroupId, HummockContextId, HummockObjectId, HummockSstableId, HummockSstableObjectId,
31    HummockVersionId, get_stale_object_ids,
32};
33use risingwave_pb::common::WorkerNode;
34use risingwave_pb::hummock::write_limits::WriteLimit;
35use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};
36use risingwave_pb::id::TableId;
37use risingwave_pb::meta::subscribe_response::{Info, Operation};
38
39use super::GroupStateValidator;
40use crate::MetaResult;
41use crate::hummock::HummockManager;
42use crate::hummock::error::Result;
43use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
44use crate::hummock::manager::commit_multi_var;
45use crate::hummock::manager::context::ContextInfo;
46use crate::hummock::manager::transaction::HummockVersionTransaction;
47use crate::hummock::metrics_utils::{LocalTableMetrics, trigger_write_stop_stats};
48use crate::hummock::model::CompactionGroup;
49use crate::model::VarTransaction;
50
/// In-memory bookkeeping of hummock versions on the meta node.
#[derive(Default)]
pub struct Versioning {
    // Volatile states below
    /// When true, committing epochs is skipped:
    /// no compaction version delta is persisted to the meta store.
    pub disable_commit_epochs: bool,
    /// Latest hummock version
    pub current_version: HummockVersion,
    /// Per-state-table local metrics.
    pub local_metrics: HashMap<TableId, LocalTableMetrics>,
    /// Interval counter for time-travel snapshots; set to `u64::MAX`
    /// (see `mark_next_time_travel_version_snapshot`) to force a snapshot next time.
    pub time_travel_snapshot_interval_counter: u64,
    /// Used to avoid the attempts to rewrite the same SST to meta store
    pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,

    // Persistent states below
    /// Version deltas keyed by the version id they produce.
    pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
    /// Stats for latest hummock version.
    pub version_stats: HummockVersionStats,
    /// Latest version checkpoint, including stale objects recorded per version.
    pub checkpoint: HummockVersionCheckpoint,
}
70
71impl ContextInfo {
72    pub fn min_pinned_version_id(&self) -> HummockVersionId {
73        let mut min_pinned_version_id = MAX_HUMMOCK_VERSION_ID;
74        for id in self
75            .pinned_versions
76            .values()
77            .map(|v| v.min_pinned_id)
78            .chain(self.version_safe_points.iter().cloned())
79        {
80            min_pinned_version_id = cmp::min(id, min_pinned_version_id);
81        }
82        min_pinned_version_id
83    }
84}
85
86impl Versioning {
87    pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
88        self.time_travel_snapshot_interval_counter = u64::MAX;
89    }
90
91    pub fn get_tracked_object_ids(
92        &self,
93        min_pinned_version_id: HummockVersionId,
94    ) -> HashSet<HummockObjectId> {
95        // object ids in checkpoint version
96        let mut tracked_object_ids = self
97            .checkpoint
98            .version
99            .get_object_ids(false)
100            .collect::<HashSet<_>>();
101        // add object ids added between checkpoint version and current version
102        for (_, delta) in self.hummock_version_deltas.range((
103            Excluded(self.checkpoint.version.id),
104            Included(self.current_version.id),
105        )) {
106            tracked_object_ids.extend(delta.newly_added_object_ids(false));
107        }
108        // add stale object ids before the checkpoint version
109        tracked_object_ids.extend(
110            self.checkpoint
111                .stale_objects
112                .iter()
113                .filter(|(version_id, _)| **version_id >= min_pinned_version_id)
114                .flat_map(|(_, objects)| get_stale_object_ids(objects)),
115        );
116        tracked_object_ids
117    }
118}
119
impl HummockManager {
    /// Returns a snapshot of all per-context pinned versions.
    /// The implementation acquires the `context_info` read lock.
    pub async fn list_pinned_version(&self) -> Vec<HummockPinnedVersion> {
        self.context_info
            .read()
            .await
            .pinned_versions
            .values()
            .cloned()
            .collect_vec()
    }

    /// Resolves each context id in `context_ids` to its worker node.
    /// Context ids without a matching worker are silently omitted from the result.
    pub async fn list_workers(
        &self,
        context_ids: &[HummockContextId],
    ) -> MetaResult<HashMap<HummockContextId, WorkerNode>> {
        let mut workers = HashMap::new();
        for context_id in context_ids {
            if let Some(worker_node) = self
                .metadata_manager()
                .get_worker_by_id(*context_id as _)
                .await?
            {
                workers.insert(*context_id, worker_node);
            }
        }
        Ok(workers)
    }

    /// Gets current version without pinning it.
    /// Should not be called inside [`HummockManager`], because it requests locks internally.
    ///
    /// Note: this method can hurt performance because it will clone a large object.
    #[cfg(any(test, feature = "test"))]
    pub async fn get_current_version(&self) -> HummockVersion {
        self.on_current_version(|version| version.clone()).await
    }

    /// Runs `f` against the current version while holding the `versioning` read lock,
    /// avoiding a clone of the (potentially large) version object.
    pub async fn on_current_version<T>(&self, mut f: impl FnMut(&HummockVersion) -> T) -> T {
        f(&self.versioning.read().await.current_version)
    }

    /// Returns the id of the current hummock version.
    pub async fn get_version_id(&self) -> HummockVersionId {
        self.on_current_version(|version| version.id).await
    }

    /// Gets the mapping from table id to compaction group id
    pub async fn get_table_compaction_group_id_mapping(
        &self,
    ) -> HashMap<StateTableId, CompactionGroupId> {
        get_table_compaction_group_id_mapping(&self.versioning.read().await.current_version)
    }

    /// Get version deltas from meta store.
    /// Returns at most `num_limit` deltas, starting from `start_id` (inclusive).
    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
    ) -> Result<Vec<HummockVersionDelta>> {
        let versioning = self.versioning.read().await;
        let version_deltas = versioning
            .hummock_version_deltas
            .range(start_id..)
            .map(|(_id, delta)| delta)
            .take(num_limit as _)
            .cloned()
            .collect();
        Ok(version_deltas)
    }

    /// Returns a clone of the stats for the latest hummock version.
    pub async fn get_version_stats(&self) -> HummockVersionStats {
        self.versioning.read().await.version_stats.clone()
    }

    /// Updates write limits for `target_groups` and sends notification.
    /// Returns true if `write_limit` has been modified.
    /// The implementation acquires `versioning` lock and `compaction_group_manager` lock.
    pub(super) async fn try_update_write_limits(
        &self,
        target_group_ids: &[CompactionGroupId],
    ) -> bool {
        let versioning = self.versioning.read().await;
        let mut cg_manager = self.compaction_group_manager.write().await;
        // Groups without a known compaction config cannot be evaluated; skip them.
        let target_group_configs = target_group_ids
            .iter()
            .filter_map(|id| {
                cg_manager
                    .try_get_compaction_group_config(*id)
                    .map(|config| (*id, config))
            })
            .collect();
        let mut new_write_limits = calc_new_write_limits(
            target_group_configs,
            cg_manager.write_limit.clone(),
            &versioning.current_version,
        );
        // Drop limits for groups that no longer exist in the current version.
        let all_group_ids: HashSet<_> =
            HashSet::from_iter(get_compaction_group_ids(&versioning.current_version));
        new_write_limits.retain(|group_id, _| all_group_ids.contains(group_id));
        if new_write_limits == cg_manager.write_limit {
            return false;
        }
        tracing::debug!("Hummock stopped write is updated: {:#?}", new_write_limits);
        trigger_write_stop_stats(&self.metrics, &new_write_limits);
        cg_manager.write_limit = new_write_limits;
        // Broadcast the new limits so subscribers observe the write-stop change.
        self.env
            .notification_manager()
            .notify_hummock_without_version(
                Operation::Add,
                Info::HummockWriteLimits(risingwave_pb::hummock::WriteLimits {
                    write_limits: cg_manager.write_limit.clone(),
                }),
            );
        true
    }

    /// Gets write limits.
    /// The implementation acquires the `compaction_group_manager` read lock.
    pub async fn write_limits(&self) -> HashMap<CompactionGroupId, WriteLimit> {
        let guard = self.compaction_group_manager.read().await;
        guard.write_limit.clone()
    }

    /// Lists SST objects branched (shared) across the current version,
    /// as built by `build_branched_sst_info`.
    pub async fn list_branched_objects(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
        let guard = self.versioning.read().await;
        guard.current_version.build_branched_sst_info()
    }

    /// Recomputes table stats from the current version and persists them.
    /// The result is approximate; see `rebuild_table_stats` / `estimate_table_stats`.
    pub async fn rebuild_table_stats(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        let new_stats = rebuild_table_stats(&versioning.current_version);
        let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
        // version_stats.hummock_version_id is always 0 in meta store.
        version_stats.table_stats = new_stats.table_stats;
        commit_multi_var!(self.meta_store_ref(), version_stats)?;
        Ok(())
    }

    /// Fills backward-compatible state table info into the current version via a
    /// new version delta, if the version reports it is needed; otherwise a no-op.
    pub async fn may_fill_backward_state_table_info(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        if versioning
            .current_version
            .need_fill_backward_compatible_state_table_info_delta()
        {
            // Reborrow to split borrows of `current_version` and `hummock_version_deltas`.
            let versioning: &mut Versioning = &mut versioning;
            let mut version = HummockVersionTransaction::new(
                &mut versioning.current_version,
                &mut versioning.hummock_version_deltas,
                self.env.notification_manager(),
                None,
                &self.metrics,
            );
            let mut new_version_delta = version.new_delta();
            new_version_delta.with_latest_version(|version, delta| {
                version.may_fill_backward_compatible_state_table_info_delta(delta)
            });
            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version)?;
        }
        Ok(())
    }
}
281
282/// Calculates write limits for `target_groups`.
283/// Returns a new complete write limits snapshot based on `origin_snapshot` and `version`.
284pub(super) fn calc_new_write_limits(
285    target_groups: HashMap<CompactionGroupId, CompactionGroup>,
286    origin_snapshot: HashMap<CompactionGroupId, WriteLimit>,
287    version: &HummockVersion,
288) -> HashMap<CompactionGroupId, WriteLimit> {
289    let mut new_write_limits = origin_snapshot;
290    for (id, config) in &target_groups {
291        let levels = match version.levels.get(id) {
292            None => {
293                new_write_limits.remove(id);
294                continue;
295            }
296            Some(levels) => levels,
297        };
298
299        let group_state = GroupStateValidator::check_single_group_write_stop(
300            levels,
301            config.compaction_config.as_ref(),
302        );
303
304        if group_state.is_write_stop() {
305            new_write_limits.insert(
306                *id,
307                WriteLimit {
308                    table_ids: version
309                        .state_table_info
310                        .compaction_group_member_table_ids(*id)
311                        .iter()
312                        .copied()
313                        .collect(),
314                    reason: group_state.reason().unwrap().to_owned(),
315                },
316            );
317            continue;
318        }
319        // No condition is met.
320        new_write_limits.remove(id);
321    }
322    new_write_limits
323}
324
325/// Rebuilds table stats from the given version.
326/// Note that the result is approximate value. See `estimate_table_stats`.
327fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats {
328    let mut stats = HummockVersionStats {
329        hummock_version_id: version.id,
330        table_stats: Default::default(),
331    };
332    for level in version.get_combined_levels() {
333        for sst in &level.table_infos {
334            let changes = estimate_table_stats(sst);
335            add_prost_table_stats_map(&mut stats.table_stats, &changes);
336        }
337    }
338    stats
339}
340
341/// Estimates table stats change from the given file.
342/// - The file stats is evenly distributed among multiple tables within the file.
343/// - The total key size and total value size are estimated based on key range and file size.
344/// - Branched files may lead to an overestimation.
345fn estimate_table_stats(sst: &SstableInfo) -> PbTableStatsMap {
346    let mut changes: PbTableStatsMap = HashMap::default();
347    let weighted_value =
348        |value: i64| -> i64 { (value as f64 / sst.table_ids.len() as f64).ceil() as i64 };
349    let key_range = &sst.key_range;
350    let estimated_key_size: u64 = (key_range.left.len() + key_range.right.len()) as u64 / 2;
351    let mut estimated_total_key_size = estimated_key_size * sst.total_key_count;
352    if estimated_total_key_size > sst.uncompressed_file_size {
353        estimated_total_key_size = sst.uncompressed_file_size / 2;
354        tracing::warn!(
355            %sst.sst_id,
356            "Calculated estimated_total_key_size {} > uncompressed_file_size {}. Use uncompressed_file_size/2 as estimated_total_key_size instead.",
357            estimated_total_key_size,
358            sst.uncompressed_file_size
359        );
360    }
361    let estimated_total_value_size = sst.uncompressed_file_size - estimated_total_key_size;
362    for table_id in &sst.table_ids {
363        let e = changes.entry(*table_id).or_default();
364        e.total_key_count += weighted_value(sst.total_key_count as i64);
365        e.total_key_size += weighted_value(estimated_total_key_size as i64);
366        e.total_value_size += weighted_value(estimated_total_value_size as i64);
367    }
368    changes
369}
370
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels};
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::{HummockVersion, MAX_HUMMOCK_VERSION_ID};
    use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
    use risingwave_pb::hummock::write_limits::WriteLimit;
    use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};

    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::manager::context::ContextInfo;
    use crate::hummock::manager::versioning::{
        calc_new_write_limits, estimate_table_stats, rebuild_table_stats,
    };
    use crate::hummock::model::CompactionGroup;

    // `min_pinned_version_id` must be the minimum across pinned versions and
    // safe points, falling back to `MAX_HUMMOCK_VERSION_ID` when both are empty.
    #[test]
    fn test_min_pinned_version_id() {
        let mut context_info = ContextInfo::default();
        assert_eq!(context_info.min_pinned_version_id(), MAX_HUMMOCK_VERSION_ID);
        context_info.pinned_versions.insert(
            1.into(),
            HummockPinnedVersion {
                context_id: 1.into(),
                min_pinned_id: 10.into(),
            },
        );
        assert_eq!(context_info.min_pinned_version_id(), 10);
        context_info
            .version_safe_points
            .push(HummockVersionId::new(5));
        assert_eq!(context_info.min_pinned_version_id(), 5);
        context_info.version_safe_points.clear();
        assert_eq!(context_info.min_pinned_version_id(), 10);
        context_info.pinned_versions.clear();
        assert_eq!(context_info.min_pinned_version_id(), MAX_HUMMOCK_VERSION_ID);
    }

    // Exercises all three write-stop triggers for group 1 (sub-level count,
    // L0 total size, L0 sst count) while group 2's pre-existing limit persists.
    #[test]
    fn test_calc_new_write_limits() {
        let add_level_to_l0 = |levels: &mut Levels| {
            levels.l0.sub_levels.push(Level::default());
        };
        // Helper: (re)register group 1 with the given sub-level-count stop threshold.
        let set_sub_level_number_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             sub_level_number_threshold: u64| {
                target_groups.insert(
                    1.into(),
                    CompactionGroup {
                        group_id: 1.into(),
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_sub_level_number(
                                    sub_level_number_threshold,
                                )
                                .build(),
                        ),
                    },
                );
            };

        // Helper: (re)register group 1 with the given L0 sst-count stop threshold.
        let set_level_0_max_sst_count_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_sst_count_threshold: u32| {
                target_groups.insert(
                    1.into(),
                    CompactionGroup {
                        group_id: 1.into(),
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_sst_count(Some(
                                    max_sst_count_threshold,
                                ))
                                .build(),
                        ),
                    },
                );
            };

        // Helper: (re)register group 1 with the given L0 total-size stop threshold.
        let set_level_0_max_size_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_size_threshold: u64| {
                target_groups.insert(
                    1.into(),
                    CompactionGroup {
                        group_id: 1.into(),
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_size(Some(max_size_threshold))
                                .build(),
                        ),
                    },
                );
            };

        let mut target_groups: HashMap<CompactionGroupId, CompactionGroup> = Default::default();
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 10);
        // Group 2 starts with a pre-existing limit that should be carried over.
        let origin_snapshot: HashMap<CompactionGroupId, WriteLimit> = [(
            2.into(),
            WriteLimit {
                table_ids: [1, 2, 3].into_iter().map_into().collect(),
                reason: "for test".to_owned(),
            },
        )]
        .into_iter()
        .collect();
        let mut version: HummockVersion = Default::default();
        for group_id in 1..=3 {
            version.levels.insert(group_id.into(), Levels::default());
        }
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
        assert_eq!(new_write_limits.len(), 1);
        // Below the threshold (up to 10 sub levels): no limit for group 1.
        for _ in 1..=10 {
            add_level_to_l0(version.levels.get_mut(&1).unwrap());
            let new_write_limits =
                calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
            assert_eq!(
                new_write_limits, origin_snapshot,
                "write limit should not be triggered for group 1"
            );
        }
        // The 11th sub level crosses the threshold of 10 and triggers a stop.
        add_level_to_l0(version.levels.get_mut(&1).unwrap());
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels"
        );
        assert_eq!(new_write_limits.len(), 2);

        // Raising the threshold above the current count clears the limit again.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // Lowering the threshold below the current count re-triggers the stop.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 5);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels"
        );

        // Switch to size/count based triggers: add two SSTs totalling 200 bytes.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let last_level = version
            .levels
            .get_mut(&1)
            .unwrap()
            .l0
            .sub_levels
            .last_mut()
            .unwrap();
        last_level.table_infos.extend(vec![
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1.into(), 2.into(), 3.into()],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1.into(), 2.into(), 3.into()],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
        ]);
        version.levels.get_mut(&1).unwrap().l0.total_file_size += 200;
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // L0 size 200 exceeds a max-size threshold of 10.
        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_size: 200, threshold: 10) too large L0 size"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10000);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // 2 L0 ssts exceed a max-sst-count threshold of 1.
        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 1);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_sst_count: 2, threshold: 1) too many L0 sst files"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups, origin_snapshot.clone(), &version);

        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
    }

    // Stats are split evenly across the 3 tables; `rebuild_table_stats` over a
    // version containing the SST twice doubles every per-table figure.
    #[test]
    fn test_estimate_table_stats() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 10].into(),
                right: vec![1; 20].into(),
                ..Default::default()
            },
            table_ids: vec![1.into(), 2.into(), 3.into()],
            total_key_count: 6000,
            uncompressed_file_size: 6_000_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for stats in changes.values() {
            assert_eq!(stats.total_key_count, 6000 / 3);
            // key size = avg boundary key length (15) * key count, split 3 ways.
            assert_eq!(stats.total_key_size, (10 + 20) / 2 * 6000 / 3);
            assert_eq!(
                stats.total_value_size,
                (6_000_000 - (10 + 20) / 2 * 6000) / 3
            );
        }

        let mut version = HummockVersion::default();
        version.id = HummockVersionId::new(123);

        // Place the same SST in two compaction groups (cg 1 and 2).
        for cg in 1..3 {
            version.levels.insert(
                cg.into(),
                Levels {
                    levels: vec![Level {
                        table_infos: vec![sst.clone()],
                        ..Default::default()
                    }],
                    ..Default::default()
                },
            );
        }
        let HummockVersionStats {
            hummock_version_id,
            table_stats,
        } = rebuild_table_stats(&version);
        assert_eq!(hummock_version_id, version.id);
        assert_eq!(table_stats.len(), 3);
        for (tid, stats) in table_stats {
            assert_eq!(
                stats.total_key_count,
                changes.get(&tid).unwrap().total_key_count * 2
            );
            assert_eq!(
                stats.total_key_size,
                changes.get(&tid).unwrap().total_key_size * 2
            );
            assert_eq!(
                stats.total_value_size,
                changes.get(&tid).unwrap().total_value_size * 2
            );
        }
    }

    // When boundary keys are so long that the key-size estimate exceeds the file
    // size, the estimate is capped at uncompressed_file_size / 2.
    #[test]
    fn test_estimate_table_stats_large_key_range() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 1000].into(),
                right: vec![1; 2000].into(),
                ..Default::default()
            },
            table_ids: vec![1.into(), 2.into(), 3.into()],
            total_key_count: 6000,
            uncompressed_file_size: 60_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for t in &sst.table_ids {
            let stats = changes.get(t).unwrap();
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, 60_000 / 2 / 3);
            assert_eq!(stats.total_value_size, (60_000 - 60_000 / 2) / 3);
        }
    }
}