risingwave_meta/hummock/manager/
versioning.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp;
16use std::collections::Bound::{Excluded, Included};
17use std::collections::{BTreeMap, HashMap, HashSet};
18
19use itertools::Itertools;
20use risingwave_hummock_sdk::change_log::{EpochNewChangeLog, TableChangeLog, TableChangeLogs};
21use risingwave_hummock_sdk::compaction_group::StateTableId;
22use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
23    BranchedSstInfo, get_compaction_group_ids, get_table_compaction_group_id_mapping,
24};
25use risingwave_hummock_sdk::sstable_info::SstableInfo;
26use risingwave_hummock_sdk::table_stats::{PbTableStatsMap, add_prost_table_stats_map};
27use risingwave_hummock_sdk::version::{
28    HummockVersion, HummockVersionDelta, MAX_HUMMOCK_VERSION_ID,
29};
30use risingwave_hummock_sdk::{
31    CompactionGroupId, HummockContextId, HummockObjectId, HummockSstableId, HummockSstableObjectId,
32    HummockVersionId, get_stale_object_ids,
33};
34use risingwave_meta_model::{Epoch, hummock_table_change_log};
35use risingwave_pb::common::WorkerNode;
36use risingwave_pb::hummock::write_limits::WriteLimit;
37use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};
38use risingwave_pb::id::TableId;
39use risingwave_pb::meta::subscribe_response::{Info, Operation};
40use sea_orm::{EntityTrait, QuerySelect, TransactionTrait};
41
42use super::GroupStateValidator;
43use crate::MetaResult;
44use crate::hummock::HummockManager;
45use crate::hummock::error::Result;
46use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
47use crate::hummock::manager::commit_multi_var;
48use crate::hummock::manager::context::ContextInfo;
49use crate::hummock::manager::transaction::HummockVersionTransaction;
50use crate::hummock::metrics_utils::{LocalTableMetrics, trigger_write_stop_stats};
51use crate::hummock::model::CompactionGroup;
52use crate::hummock::model::ext::to_table_change_log_meta_store_model;
53use crate::model::VarTransaction;
54
/// In-memory bookkeeping state for hummock version management.
///
/// Fields are grouped into volatile state (rebuilt on restart) and persistent
/// state (backed by the meta store).
#[derive(Default)]
pub struct Versioning {
    // Volatile states below
    /// When set, epoch commits are disabled: no compaction version delta is
    /// persisted to the meta store (presumably a pause/test switch — confirm callers).
    pub disable_commit_epochs: bool,
    /// Latest hummock version
    pub current_version: HummockVersion,
    /// Per-table local metrics for the latest version.
    pub local_metrics: HashMap<TableId, LocalTableMetrics>,
    /// Counter controlling when the next time-travel snapshot is taken;
    /// `u64::MAX` forces one (see `mark_next_time_travel_version_snapshot`).
    pub time_travel_snapshot_interval_counter: u64,
    /// Used to avoid the attempts to rewrite the same SST to meta store
    pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>,

