risingwave_meta/hummock/manager/
versioning.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp;
16use std::collections::Bound::{Excluded, Included};
17use std::collections::{BTreeMap, HashMap, HashSet};
18
19use itertools::Itertools;
20use risingwave_hummock_sdk::compaction_group::StateTableId;
21use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
22    BranchedSstInfo, get_compaction_group_ids, get_table_compaction_group_id_mapping,
23};
24use risingwave_hummock_sdk::sstable_info::SstableInfo;
25use risingwave_hummock_sdk::table_stats::add_prost_table_stats_map;
26use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
27use risingwave_hummock_sdk::{
28    CompactionGroupId, HummockContextId, HummockObjectId, HummockSstableId, HummockSstableObjectId,
29    HummockVersionId, get_stale_object_ids,
30};
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::hummock::write_limits::WriteLimit;
33use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats, TableStats};
34use risingwave_pb::meta::subscribe_response::{Info, Operation};
35
36use super::GroupStateValidator;
37use crate::MetaResult;
38use crate::hummock::HummockManager;
39use crate::hummock::error::Result;
40use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
41use crate::hummock::manager::commit_multi_var;
42use crate::hummock::manager::context::ContextInfo;
43use crate::hummock::manager::transaction::HummockVersionTransaction;
44use crate::hummock::metrics_utils::{LocalTableMetrics, trigger_write_stop_stats};
45use crate::hummock::model::CompactionGroup;
46use crate::model::VarTransaction;
47
48#[derive(Default)]
49pub struct Versioning {
50    // Volatile states below
51    /// Avoid commit epoch epochs
52    /// Don't persist compaction version delta to meta store
53    pub disable_commit_epochs: bool,
54    /// Latest hummock version
55    pub current_version: HummockVersion,
56    pub local_metrics: HashMap<u32, LocalTableMetrics>,
57    pub time_travel_snapshot_interval_counter: u64,
58    /// Used to avoid the attempts to rewrite the same SST to meta store
59    pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,
60
61    // Persistent states below
62    pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
63    /// Stats for latest hummock version.
64    pub version_stats: HummockVersionStats,
65    pub checkpoint: HummockVersionCheckpoint,
66}
67
68impl ContextInfo {
69    pub fn min_pinned_version_id(&self) -> HummockVersionId {
70        let mut min_pinned_version_id = HummockVersionId::MAX;
71        for id in self
72            .pinned_versions
73            .values()
74            .map(|v| HummockVersionId::new(v.min_pinned_id))
75            .chain(self.version_safe_points.iter().cloned())
76        {
77            min_pinned_version_id = cmp::min(id, min_pinned_version_id);
78        }
79        min_pinned_version_id
80    }
81}
82
83impl Versioning {
84    pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
85        self.time_travel_snapshot_interval_counter = u64::MAX;
86    }
87
88    pub fn get_tracked_object_ids(
89        &self,
90        min_pinned_version_id: HummockVersionId,
91    ) -> HashSet<HummockObjectId> {
92        // object ids in checkpoint version
93        let mut tracked_object_ids = self
94            .checkpoint
95            .version
96            .get_object_ids(false)
97            .collect::<HashSet<_>>();
98        // add object ids added between checkpoint version and current version
99        for (_, delta) in self.hummock_version_deltas.range((
100            Excluded(self.checkpoint.version.id),
101            Included(self.current_version.id),
102        )) {
103            tracked_object_ids.extend(delta.newly_added_object_ids(false));
104        }
105        // add stale object ids before the checkpoint version
106        tracked_object_ids.extend(
107            self.checkpoint
108                .stale_objects
109                .iter()
110                .filter(|(version_id, _)| **version_id >= min_pinned_version_id)
111                .flat_map(|(_, objects)| get_stale_object_ids(objects)),
112        );
113        tracked_object_ids
114    }
115}
116
117impl HummockManager {
118    pub async fn list_pinned_version(&self) -> Vec<HummockPinnedVersion> {
119        self.context_info
120            .read()
121            .await
122            .pinned_versions
123            .values()
124            .cloned()
125            .collect_vec()
126    }
127
128    pub async fn list_workers(
129        &self,
130        context_ids: &[HummockContextId],
131    ) -> MetaResult<HashMap<HummockContextId, WorkerNode>> {
132        let mut workers = HashMap::new();
133        for context_id in context_ids {
134            if let Some(worker_node) = self
135                .metadata_manager()
136                .get_worker_by_id(*context_id as _)
137                .await?
138            {
139                workers.insert(*context_id, worker_node);
140            }
141        }
142        Ok(workers)
143    }
144
145    /// Gets current version without pinning it.
146    /// Should not be called inside [`HummockManager`], because it requests locks internally.
147    ///
148    /// Note: this method can hurt performance because it will clone a large object.
149    #[cfg(any(test, feature = "test"))]
150    pub async fn get_current_version(&self) -> HummockVersion {
151        self.on_current_version(|version| version.clone()).await
152    }
153
154    pub async fn on_current_version<T>(&self, mut f: impl FnMut(&HummockVersion) -> T) -> T {
155        f(&self.versioning.read().await.current_version)
156    }
157
158    pub async fn get_version_id(&self) -> HummockVersionId {
159        self.on_current_version(|version| version.id).await
160    }
161
162    /// Gets the mapping from table id to compaction group id
163    pub async fn get_table_compaction_group_id_mapping(
164        &self,
165    ) -> HashMap<StateTableId, CompactionGroupId> {
166        get_table_compaction_group_id_mapping(&self.versioning.read().await.current_version)
167    }
168
169    /// Get version deltas from meta store
170    pub async fn list_version_deltas(
171        &self,
172        start_id: HummockVersionId,
173        num_limit: u32,
174    ) -> Result<Vec<HummockVersionDelta>> {
175        let versioning = self.versioning.read().await;
176        let version_deltas = versioning
177            .hummock_version_deltas
178            .range(start_id..)
179            .map(|(_id, delta)| delta)
180            .take(num_limit as _)
181            .cloned()
182            .collect();
183        Ok(version_deltas)
184    }
185
186    pub async fn get_version_stats(&self) -> HummockVersionStats {
187        self.versioning.read().await.version_stats.clone()
188    }
189
190    /// Updates write limits for `target_groups` and sends notification.
191    /// Returns true if `write_limit` has been modified.
192    /// The implementation acquires `versioning` lock and `compaction_group_manager` lock.
193    pub(super) async fn try_update_write_limits(
194        &self,
195        target_group_ids: &[CompactionGroupId],
196    ) -> bool {
197        let versioning = self.versioning.read().await;
198        let mut cg_manager = self.compaction_group_manager.write().await;
199        let target_group_configs = target_group_ids
200            .iter()
201            .filter_map(|id| {
202                cg_manager
203                    .try_get_compaction_group_config(*id)
204                    .map(|config| (*id, config))
205            })
206            .collect();
207        let mut new_write_limits = calc_new_write_limits(
208            target_group_configs,
209            cg_manager.write_limit.clone(),
210            &versioning.current_version,
211        );
212        let all_group_ids: HashSet<_> =
213            HashSet::from_iter(get_compaction_group_ids(&versioning.current_version));
214        new_write_limits.retain(|group_id, _| all_group_ids.contains(group_id));
215        if new_write_limits == cg_manager.write_limit {
216            return false;
217        }
218        tracing::debug!("Hummock stopped write is updated: {:#?}", new_write_limits);
219        trigger_write_stop_stats(&self.metrics, &new_write_limits);
220        cg_manager.write_limit = new_write_limits;
221        self.env
222            .notification_manager()
223            .notify_hummock_without_version(
224                Operation::Add,
225                Info::HummockWriteLimits(risingwave_pb::hummock::WriteLimits {
226                    write_limits: cg_manager.write_limit.clone(),
227                }),
228            );
229        true
230    }
231
232    /// Gets write limits.
233    /// The implementation acquires `versioning` lock.
234    pub async fn write_limits(&self) -> HashMap<CompactionGroupId, WriteLimit> {
235        let guard = self.compaction_group_manager.read().await;
236        guard.write_limit.clone()
237    }
238
239    pub async fn list_branched_objects(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
240        let guard = self.versioning.read().await;
241        guard.current_version.build_branched_sst_info()
242    }
243
244    pub async fn rebuild_table_stats(&self) -> Result<()> {
245        let mut versioning = self.versioning.write().await;
246        let new_stats = rebuild_table_stats(&versioning.current_version);
247        let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
248        // version_stats.hummock_version_id is always 0 in meta store.
249        version_stats.table_stats = new_stats.table_stats;
250        commit_multi_var!(self.meta_store_ref(), version_stats)?;
251        Ok(())
252    }
253
254    pub async fn may_fill_backward_state_table_info(&self) -> Result<()> {
255        let mut versioning = self.versioning.write().await;
256        if versioning
257            .current_version
258            .need_fill_backward_compatible_state_table_info_delta()
259        {
260            let versioning: &mut Versioning = &mut versioning;
261            let mut version = HummockVersionTransaction::new(
262                &mut versioning.current_version,
263                &mut versioning.hummock_version_deltas,
264                self.env.notification_manager(),
265                None,
266                &self.metrics,
267            );
268            let mut new_version_delta = version.new_delta();
269            new_version_delta.with_latest_version(|version, delta| {
270                version.may_fill_backward_compatible_state_table_info_delta(delta)
271            });
272            new_version_delta.pre_apply();
273            commit_multi_var!(self.meta_store_ref(), version)?;
274        }
275        Ok(())
276    }
277}
278
279/// Calculates write limits for `target_groups`.
280/// Returns a new complete write limits snapshot based on `origin_snapshot` and `version`.
281pub(super) fn calc_new_write_limits(
282    target_groups: HashMap<CompactionGroupId, CompactionGroup>,
283    origin_snapshot: HashMap<CompactionGroupId, WriteLimit>,
284    version: &HummockVersion,
285) -> HashMap<CompactionGroupId, WriteLimit> {
286    let mut new_write_limits = origin_snapshot;
287    for (id, config) in &target_groups {
288        let levels = match version.levels.get(id) {
289            None => {
290                new_write_limits.remove(id);
291                continue;
292            }
293            Some(levels) => levels,
294        };
295
296        let group_state = GroupStateValidator::check_single_group_write_stop(
297            levels,
298            config.compaction_config.as_ref(),
299        );
300
301        if group_state.is_write_stop() {
302            new_write_limits.insert(
303                *id,
304                WriteLimit {
305                    table_ids: version
306                        .state_table_info
307                        .compaction_group_member_table_ids(*id)
308                        .iter()
309                        .map(|table_id| table_id.table_id)
310                        .collect(),
311                    reason: group_state.reason().unwrap().to_owned(),
312                },
313            );
314            continue;
315        }
316        // No condition is met.
317        new_write_limits.remove(id);
318    }
319    new_write_limits
320}
321
322/// Rebuilds table stats from the given version.
323/// Note that the result is approximate value. See `estimate_table_stats`.
324fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats {
325    let mut stats = HummockVersionStats {
326        hummock_version_id: version.id.to_u64(),
327        table_stats: Default::default(),
328    };
329    for level in version.get_combined_levels() {
330        for sst in &level.table_infos {
331            let changes = estimate_table_stats(sst);
332            add_prost_table_stats_map(&mut stats.table_stats, &changes);
333        }
334    }
335    stats
336}
337
338/// Estimates table stats change from the given file.
339/// - The file stats is evenly distributed among multiple tables within the file.
340/// - The total key size and total value size are estimated based on key range and file size.
341/// - Branched files may lead to an overestimation.
342fn estimate_table_stats(sst: &SstableInfo) -> HashMap<u32, TableStats> {
343    let mut changes: HashMap<u32, TableStats> = HashMap::default();
344    let weighted_value =
345        |value: i64| -> i64 { (value as f64 / sst.table_ids.len() as f64).ceil() as i64 };
346    let key_range = &sst.key_range;
347    let estimated_key_size: u64 = (key_range.left.len() + key_range.right.len()) as u64 / 2;
348    let mut estimated_total_key_size = estimated_key_size * sst.total_key_count;
349    if estimated_total_key_size > sst.uncompressed_file_size {
350        estimated_total_key_size = sst.uncompressed_file_size / 2;
351        tracing::warn!(
352            %sst.sst_id,
353            "Calculated estimated_total_key_size {} > uncompressed_file_size {}. Use uncompressed_file_size/2 as estimated_total_key_size instead.",
354            estimated_total_key_size,
355            sst.uncompressed_file_size
356        );
357    }
358    let estimated_total_value_size = sst.uncompressed_file_size - estimated_total_key_size;
359    for table_id in &sst.table_ids {
360        let e = changes.entry(*table_id).or_default();
361        e.total_key_count += weighted_value(sst.total_key_count as i64);
362        e.total_key_size += weighted_value(estimated_total_key_size as i64);
363        e.total_value_size += weighted_value(estimated_total_value_size as i64);
364    }
365    changes
366}
367
// Unit tests for `min_pinned_version_id`, `calc_new_write_limits`,
// `estimate_table_stats` and `rebuild_table_stats`.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels};
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
    use risingwave_pb::hummock::write_limits::WriteLimit;
    use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};

    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::manager::context::ContextInfo;
    use crate::hummock::manager::versioning::{
        calc_new_write_limits, estimate_table_stats, rebuild_table_stats,
    };
    use crate::hummock::model::CompactionGroup;

    /// `min_pinned_version_id` is `MAX` with nothing pinned, and otherwise the minimum over
    /// pinned versions and version safe points.
    #[test]
    fn test_min_pinned_version_id() {
        let mut context_info = ContextInfo::default();
        assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
        context_info.pinned_versions.insert(
            1,
            HummockPinnedVersion {
                context_id: 1,
                min_pinned_id: 10,
            },
        );
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 10);
        // A lower safe point takes precedence over the pinned version.
        context_info
            .version_safe_points
            .push(HummockVersionId::new(5));
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 5);
        context_info.version_safe_points.clear();
        assert_eq!(context_info.min_pinned_version_id().to_u64(), 10);
        context_info.pinned_versions.clear();
        assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX);
    }

    /// Exercises each write stop condition (L0 sub level count, L0 size, L0 SST count) on
    /// group 1. Group 2's pre-existing limit in `origin_snapshot` is not a target and must be
    /// carried over unchanged.
    #[test]
    fn test_calc_new_write_limits() {
        let add_level_to_l0 = |levels: &mut Levels| {
            levels.l0.sub_levels.push(Level::default());
        };
        // Each `set_*` helper replaces group 1's config with one that sets a single threshold.
        let set_sub_level_number_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             sub_level_number_threshold: u64| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_sub_level_number(
                                    sub_level_number_threshold,
                                )
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_sst_count_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_sst_count_threshold: u32| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_sst_count(Some(
                                    max_sst_count_threshold,
                                ))
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_size_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_size_threshold: u64| {
                target_groups.insert(
                    1,
                    CompactionGroup {
                        group_id: 1,
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_size(Some(max_size_threshold))
                                .build(),
                        ),
                    },
                );
            };

        let mut target_groups: HashMap<CompactionGroupId, CompactionGroup> = Default::default();
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 10);
        let origin_snapshot: HashMap<CompactionGroupId, WriteLimit> = [(
            2,
            WriteLimit {
                table_ids: vec![1, 2, 3],
                reason: "for test".to_owned(),
            },
        )]
        .into_iter()
        .collect();
        let mut version: HummockVersion = Default::default();
        for group_id in 1..=3 {
            version.levels.insert(group_id, Levels::default());
        }
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
        assert_eq!(new_write_limits.len(), 1);
        // Up to 10 sub levels (equal to the threshold) must not trigger the limit.
        for _ in 1..=10 {
            add_level_to_l0(version.levels.get_mut(&1).unwrap());
            let new_write_limits =
                calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
            assert_eq!(
                new_write_limits, origin_snapshot,
                "write limit should not be triggered for group 1"
            );
        }
        // The 11th sub level exceeds the threshold.
        add_level_to_l0(version.levels.get_mut(&1).unwrap());
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels"
        );
        assert_eq!(new_write_limits.len(), 2);

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 5);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels"
        );

        // Put two 100-byte SSTs into the last L0 sub level; `total_file_size` is bumped
        // manually to match, since pushing table infos does not update it.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let last_level = version
            .levels
            .get_mut(&1)
            .unwrap()
            .l0
            .sub_levels
            .last_mut()
            .unwrap();
        last_level.table_infos.extend(vec![
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1, 2, 3],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1, 2, 3],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
        ]);
        version.levels.get_mut(&1).unwrap().l0.total_file_size += 200;
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // L0 size condition.
        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_size: 200, threshold: 10) too large L0 size"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10000);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // L0 SST count condition.
        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 1);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_sst_count: 2, threshold: 1) too many L0 sst files"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);

        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
    }

    /// Checks that an SST's stats are split evenly across its tables, and that
    /// `rebuild_table_stats` sums the estimates of every SST in the version (here the same
    /// SST appears in two groups, so every stat doubles).
    #[test]
    fn test_estimate_table_stats() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 10].into(),
                right: vec![1; 20].into(),
                ..Default::default()
            },
            table_ids: vec![1, 2, 3],
            total_key_count: 6000,
            uncompressed_file_size: 6_000_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for stats in changes.values() {
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, (10 + 20) / 2 * 6000 / 3);
            assert_eq!(
                stats.total_value_size,
                (6_000_000 - (10 + 20) / 2 * 6000) / 3
            );
        }

        let mut version = HummockVersion::default();
        version.id = HummockVersionId::new(123);

        for cg in 1..3 {
            version.levels.insert(
                cg,
                Levels {
                    levels: vec![Level {
                        table_infos: vec![sst.clone()],
                        ..Default::default()
                    }],
                    ..Default::default()
                },
            );
        }
        let HummockVersionStats {
            hummock_version_id,
            table_stats,
        } = rebuild_table_stats(&version);
        assert_eq!(hummock_version_id, version.id.to_u64());
        assert_eq!(table_stats.len(), 3);
        for (tid, stats) in table_stats {
            assert_eq!(
                stats.total_key_count,
                changes.get(&tid).unwrap().total_key_count * 2
            );
            assert_eq!(
                stats.total_key_size,
                changes.get(&tid).unwrap().total_key_size * 2
            );
            assert_eq!(
                stats.total_value_size,
                changes.get(&tid).unwrap().total_value_size * 2
            );
        }
    }

    /// When the key-range based estimate (1500 * 6000) exceeds the uncompressed file size,
    /// half of the file size is used as the total key size.
    #[test]
    fn test_estimate_table_stats_large_key_range() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 1000].into(),
                right: vec![1; 2000].into(),
                ..Default::default()
            },
            table_ids: vec![1, 2, 3],
            total_key_count: 6000,
            uncompressed_file_size: 60_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for t in &sst.table_ids {
            let stats = changes.get(t).unwrap();
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, 60_000 / 2 / 3);
            assert_eq!(stats.total_value_size, (60_000 - 60_000 / 2) / 3);
        }
    }
}