// risingwave_meta/hummock/manager/commit_epoch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_common::config::default::compaction_config;
21use risingwave_common::system_param::reader::SystemParamsRead;
22use risingwave_hummock_sdk::change_log::ChangeLogDelta;
23use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
24use risingwave_hummock_sdk::sstable_info::SstableInfo;
25use risingwave_hummock_sdk::table_stats::{
26    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map,
27};
28use risingwave_hummock_sdk::table_watermark::TableWatermarks;
29use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
30use risingwave_hummock_sdk::{
31    CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
32};
33use risingwave_pb::hummock::CompactionConfig;
34use risingwave_pb::hummock::compact_task::{self};
35use sea_orm::TransactionTrait;
36
37use crate::hummock::error::{Error, Result};
38use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
39use crate::hummock::manager::transaction::{
40    HummockVersionStatsTransaction, HummockVersionTransaction,
41};
42use crate::hummock::manager::versioning::Versioning;
43use crate::hummock::metrics_utils::{
44    get_or_create_local_table_stat, trigger_epoch_stat, trigger_local_table_stat, trigger_sst_stat,
45};
46use crate::hummock::model::CompactionGroup;
47use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
48use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
49use crate::hummock::{
50    HummockManager, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
51};
52
/// Tables belonging to a newly created table fragment. On commit, each
/// `NewTableFragmentInfo` gets a freshly allocated compaction group of its own
/// (see the new-table loop in `HummockManager::commit_epoch`).
pub struct NewTableFragmentInfo {
    /// Ids of the state tables to register under the new compaction group.
    pub table_ids: HashSet<TableId>,
}
56
/// Everything `HummockManager::commit_epoch` needs to commit flushed data into
/// the hummock version.
#[derive(Default)]
pub struct CommitEpochInfo {
    /// Newly flushed SSTs, each carrying its per-table stats.
    pub sstables: Vec<LocalSstableInfo>,
    /// Latest table watermarks to record in the new version delta.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    /// SST object id -> producing worker context; consumed by the commit sanity check.
    pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
    /// Tables created since the last commit; each entry is placed in its own
    /// newly created compaction group.
    pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
    /// Per-table change-log deltas to append to the version.
    pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
    /// `table_id` -> `committed_epoch`
    pub tables_to_commit: HashMap<TableId, u64>,
}
67
impl HummockManager {
    /// Commits flushed SSTs for the given tables into a new hummock version.
    ///
    /// High-level flow: take the versioning write lock, sanity-check the input,
    /// aggregate table stats, create compaction groups for newly added tables,
    /// split SSTs so each belongs to exactly one compaction group, build the
    /// version delta plus time-travel metadata, and persist everything in a
    /// single meta-store transaction before updating metrics and triggering
    /// compaction.
    ///
    /// Caller should ensure `epoch` > `committed_epoch` of `tables_to_commit`
    /// if tables are not newly added via `new_table_fragment_info`
    pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> {
        let CommitEpochInfo {
            mut sstables,
            new_table_watermarks,
            sst_to_context,
            new_table_fragment_infos,
            change_log_delta,
            tables_to_commit,
        } = commit_info;
        let mut versioning_guard = self.versioning.write().await;
        let _timer = start_measure_real_process_timer!(self, "commit_epoch");
        // Prevent commit new epochs if this flag is set
        if versioning_guard.disable_commit_epochs {
            return Ok(());
        }

        assert!(!tables_to_commit.is_empty());

        let versioning: &mut Versioning = &mut versioning_guard;
        self.commit_epoch_sanity_check(
            &tables_to_commit,
            &sstables,
            &sst_to_context,
            &versioning.current_version,
        )
        .await?;

        // Consume and aggregate table stats.
        let mut table_stats_change = PbTableStatsMap::default();
        for s in &mut sstables {
            add_prost_table_stats_map(
                &mut table_stats_change,
                &to_prost_table_stats_map(s.table_stats.clone()),
            );
        }

        // Start an in-memory transaction over the current version; all version
        // mutations below are staged here and only persisted by
        // `commit_multi_var_with_provided_txn!` further down.
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            Some(&self.table_committed_epoch_notifiers),
            &self.metrics,
        );

        let state_table_info = &version.latest_version().state_table_info;
        let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
        let mut new_table_ids = HashMap::new();
        let mut new_compaction_groups = Vec::new();
        let mut compaction_group_manager_txn = None;
        let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