    // Persistent states below
    /// Version deltas keyed by version id; queried by range between the
    /// checkpoint version and the current version.
    pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>,
    /// Stats for latest hummock version.
    pub version_stats: HummockVersionStats,
    /// The latest checkpointed version, including stale objects awaiting GC.
    pub checkpoint: HummockVersionCheckpoint,
    /// Per-table change logs, kept alongside (not inside) `current_version`.
    pub table_change_log: HashMap<TableId, TableChangeLog>,
}
75
76impl ContextInfo {
77    pub fn min_pinned_version_id(&self) -> HummockVersionId {
78        let mut min_pinned_version_id = MAX_HUMMOCK_VERSION_ID;
79        for id in self
80            .pinned_versions
81            .values()
82            .map(|v| v.min_pinned_id)
83            .chain(self.version_safe_points.iter().cloned())
84        {
85            min_pinned_version_id = cmp::min(id, min_pinned_version_id);
86        }
87        min_pinned_version_id
88    }
89}
90
impl Versioning {
    /// Forces a time-travel snapshot of the version at the next opportunity by
    /// saturating the interval counter.
    pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
        self.time_travel_snapshot_interval_counter = u64::MAX;
    }

    /// Collects every object id that is still referenced and therefore must not
    /// be garbage-collected:
    /// - objects in the checkpoint version and in all table change logs,
    /// - objects newly added by deltas after the checkpoint up to the current
    ///   version,
    /// - stale objects recorded at versions `>= min_pinned_version_id`, which a
    ///   pinned reader may still access.
    pub fn get_tracked_object_ids(
        &self,
        min_pinned_version_id: HummockVersionId,
    ) -> HashSet<HummockObjectId> {
        // object ids in checkpoint version
        let mut tracked_object_ids = self
            .checkpoint
            .version
            .get_object_ids()
            .chain(
                self.table_change_log
                    .values()
                    .flat_map(|c| c.get_object_ids()),
            )
            .collect::<HashSet<_>>();
        // add object ids added between checkpoint version and current version
        for (_, delta) in self.hummock_version_deltas.range((
            Excluded(self.checkpoint.version.id),
            Included(self.current_version.id),
        )) {
            tracked_object_ids.extend(delta.newly_added_object_ids(false));
        }
        // add stale object ids before the checkpoint version
        tracked_object_ids.extend(
            self.checkpoint
                .stale_objects
                .iter()
                .filter(|(version_id, _)| **version_id >= min_pinned_version_id)
                .flat_map(|(_, objects)| get_stale_object_ids(objects)),
        );
        tracked_object_ids
    }
}
129
impl HummockManager {
    /// Returns a snapshot of all per-context pinned versions.
    pub async fn list_pinned_version(&self) -> Vec<HummockPinnedVersion> {
        self.context_info
            .read()
            .await
            .pinned_versions
            .values()
            .cloned()
            .collect_vec()
    }

    /// Resolves each context id to its worker node; ids with no corresponding
    /// worker are silently omitted from the result.
    pub async fn list_workers(
        &self,
        context_ids: &[HummockContextId],
    ) -> MetaResult<HashMap<HummockContextId, WorkerNode>> {
        let mut workers = HashMap::new();
        for context_id in context_ids {
            if let Some(worker_node) = self
                .metadata_manager()
                .get_worker_by_id(*context_id as _)
                .await?
            {
                workers.insert(*context_id, worker_node);
            }
        }
        Ok(workers)
    }

    /// Gets current version without pinning it.
    /// Should not be called inside [`HummockManager`], because it requests locks internally.
    ///
    /// Note: this method can hurt performance because it will clone a large object.
    #[cfg(any(test, feature = "test"))]
    pub async fn get_current_version(&self) -> HummockVersion {
        self.on_current_version(|version| version.clone()).await
    }

    /// Runs `f` against the current version while holding the `versioning` read lock.
    pub async fn on_current_version<T>(&self, mut f: impl FnMut(&HummockVersion) -> T) -> T {
        f(&self.versioning.read().await.current_version)
    }

    /// Runs `f` against the current version and the table change logs under a
    /// single `versioning` read lock, so both views are mutually consistent.
    pub async fn on_current_version_and_table_change_log<T>(
        &self,
        mut f: impl FnMut(&HummockVersion, &TableChangeLogs) -> T,
    ) -> T {
        let guard = self.versioning.read().await;
        f(&guard.current_version, &guard.table_change_log)
    }

    /// Returns the id of the current version.
    pub async fn get_version_id(&self) -> HummockVersionId {
        self.on_current_version(|version| version.id).await
    }

    /// Gets the mapping from table id to compaction group id
    pub async fn get_table_compaction_group_id_mapping(
        &self,
    ) -> HashMap<StateTableId, CompactionGroupId> {
        get_table_compaction_group_id_mapping(&self.versioning.read().await.current_version)
    }

    /// Get version deltas from meta store
    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
    ) -> Result<Vec<HummockVersionDelta>> {
        let versioning = self.versioning.read().await;
        let version_deltas = versioning
            .hummock_version_deltas
            .range(start_id..)
            .map(|(_id, delta)| delta)
            .take(num_limit as _)
            .cloned()
            .collect();
        Ok(version_deltas)
    }

    /// Returns a clone of the current version's stats.
    pub async fn get_version_stats(&self) -> HummockVersionStats {
        self.versioning.read().await.version_stats.clone()
    }

    /// Updates write limits for `target_groups` and sends notification.
    /// Returns true if `write_limit` has been modified.
    /// The implementation acquires `versioning` lock and `compaction_group_manager` lock.
    pub(super) async fn try_update_write_limits(
        &self,
        target_group_ids: &[CompactionGroupId],
    ) -> bool {
        let versioning = self.versioning.read().await;
        let mut cg_manager = self.compaction_group_manager.write().await;
        // Only groups that still have a compaction config are considered.
        let target_group_configs = target_group_ids
            .iter()
            .filter_map(|id| {
                cg_manager
                    .try_get_compaction_group_config(*id)
                    .map(|config| (*id, config))
            })
            .collect();
        let mut new_write_limits = calc_new_write_limits(
            target_group_configs,
            cg_manager.write_limit.clone(),
            &versioning.current_version,
        );
        // Drop limits for groups that no longer exist in the current version.
        let all_group_ids: HashSet<_> =
            HashSet::from_iter(get_compaction_group_ids(&versioning.current_version));
        new_write_limits.retain(|group_id, _| all_group_ids.contains(group_id));
        if new_write_limits == cg_manager.write_limit {
            return false;
        }
        tracing::debug!("Hummock stopped write is updated: {:#?}", new_write_limits);
        trigger_write_stop_stats(&self.metrics, &new_write_limits);
        cg_manager.write_limit = new_write_limits;
        // Broadcast the complete new write-limit snapshot to subscribers.
        self.env
            .notification_manager()
            .notify_hummock_without_version(
                Operation::Add,
                Info::HummockWriteLimits(risingwave_pb::hummock::WriteLimits {
                    write_limits: cg_manager.write_limit.clone(),
                }),
            );
        true
    }

    /// Gets write limits.
    /// The implementation acquires the `compaction_group_manager` read lock.
    pub async fn write_limits(&self) -> HashMap<CompactionGroupId, WriteLimit> {
        let guard = self.compaction_group_manager.read().await;
        guard.write_limit.clone()
    }

    /// Builds the branched-SST info mapping from the current version.
    pub async fn list_branched_objects(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
        let guard = self.versioning.read().await;
        guard.current_version.build_branched_sst_info()
    }

    /// Recomputes approximate table stats from the current version and persists
    /// them, replacing the incrementally-maintained stats.
    pub async fn rebuild_table_stats(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        let new_stats = rebuild_table_stats(&versioning.current_version);
        let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
        // version_stats.hummock_version_id is always 0 in meta store.
        version_stats.table_stats = new_stats.table_stats;
        commit_multi_var!(self.meta_store_ref(), version_stats)?;
        Ok(())
    }

    /// Backward compatibility: if the current version still lacks state table
    /// info, produce and commit a delta that fills it in.
    pub async fn may_fill_backward_state_table_info(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        if versioning
            .current_version
            .need_fill_backward_compatible_state_table_info_delta()
        {
            // Reborrow to split the guard into disjoint field borrows.
            let versioning: &mut Versioning = &mut versioning;
            let mut version = HummockVersionTransaction::new(
                &mut versioning.current_version,
                &mut versioning.hummock_version_deltas,
                &mut versioning.table_change_log,
                self.env.notification_manager(),
                None,
                &self.metrics,
                &self.env.opts,
            );
            let mut new_version_delta = version.new_delta();
            new_version_delta.with_latest_version(|version, delta| {
                version.may_fill_backward_compatible_state_table_info_delta(delta)
            });
            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version)?;
        }
        Ok(())
    }

    /// Backward compatibility: migrates table change logs stored in the
    /// (deprecated) in-version field into the meta store. No-op when there is
    /// nothing to migrate or the meta store already has change-log rows.
    pub async fn may_fill_backward_table_change_logs(&self) -> Result<()> {
        let mut versioning = self.versioning.write().await;
        let version = &mut versioning.current_version;

        // Probe for any existing row; its presence means migration already ran.
        let is_nonempty_meta_store =
            risingwave_meta_model::hummock_table_change_log::Entity::find()
                .select_only()
                .columns([
                    hummock_table_change_log::Column::TableId,
                    hummock_table_change_log::Column::CheckpointEpoch,
                ])
                .into_tuple::<(TableId, Epoch)>()
                .one(&self.env.meta_store_ref().conn)
                .await?
                .is_some();
        #[expect(deprecated)]
        if version.table_change_log.is_empty() || is_nonempty_meta_store {
            // Either there are no table change logs to commit to the metastore, or the operation has already been completed.
            return Ok(());
        }

        // Remove table change log from version.
        #[expect(deprecated)]
        let table_change_logs = {
            let table_change_logs = std::mem::take(&mut version.table_change_log);
            if table_change_logs.values().all(|t| t.is_empty()) {
                return Ok(());
            }
            // Flatten into (table_id, change_log) pairs for batched insertion.
            table_change_logs
                .into_iter()
                .flat_map(|(table_id, change_logs)| {
                    change_logs
                        .into_iter()
                        .map(move |change_log| (table_id, change_log))
                })
        };

        // Store table change log in meta store.
        let insert_batch_size = self.env.opts.table_change_log_insert_batch_size as usize;
        use futures::stream::{self, StreamExt};
        let mut stream = stream::iter(table_change_logs).chunks(insert_batch_size);
        // Insert all batches within a single transaction so the migration is atomic.
        let txn = self.env.meta_store_ref().conn.begin().await?;
        while let Some(change_log_batch) = stream.next().await {
            if change_log_batch.is_empty() {
                break;
            }
            let insert_many = change_log_batch
                .into_iter()
                .map(|(table_id, change_log)| {
                    to_table_change_log_meta_store_model(table_id, &change_log)
                })
                .collect::<Vec<_>>();
            risingwave_meta_model::hummock_table_change_log::Entity::insert_many(insert_many)
                .exec(&txn)
                .await?;
        }
        txn.commit().await?;
        Ok(())
    }

    /// Returns a filtered copy of the table change logs.
    ///
    /// - `epoch_only`: strip old/new values, keeping only epoch metadata.
    /// - `start_epoch_inclusive` / `end_epoch_inclusive`: epoch range filter
    ///   (unbounded when `None`).
    /// - `table_ids`: restrict to these tables (all tables when `None`).
    /// - `exclude_empty`: drop change logs with neither old nor new values.
    /// - `limit`: cap the number of change logs returned per table.
    pub async fn get_table_change_logs(
        &self,
        epoch_only: bool,
        start_epoch_inclusive: Option<u64>,
        end_epoch_inclusive: Option<u64>,
        table_ids: Option<HashSet<TableId>>,
        exclude_empty: bool,
        limit: Option<u32>,
    ) -> TableChangeLogs {
        self.on_current_version_and_table_change_log(|_, table_change_logs| {
            table_change_logs
                .iter()
                .filter_map(|(id, change_log)| {
                    if let Some(table_filter) = &table_ids
                        && !table_filter.contains(id)
                    {
                        return None;
                    }
                    let filtered_change_logs = change_log
                        .filter_epoch((
                            start_epoch_inclusive.unwrap_or(0),
                            end_epoch_inclusive.unwrap_or(u64::MAX),
                        ))
                        .filter(|change_log| {
                            if exclude_empty
                                && change_log.new_value.is_empty()
                                && change_log.old_value.is_empty()
                            {
                                return false;
                            }
                            true
                        })
                        .take(limit.map(|l| l as usize).unwrap_or(usize::MAX))
                        .map(|change_log| {
                            if epoch_only {
                                // Keep only epoch metadata, dropping the payloads.
                                EpochNewChangeLog {
                                    new_value: vec![],
                                    old_value: vec![],
                                    non_checkpoint_epochs: change_log.non_checkpoint_epochs.clone(),
                                    checkpoint_epoch: change_log.checkpoint_epoch,
                                }
                            } else {
                                change_log.clone()
                            }
                        });
                    Some((id.to_owned(), TableChangeLog::new(filtered_change_logs)))
                })
                .collect()
        })
        .await
    }
}
413
414/// Calculates write limits for `target_groups`.
415/// Returns a new complete write limits snapshot based on `origin_snapshot` and `version`.
416pub(super) fn calc_new_write_limits(
417    target_groups: HashMap<CompactionGroupId, CompactionGroup>,
418    origin_snapshot: HashMap<CompactionGroupId, WriteLimit>,
419    version: &HummockVersion,
420) -> HashMap<CompactionGroupId, WriteLimit> {
421    let mut new_write_limits = origin_snapshot;
422    for (id, config) in &target_groups {
423        let levels = match version.levels.get(id) {
424            None => {
425                new_write_limits.remove(id);
426                continue;
427            }
428            Some(levels) => levels,
429        };
430
431        let group_state = GroupStateValidator::check_single_group_write_stop(
432            levels,
433            config.compaction_config.as_ref(),
434        );
435
436        if group_state.is_write_stop() {
437            new_write_limits.insert(
438                *id,
439                WriteLimit {
440                    table_ids: version
441                        .state_table_info
442                        .compaction_group_member_table_ids(*id)
443                        .iter()
444                        .copied()
445                        .collect(),
446                    reason: group_state.reason().unwrap().to_owned(),
447                },
448            );
449            continue;
450        }
451        // No condition is met.
452        new_write_limits.remove(id);
453    }
454    new_write_limits
455}
456
457/// Rebuilds table stats from the given version.
458/// Note that the result is approximate value. See `estimate_table_stats`.
459fn rebuild_table_stats(version: &HummockVersion) -> HummockVersionStats {
460    let mut stats = HummockVersionStats {
461        hummock_version_id: version.id,
462        table_stats: Default::default(),
463    };
464    for level in version.get_combined_levels() {
465        for sst in &level.table_infos {
466            let changes = estimate_table_stats(sst);
467            add_prost_table_stats_map(&mut stats.table_stats, &changes);
468        }
469    }
470    stats
471}
472
473/// Estimates table stats change from the given file.
474/// - The file stats is evenly distributed among multiple tables within the file.
475/// - The total key size and total value size are estimated based on key range and file size.
476/// - Branched files may lead to an overestimation.
477fn estimate_table_stats(sst: &SstableInfo) -> PbTableStatsMap {
478    let mut changes: PbTableStatsMap = HashMap::default();
479    let weighted_value =
480        |value: i64| -> i64 { (value as f64 / sst.table_ids.len() as f64).ceil() as i64 };
481    let key_range = &sst.key_range;
482    let estimated_key_size: u64 = (key_range.left.len() + key_range.right.len()) as u64 / 2;
483    let mut estimated_total_key_size = estimated_key_size * sst.total_key_count;
484    if estimated_total_key_size > sst.uncompressed_file_size {
485        estimated_total_key_size = sst.uncompressed_file_size / 2;
486        tracing::warn!(
487            %sst.sst_id,
488            "Calculated estimated_total_key_size {} > uncompressed_file_size {}. Use uncompressed_file_size/2 as estimated_total_key_size instead.",
489            estimated_total_key_size,
490            sst.uncompressed_file_size
491        );
492    }
493    let estimated_total_value_size = sst.uncompressed_file_size - estimated_total_key_size;
494    for table_id in &sst.table_ids {
495        let e = changes.entry(*table_id).or_default();
496        e.total_key_count += weighted_value(sst.total_key_count as i64);
497        e.total_key_size += weighted_value(estimated_total_key_size as i64);
498        e.total_value_size += weighted_value(estimated_total_value_size as i64);
499    }
500    changes
501}
502
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels};
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::{HummockVersion, MAX_HUMMOCK_VERSION_ID};
    use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId};
    use risingwave_pb::hummock::write_limits::WriteLimit;
    use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats};

    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::manager::context::ContextInfo;
    use crate::hummock::manager::versioning::{
        calc_new_write_limits, estimate_table_stats, rebuild_table_stats,
    };
    use crate::hummock::model::CompactionGroup;

    /// `min_pinned_version_id` is the minimum over pinned versions and safe
    /// points, falling back to `MAX_HUMMOCK_VERSION_ID` when both are empty.
    #[test]
    fn test_min_pinned_version_id() {
        let mut context_info = ContextInfo::default();
        assert_eq!(context_info.min_pinned_version_id(), MAX_HUMMOCK_VERSION_ID);
        context_info.pinned_versions.insert(
            1.into(),
            HummockPinnedVersion {
                context_id: 1.into(),
                min_pinned_id: 10.into(),
            },
        );
        assert_eq!(context_info.min_pinned_version_id(), 10);
        context_info
            .version_safe_points
            .push(HummockVersionId::new(5));
        assert_eq!(context_info.min_pinned_version_id(), 5);
        context_info.version_safe_points.clear();
        assert_eq!(context_info.min_pinned_version_id(), 10);
        context_info.pinned_versions.clear();
        assert_eq!(context_info.min_pinned_version_id(), MAX_HUMMOCK_VERSION_ID);
    }

    /// Exercises `calc_new_write_limits` against the three write-stop triggers
    /// (sub-level count, L0 size, L0 SST count), checking that limits are added
    /// and removed as thresholds are crossed in both directions.
    #[test]
    fn test_calc_new_write_limits() {
        let add_level_to_l0 = |levels: &mut Levels| {
            levels.l0.sub_levels.push(Level::default());
        };
        let set_sub_level_number_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             sub_level_number_threshold: u64| {
                target_groups.insert(
                    1.into(),
                    CompactionGroup {
                        group_id: 1.into(),
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_sub_level_number(
                                    sub_level_number_threshold,
                                )
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_sst_count_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_sst_count_threshold: u32| {
                target_groups.insert(
                    1.into(),
                    CompactionGroup {
                        group_id: 1.into(),
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_sst_count(Some(
                                    max_sst_count_threshold,
                                ))
                                .build(),
                        ),
                    },
                );
            };

        let set_level_0_max_size_threshold_for_group_1 =
            |target_groups: &mut HashMap<CompactionGroupId, CompactionGroup>,
             max_size_threshold: u64| {
                target_groups.insert(
                    1.into(),
                    CompactionGroup {
                        group_id: 1.into(),
                        compaction_config: Arc::new(
                            CompactionConfigBuilder::new()
                                .level0_stop_write_threshold_max_size(Some(max_size_threshold))
                                .build(),
                        ),
                    },
                );
            };

        let mut target_groups: HashMap<CompactionGroupId, CompactionGroup> = Default::default();
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 10);
        // Pre-existing limit for group 2 should survive untouched throughout.
        let origin_snapshot: HashMap<CompactionGroupId, WriteLimit> = [(
            2.into(),
            WriteLimit {
                table_ids: [1, 2, 3].into_iter().map_into().collect(),
                reason: "for test".to_owned(),
            },
        )]
        .into_iter()
        .collect();
        let mut version: HummockVersion = Default::default();
        for group_id in 1..=3 {
            version.levels.insert(group_id.into(), Levels::default());
        }
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
        assert_eq!(new_write_limits.len(), 1);
        // Add sub-levels one by one: exactly at the threshold (10) still no stop.
        for _ in 1..=10 {
            add_level_to_l0(version.levels.get_mut(&1).unwrap());
            let new_write_limits =
                calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
            assert_eq!(
                new_write_limits, origin_snapshot,
                "write limit should not be triggered for group 1"
            );
        }
        // The 11th sub-level exceeds the threshold and triggers write stop.
        add_level_to_l0(version.levels.get_mut(&1).unwrap());
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels"
        );
        assert_eq!(new_write_limits.len(), 2);

        // Raising the threshold clears the limit again.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        set_sub_level_number_threshold_for_group_1(&mut target_groups, 5);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels"
        );

        // Switch to size/count triggers: populate L0 with two SSTs of 100 bytes each.
        set_sub_level_number_threshold_for_group_1(&mut target_groups, 100);
        let last_level = version
            .levels
            .get_mut(&1)
            .unwrap()
            .l0
            .sub_levels
            .last_mut()
            .unwrap();
        last_level.table_infos.extend(vec![
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1.into(), 2.into(), 3.into()],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
            SstableInfoInner {
                key_range: KeyRange::default(),
                table_ids: vec![1.into(), 2.into(), 3.into()],
                total_key_count: 100,
                sst_size: 100,
                uncompressed_file_size: 100,
                ..Default::default()
            }
            .into(),
        ]);
        version.levels.get_mut(&1).unwrap().l0.total_file_size += 200;
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // L0 size trigger: 200 bytes > 10-byte threshold.
        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_size: 200, threshold: 10) too large L0 size"
        );

        set_level_0_max_size_threshold_for_group_1(&mut target_groups, 10000);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );

        // L0 SST count trigger: 2 files > 1-file threshold.
        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 1);
        let new_write_limits =
            calc_new_write_limits(target_groups.clone(), origin_snapshot.clone(), &version);
        assert_ne!(
            new_write_limits, origin_snapshot,
            "write limit should be triggered for group 1"
        );
        assert_eq!(
            new_write_limits.get(&1).as_ref().unwrap().reason,
            "WriteStop(l0_sst_count: 2, threshold: 1) too many L0 sst files"
        );

        set_level_0_max_sst_count_threshold_for_group_1(&mut target_groups, 100);
        let new_write_limits =
            calc_new_write_limits(target_groups, origin_snapshot.clone(), &version);

        assert_eq!(
            new_write_limits, origin_snapshot,
            "write limit should not be triggered for group 1"
        );
    }

    /// Stats are split evenly across the 3 member tables, with key size derived
    /// from the mean boundary key length; `rebuild_table_stats` sums the same
    /// SST once per group it appears in.
    #[test]
    fn test_estimate_table_stats() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 10].into(),
                right: vec![1; 20].into(),
                ..Default::default()
            },
            table_ids: vec![1.into(), 2.into(), 3.into()],
            total_key_count: 6000,
            uncompressed_file_size: 6_000_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for stats in changes.values() {
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, (10 + 20) / 2 * 6000 / 3);
            assert_eq!(
                stats.total_value_size,
                (6_000_000 - (10 + 20) / 2 * 6000) / 3
            );
        }

        let mut version = HummockVersion::default();
        version.id = HummockVersionId::new(123);

        // The same SST placed in 2 groups doubles every per-table statistic.
        for cg in 1..3 {
            version.levels.insert(
                cg.into(),
                Levels {
                    levels: vec![Level {
                        table_infos: vec![sst.clone()],
                        ..Default::default()
                    }],
                    ..Default::default()
                },
            );
        }
        let HummockVersionStats {
            hummock_version_id,
            table_stats,
        } = rebuild_table_stats(&version);
        assert_eq!(hummock_version_id, version.id);
        assert_eq!(table_stats.len(), 3);
        for (tid, stats) in table_stats {
            assert_eq!(
                stats.total_key_count,
                changes.get(&tid).unwrap().total_key_count * 2
            );
            assert_eq!(
                stats.total_key_size,
                changes.get(&tid).unwrap().total_key_size * 2
            );
            assert_eq!(
                stats.total_value_size,
                changes.get(&tid).unwrap().total_value_size * 2
            );
        }
    }

    /// When the boundary keys are so long that the key-size estimate exceeds the
    /// file size, the estimate is capped at `uncompressed_file_size / 2`.
    #[test]
    fn test_estimate_table_stats_large_key_range() {
        let sst = SstableInfoInner {
            key_range: KeyRange {
                left: vec![1; 1000].into(),
                right: vec![1; 2000].into(),
                ..Default::default()
            },
            table_ids: vec![1.into(), 2.into(), 3.into()],
            total_key_count: 6000,
            uncompressed_file_size: 60_000,
            ..Default::default()
        }
        .into();
        let changes = estimate_table_stats(&sst);
        assert_eq!(changes.len(), 3);
        for t in &sst.table_ids {
            let stats = changes.get(t).unwrap();
            assert_eq!(stats.total_key_count, 6000 / 3);
            assert_eq!(stats.total_key_size, 60_000 / 2 / 3);
            assert_eq!(stats.total_value_size, (60_000 - 60_000 / 2) / 3);
        }
    }
}
831}