        // Add new table
        for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
            // Lazily open the compaction-group-manager transaction on the first
            // new table fragment, reusing it (and the default compaction config
            // captured alongside) for every subsequent fragment.
            let (compaction_group_manager, compaction_group_config) =
                if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
                    (
                        compaction_group_manager,
                        (*compaction_group_config
                            .as_ref()
                            .expect("must be set with compaction_group_manager_txn"))
                        .clone(),
                    )
                } else {
                    let compaction_group_manager_guard =
                        self.compaction_group_manager.write().await;
                    let new_compaction_group_config =
                        compaction_group_manager_guard.default_compaction_config();
                    compaction_group_config = Some(new_compaction_group_config.clone());
                    (
                        compaction_group_manager_txn.insert(
                            CompactionGroupManager::start_owned_compaction_groups_txn(
                                compaction_group_manager_guard,
                            ),
                        ),
                        new_compaction_group_config,
                    )
                };
            // One new compaction group per table fragment.
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            let new_compaction_group = CompactionGroup {
                group_id: new_compaction_group_id,
                compaction_config: compaction_group_config.clone(),
            };

            new_compaction_groups.push(new_compaction_group.clone());
            compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);

            on_handle_add_new_table(
                state_table_info,
                &table_ids,
                new_compaction_group_id,
                &mut table_compaction_group_mapping,
                &mut new_table_ids,
            )?;
        }

        // Split incoming SSTs so that each one covers tables of a single
        // compaction group.
        let commit_sstables = self
            .correct_commit_ssts(sstables, &table_compaction_group_mapping)
            .await?;

        let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
        // fill compaction_groups: read configs from the open txn if we created
        // groups above, otherwise from the shared compaction group manager.
        let mut group_id_to_config = HashMap::new();
        if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .get(cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        } else {
            let compaction_group_manager = self.compaction_group_manager.read().await;
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .try_get_compaction_group_config(*cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        }

        // Pack each group's SSTs into overlapping sub-levels bounded by the
        // group's max overlapping level size.
        let group_id_to_sub_levels =
            rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);

        let time_travel_delta = version.pre_commit_epoch(
            &tables_to_commit,
            new_compaction_groups,
            group_id_to_sub_levels,
            &new_table_ids,
            new_table_watermarks,
            change_log_delta,
        );
        if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
            // Unable to invoke mark_next_time_travel_version_snapshot because versioning is already mutable borrowed.
            // Setting the counter to u64::MAX forces a snapshot on the next check below.
            versioning.time_travel_snapshot_interval_counter = u64::MAX;
        }

        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
        if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version()) {
            // Stats of dropped tables were purged; reset metrics so stale
            // per-table series are not reported.
            self.metrics.version_stats.reset();
            versioning.local_metrics.clear();
        }

        trigger_local_table_stat(
            &self.metrics,
            &mut versioning.local_metrics,
            &version_stats,
            &table_stats_change,
        );
        for (table_id, stats) in &table_stats_change {
            // Skip tables with no stat change at all.
            if stats.total_key_size == 0
                && stats.total_value_size == 0
                && stats.total_key_count == 0
            {
                continue;
            }
            // Clamp to zero: stat deltas can be negative (e.g. after deletes).
            let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
            let table_metrics = get_or_create_local_table_stat(
                &self.metrics,
                *table_id,
                &mut versioning.local_metrics,
            );
            table_metrics.inc_write_throughput(stats_value as u64);
        }
        // Decide whether this commit also takes a full time-travel version
        // snapshot, based on the configured snapshot interval.
        let mut time_travel_version = None;
        if versioning.time_travel_snapshot_interval_counter
            >= self.env.opts.hummock_time_travel_snapshot_interval
        {
            versioning.time_travel_snapshot_interval_counter = 0;
            time_travel_version = Some(version.latest_version());
        } else {
            versioning.time_travel_snapshot_interval_counter = versioning
                .time_travel_snapshot_interval_counter
                .saturating_add(1);
        }
        let time_travel_tables_to_commit =
            table_compaction_group_mapping
                .iter()
                .filter_map(|(table_id, cg_id)| {
                    tables_to_commit
                        .get(table_id)
                        .map(|committed_epoch| (table_id, cg_id, *committed_epoch))
                });
        let time_travel_table_ids: HashSet<_> = self
            .metadata_manager
            .catalog_controller
            .list_time_travel_table_ids()
            .await
            .map_err(|e| Error::Internal(e.into()))?
            .into_iter()
            .map(|id| id.try_into().unwrap())
            .collect();
        // Persist time-travel metadata and the staged version/stats/compaction
        // group changes atomically in one meta-store transaction.
        let mut txn = self.env.meta_store_ref().conn.begin().await?;
        let version_snapshot_sst_ids = self
            .write_time_travel_metadata(
                &txn,
                time_travel_version,
                time_travel_delta,
                time_travel_table_ids,
                &versioning.last_time_travel_snapshot_sst_ids,
                time_travel_tables_to_commit,
            )
            .await?;
        commit_multi_var_with_provided_txn!(
            txn,
            version,
            version_stats,
            compaction_group_manager_txn
        )?;
        if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
            versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
        }

        // Refresh per-group and epoch metrics from the now-committed version.
        for compaction_group_id in &modified_compaction_groups {
            trigger_sst_stat(
                &self.metrics,
                None,
                &versioning.current_version,
                *compaction_group_id,
            );
        }
        trigger_epoch_stat(&self.metrics, &versioning.current_version);

        // Release the versioning lock before the post-commit work below.
        drop(versioning_guard);

        // Don't trigger compactions if we enable deterministic compaction
        if !self.env.opts.compaction_deterministic_test {
            // commit_epoch may contains SSTs from any compaction group
            for id in &modified_compaction_groups {
                self.try_send_compaction_request(*id, compact_task::TaskType::Dynamic);
            }
            if !table_stats_change.is_empty() {
                self.collect_table_write_throughput(table_stats_change)
                    .await;
            }
        }
        if !modified_compaction_groups.is_empty() {
            self.try_update_write_limits(&modified_compaction_groups)
                .await;
        }
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(())
    }

    /// Converts the per-commit table stat deltas into per-second write
    /// throughput (bytes / checkpoint interval) and records them in the
    /// table-write-throughput statistic manager with the current timestamp.
    async fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
        let params = self.env.system_params_reader().await;
        let barrier_interval_ms = params.barrier_interval_ms() as u64;
        // Checkpoint interval in seconds, floored to at least 1s to avoid
        // division by zero below.
        let checkpoint_secs = std::cmp::max(
            1,
            params.checkpoint_frequency() * barrier_interval_ms / 1000,
        );

        let mut table_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.write();
        let timestamp = chrono::Utc::now().timestamp();

        for (table_id, stat) in table_stats {
            let throughput = ((stat.total_value_size + stat.total_key_size) as f64
                / checkpoint_secs as f64) as u64;
            table_throughput_statistic_manager
                .add_table_throughput_with_ts(table_id, throughput, timestamp);
        }
    }

    /// Splits the committed SSTs by compaction group so that every resulting
    /// SST only contains tables of a single group, and returns the SSTs keyed
    /// by their target compaction group id.
    ///
    /// Tables not found in `table_compaction_group_mapping` (e.g. already
    /// dropped) are skipped with a warning.
    async fn correct_commit_ssts(
        &self,
        sstables: Vec<LocalSstableInfo>,
        table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
    ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
        let mut new_sst_id_number = 0;
        let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
        // Remember the original commit order of object ids for the ordering
        // check at the end.
        let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
        for commit_sst in sstables {
            // compaction group id -> table ids of this SST belonging to it
            let mut group_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
            for table_id in &commit_sst.sst_info.table_ids {
                match table_compaction_group_mapping.get(&TableId::new(*table_id)) {
                    Some(cg_id_from_meta) => {
                        group_table_ids
                            .entry(*cg_id_from_meta)
                            .or_default()
                            .push(*table_id);
                    }
                    None => {
                        tracing::warn!(
                            table_id = *table_id,
                            object_id = commit_sst.sst_info.object_id,
                            "table doesn't belong to any compaction group",
                        );
                    }
                }
            }

            new_sst_id_number += group_table_ids.len() * 2; // `split_sst` will split the SST into two parts and consume 2 SST IDs
            sst_to_cg_vec.push((commit_sst, group_table_ids));
        }

        // Generate new SST IDs for each compaction group
        // `next_sstable_object_id` will update the global SST ID and reserve the new SST IDs
        // So we need to get the new SST ID first and then split the SSTs
        let mut new_sst_id = next_sstable_object_id(&self.env, new_sst_id_number).await?;
        let mut commit_sstables: BTreeMap<u64, Vec<SstableInfo>> = BTreeMap::new();

        for (mut sst, group_table_ids) in sst_to_cg_vec {
            let len = group_table_ids.len();
            for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
                if sst.sst_info.table_ids == match_ids {
                    // The SST contains all the tables in the group should be last key
                    assert!(
                        index == len - 1,
                        "SST should be the last key in the group {} index {} len {}",
                        group_id,
                        index,
                        len
                    );
                    // The remaining SST belongs entirely to this group: no
                    // split needed.
                    commit_sstables
                        .entry(group_id)
                        .or_default()
                        .push(sst.sst_info);
                    break;
                }

                // Estimate the split sizes from the per-table compressed sizes.
                let origin_sst_size = sst.sst_info.sst_size;
                let new_sst_size = match_ids
                    .iter()
                    .map(|id| {
                        let stat = sst.table_stats.get(id).unwrap();
                        stat.total_compressed_size
                    })
                    .sum();

                if new_sst_size == 0 {
                    tracing::warn!(
                        id = sst.sst_info.sst_id,
                        object_id = sst.sst_info.object_id,
                        match_ids = ?match_ids,
                        "Sstable doesn't contain any data for tables",
                    );
                }

                let old_sst_size = origin_sst_size.saturating_sub(new_sst_size);
                if old_sst_size == 0 {
                    tracing::warn!(
                        id = sst.sst_info.sst_id,
                        object_id = sst.sst_info.object_id,
                        match_ids = ?match_ids,
                        origin_sst_size = origin_sst_size,
                        new_sst_size = new_sst_size,
                        "Sstable doesn't contain any data for tables",
                    );
                }
                // Branch off the part covering `match_ids`; keep iterating with
                // the remainder for the following groups.
                let (modified_sst_info, branch_sst) = split_sst_with_table_ids(
                    &sst.sst_info,
                    &mut new_sst_id,
                    old_sst_size,
                    new_sst_size,
                    match_ids,
                );
                sst.sst_info = modified_sst_info;

                commit_sstables
                    .entry(group_id)
                    .or_default()
                    .push(branch_sst);
            }
        }

        // order check
        for ssts in commit_sstables.values() {
            let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
            assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
        }

        Ok(commit_sstables)
    }
}
454
455fn on_handle_add_new_table(
456    state_table_info: &HummockVersionStateTableInfo,
457    table_ids: impl IntoIterator<Item = &TableId>,
458    compaction_group_id: CompactionGroupId,
459    table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
460    new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
461) -> Result<()> {
462    for table_id in table_ids {
463        if let Some(info) = state_table_info.info().get(table_id) {
464            return Err(Error::CompactionGroup(format!(
465                "table {} already exist {:?}",
466                table_id.table_id, info,
467            )));
468        }
469        table_compaction_group_mapping.insert(*table_id, compaction_group_id);
470        new_table_ids.insert(*table_id, compaction_group_id);
471    }
472
473    Ok(())
474}
475
476/// Rewrite the commit sstables to sub-levels based on the compaction group config.
477/// The type of `compaction_group_manager_txn` is too complex to be used in the function signature. So we use `HashMap` instead.
478fn rewrite_commit_sstables_to_sub_level(
479    commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
480    group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
481) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
482    let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
483    for (group_id, inserted_table_infos) in commit_sstables {
484        let config = group_id_to_config
485            .get(&group_id)
486            .expect("compaction group should exist");
487
488        let mut accumulated_size = 0;
489        let mut ssts = vec![];
490        let sub_level_size_limit = config
491            .max_overlapping_level_size
492            .unwrap_or(compaction_config::max_overlapping_level_size());
493
494        let level = overlapping_sstables.entry(group_id).or_default();
495
496        for sst in inserted_table_infos {
497            accumulated_size += sst.sst_size;
498            ssts.push(sst);
499            if accumulated_size > sub_level_size_limit {
500                level.push(ssts);
501
502                // reset the accumulated size and ssts
503                accumulated_size = 0;
504                ssts = vec![];
505            }
506        }
507
508        if !ssts.is_empty() {
509            level.push(ssts);
510        }
511
512        // The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top.
513        level.reverse();
514    }
515
516    overlapping_sstables
517}
518
/// Returns `true` iff `vec_2` is an ordered subset (a subsequence) of `vec_1`,
/// i.e. every element of `vec_2` occurs in `vec_1` in the same relative order.
///
/// Used as a post-split sanity check: the object ids committed per compaction
/// group must preserve the original commit order of the incoming SSTs.
///
/// Takes slices instead of `&Vec<u64>` (clippy `ptr_arg`); existing callers
/// passing `&Vec<u64>` coerce transparently.
fn is_ordered_subset(vec_1: &[u64], vec_2: &[u64]) -> bool {
    let mut vec_2_iter = vec_2.iter().peekable();
    for item in vec_1 {
        // Advance through vec_2 only when its next expected element matches.
        if vec_2_iter.peek() == Some(&item) {
            vec_2_iter.next();
        }
    }

    // All of vec_2 was matched in order iff the iterator is exhausted.
    vec_2_iter.peek().is_none()
}