risingwave_meta/hummock/manager/commit_epoch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::default::compaction_config;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map,
};
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
};
use risingwave_pb::hummock::{CompactionConfig, compact_task};
use sea_orm::TransactionTrait;

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
use crate::hummock::manager::transaction::{
    HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
    get_or_create_local_table_stat, trigger_epoch_stat, trigger_local_table_stat, trigger_sst_stat,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
use crate::hummock::{
    HummockManager, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
};

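/// State table ids of a newly created table fragment. During commit, each
/// `NewTableFragmentInfo` is registered to a newly created compaction group.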
pub struct NewTableFragmentInfo {
    pub table_ids: HashSet<TableId>,
}

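/// Payload of a `commit_epoch` call: the SSTs flushed by compute nodes, new
/// table watermarks and change logs, and the tables whose committed epoch
/// should advance.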
#[derive(Default)]
pub struct CommitEpochInfo {
    pub sstables: Vec<LocalSstableInfo>,
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
    pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
    pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
    /// `table_id` -> `committed_epoch`
    pub tables_to_commit: HashMap<TableId, u64>,
}

impl HummockManager {
    /// The caller should ensure that each `committed_epoch` in `tables_to_commit`
    /// is greater than the table's current `committed_epoch`, unless the table is
    /// newly added via `new_table_fragment_info`.
    pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> {
        let CommitEpochInfo {
            mut sstables,
            new_table_watermarks,
            sst_to_context,
            new_table_fragment_infos,
            change_log_delta,
            vector_index_delta,
            tables_to_commit,
        } = commit_info;
        let mut versioning_guard = self.versioning.write().await;
        let _timer = start_measure_real_process_timer!(self, "commit_epoch");
        // Prevent committing new epochs when this flag is set
        if versioning_guard.disable_commit_epochs {
            return Ok(());
        }

        assert!(!tables_to_commit.is_empty());

        let versioning: &mut Versioning = &mut versioning_guard;
        self.commit_epoch_sanity_check(
            &tables_to_commit,
            &sstables,
            &sst_to_context,
            &versioning.current_version,
        )
        .await?;

        // Consume and aggregate table stats.
        let mut table_stats_change = PbTableStatsMap::default();
        for s in &mut sstables {
            add_prost_table_stats_map(
                &mut table_stats_change,
                &to_prost_table_stats_map(s.table_stats.clone()),
            );
        }

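        // Stage all mutations in a version transaction; the resulting version
        // delta only takes effect after the meta store transaction below commits.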
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            Some(&self.table_committed_epoch_notifiers),
            &self.metrics,
        );

        let state_table_info = &version.latest_version().state_table_info;
        let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
        let mut new_table_ids = HashMap::new();
        let mut new_compaction_groups = Vec::new();
        let mut compaction_group_manager_txn = None;
        let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

        // Add new tables: assign each new table fragment to a newly created
        // compaction group, lazily starting a compaction group transaction on
        // first use.
        for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
            let (compaction_group_manager, compaction_group_config) =
                if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
                    (
                        compaction_group_manager,
                        (*compaction_group_config
                            .as_ref()
                            .expect("must be set with compaction_group_manager_txn"))
                        .clone(),
                    )
                } else {
                    let compaction_group_manager_guard =
                        self.compaction_group_manager.write().await;
                    let new_compaction_group_config =
                        compaction_group_manager_guard.default_compaction_config();
                    compaction_group_config = Some(new_compaction_group_config.clone());
                    (
                        compaction_group_manager_txn.insert(
                            CompactionGroupManager::start_owned_compaction_groups_txn(
                                compaction_group_manager_guard,
                            ),
                        ),
                        new_compaction_group_config,
                    )
                };
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            let new_compaction_group = CompactionGroup {
                group_id: new_compaction_group_id,
                compaction_config: compaction_group_config.clone(),
            };

            new_compaction_groups.push(new_compaction_group.clone());
            compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);

            on_handle_add_new_table(
                state_table_info,
                &table_ids,
                new_compaction_group_id,
                &mut table_compaction_group_mapping,
                &mut new_table_ids,
            )?;
        }

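        // Split SSTs that span multiple compaction groups so that each
        // committed SST belongs to exactly one group.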
        let commit_sstables = self
            .correct_commit_ssts(sstables, &table_compaction_group_mapping)
            .await?;

        let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
        // Fill in the compaction config of each modified compaction group.
        let mut group_id_to_config = HashMap::new();
        if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .get(cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        } else {
            let compaction_group_manager = self.compaction_group_manager.read().await;
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .try_get_compaction_group_config(*cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        }

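        // Pack the per-group SSTs into overlapping sub-levels, bounded by each
        // group's configured `max_overlapping_level_size`.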
        let group_id_to_sub_levels =
            rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);

        let time_travel_delta = version.pre_commit_epoch(
            &tables_to_commit,
            new_compaction_groups,
            group_id_to_sub_levels,
            &new_table_ids,
            new_table_watermarks,
            change_log_delta,
            vector_index_delta,
        );
        if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
            // Unable to invoke mark_next_time_travel_version_snapshot because versioning is
            // already mutably borrowed.
            versioning.time_travel_snapshot_interval_counter = u64::MAX;
        }

        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
        if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version()) {
            self.metrics.version_stats.reset();
            versioning.local_metrics.clear();
        }

        trigger_local_table_stat(
            &self.metrics,
            &mut versioning.local_metrics,
            &version_stats,
            &table_stats_change,
        );
        for (table_id, stats) in &table_stats_change {
            if stats.total_key_size == 0
                && stats.total_value_size == 0
                && stats.total_key_count == 0
            {
                continue;
            }
            let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
            let table_metrics = get_or_create_local_table_stat(
                &self.metrics,
                *table_id,
                &mut versioning.local_metrics,
            );
            table_metrics.inc_write_throughput(stats_value as u64);
        }
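        // Take a full version snapshot for time travel every
        // `hummock_time_travel_snapshot_interval` commits; in between, only the
        // version delta is persisted.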
        let mut time_travel_version = None;
        if versioning.time_travel_snapshot_interval_counter
            >= self.env.opts.hummock_time_travel_snapshot_interval
        {
            versioning.time_travel_snapshot_interval_counter = 0;
            time_travel_version = Some(version.latest_version());
        } else {
            versioning.time_travel_snapshot_interval_counter = versioning
                .time_travel_snapshot_interval_counter
                .saturating_add(1);
        }
        let time_travel_tables_to_commit =
            table_compaction_group_mapping
                .iter()
                .filter_map(|(table_id, cg_id)| {
                    tables_to_commit
                        .get(table_id)
                        .map(|committed_epoch| (table_id, cg_id, *committed_epoch))
                });
        let time_travel_table_ids: HashSet<_> = self
            .metadata_manager
            .catalog_controller
            .list_time_travel_table_ids()
            .await
            .map_err(|e| Error::Internal(e.into()))?
            .into_iter()
            .map(|id| id.try_into().unwrap())
            .collect();
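        // Write the time travel metadata and commit it together with the
        // version delta, version stats, and compaction group changes in a
        // single meta store transaction, so they are applied atomically.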
        let mut txn = self.env.meta_store_ref().conn.begin().await?;
        let version_snapshot_sst_ids = self
            .write_time_travel_metadata(
                &txn,
                time_travel_version,
                time_travel_delta,
                time_travel_table_ids,
                &versioning.last_time_travel_snapshot_sst_ids,
                time_travel_tables_to_commit,
            )
            .await?;
        commit_multi_var_with_provided_txn!(
            txn,
            version,
            version_stats,
            compaction_group_manager_txn
        )?;
        if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
            versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
        }

        for compaction_group_id in &modified_compaction_groups {
            trigger_sst_stat(
                &self.metrics,
                None,
                &versioning.current_version,
                *compaction_group_id,
            );
        }
        trigger_epoch_stat(&self.metrics, &versioning.current_version);

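        // Release the versioning write lock before triggering follow-up
        // compaction and write-limit updates.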
        drop(versioning_guard);

        // Don't trigger compactions if deterministic compaction is enabled.
        if !self.env.opts.compaction_deterministic_test {
            // commit_epoch may contain SSTs from any compaction group
            for id in &modified_compaction_groups {
                self.try_send_compaction_request(*id, compact_task::TaskType::Dynamic);
            }
            if !table_stats_change.is_empty() {
                self.collect_table_write_throughput(table_stats_change)
                    .await;
            }
        }
        if !modified_compaction_groups.is_empty() {
            self.try_update_write_limits(&modified_compaction_groups)
                .await;
        }
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(())
    }

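    /// Converts the per-table stats of this commit into write throughput
    /// samples (bytes written over one checkpoint interval, normalized to
    /// bytes per second) and records them in the throughput statistic manager.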
    async fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
        let params = self.env.system_params_reader().await;
        let barrier_interval_ms = params.barrier_interval_ms() as u64;
        let checkpoint_secs = std::cmp::max(
            1,
            params.checkpoint_frequency() * barrier_interval_ms / 1000,
        );

        let mut table_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.write();
        let timestamp = chrono::Utc::now().timestamp();

        for (table_id, stat) in table_stats {
            let throughput = ((stat.total_value_size + stat.total_key_size) as f64
                / checkpoint_secs as f64) as u64;
            table_throughput_statistic_manager
                .add_table_throughput_with_ts(table_id, throughput, timestamp);
        }
    }

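    /// Assigns each committed SST to its compaction group(s). An SST whose
    /// tables span multiple compaction groups is split per group via
    /// `split_sst_with_table_ids`, with the required new SST ids reserved in
    /// a single batch beforehand.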
    async fn correct_commit_ssts(
        &self,
        sstables: Vec<LocalSstableInfo>,
        table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
    ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
        let mut new_sst_id_number = 0;
        let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
        let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
        for commit_sst in sstables {
            let mut group_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
            for table_id in &commit_sst.sst_info.table_ids {
                match table_compaction_group_mapping.get(&TableId::new(*table_id)) {
                    Some(cg_id_from_meta) => {
                        group_table_ids
                            .entry(*cg_id_from_meta)
                            .or_default()
                            .push(*table_id);
                    }
                    None => {
                        tracing::warn!(
                            table_id = *table_id,
                            object_id = %commit_sst.sst_info.object_id,
                            "table doesn't belong to any compaction group",
                        );
                    }
                }
            }

            // Each split produces two parts and consumes 2 SST IDs.
            new_sst_id_number += group_table_ids.len() * 2;
            sst_to_cg_vec.push((commit_sst, group_table_ids));
        }

        // Generate new SST IDs for each compaction group.
        // `next_sstable_id` advances the global SST ID counter and reserves the new SST IDs,
        // so we fetch the new SST IDs first and then split the SSTs.
        let mut new_sst_id = next_sstable_id(&self.env, new_sst_id_number).await?;
        let mut commit_sstables: BTreeMap<u64, Vec<SstableInfo>> = BTreeMap::new();

        for (mut sst, group_table_ids) in sst_to_cg_vec {
            let len = group_table_ids.len();
            for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
                if sst.sst_info.table_ids == match_ids {
                    // If the SST's remaining table ids exactly match this group, no split is
                    // needed; this can only be the last group in the map.
                    assert!(
                        index == len - 1,
                        "SST should be the last key in the group {} index {} len {}",
                        group_id,
                        index,
                        len
                    );
                    commit_sstables
                        .entry(group_id)
                        .or_default()
                        .push(sst.sst_info);
                    break;
                }

                let origin_sst_size = sst.sst_info.sst_size;
                let new_sst_size = match_ids
                    .iter()
                    .map(|id| {
                        let stat = sst.table_stats.get(id).unwrap();
                        stat.total_compressed_size
                    })
                    .sum();

                if new_sst_size == 0 {
                    tracing::warn!(
                        id = %sst.sst_info.sst_id,
                        object_id = %sst.sst_info.object_id,
                        match_ids = ?match_ids,
                        "Sstable doesn't contain any data for tables",
                    );
                }

                let old_sst_size = origin_sst_size.saturating_sub(new_sst_size);
                if old_sst_size == 0 {
                    tracing::warn!(
                        id = %sst.sst_info.sst_id,
                        object_id = %sst.sst_info.object_id,
                        match_ids = ?match_ids,
                        origin_sst_size = origin_sst_size,
                        new_sst_size = new_sst_size,
                        "Sstable contains no data besides the matched tables",
                    );
                }
                let (modified_sst_info, branch_sst) = split_sst_with_table_ids(
                    &sst.sst_info,
                    &mut new_sst_id,
                    old_sst_size,
                    new_sst_size,
                    match_ids,
                );
                sst.sst_info = modified_sst_info;

                commit_sstables
                    .entry(group_id)
                    .or_default()
                    .push(branch_sst);
            }
        }

        // Order check: within each group, SSTs must preserve the original commit order.
        for ssts in commit_sstables.values() {
            let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
            assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
        }

        Ok(commit_sstables)
    }
}

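/// Registers the given tables under `compaction_group_id`, returning an error
/// if any of them is already tracked by the current version.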
fn on_handle_add_new_table(
    state_table_info: &HummockVersionStateTableInfo,
    table_ids: impl IntoIterator<Item = &TableId>,
    compaction_group_id: CompactionGroupId,
    table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
    new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
) -> Result<()> {
    for table_id in table_ids {
        if let Some(info) = state_table_info.info().get(table_id) {
            return Err(Error::CompactionGroup(format!(
                "table {} already exists: {:?}",
                table_id.table_id, info,
            )));
        }
        table_compaction_group_mapping.insert(*table_id, compaction_group_id);
        new_table_ids.insert(*table_id, compaction_group_id);
    }

    Ok(())
}

/// Rewrites the commit sstables into sub-levels based on the compaction group config.
/// The type of `compaction_group_manager_txn` is too complex to appear in the function
/// signature, so a plain `HashMap` of group configs is taken instead.
fn rewrite_commit_sstables_to_sub_level(
    commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
    group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
    let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
    for (group_id, inserted_table_infos) in commit_sstables {
        let config = group_id_to_config
            .get(&group_id)
            .expect("compaction group should exist");

        let mut accumulated_size = 0;
        let mut ssts = vec![];
        let sub_level_size_limit = config
            .max_overlapping_level_size
            .unwrap_or(compaction_config::max_overlapping_level_size());

        let level = overlapping_sstables.entry(group_id).or_default();

        for sst in inserted_table_infos {
            accumulated_size += sst.sst_size;
            ssts.push(sst);
            if accumulated_size > sub_level_size_limit {
                level.push(ssts);

                // Reset the accumulated size and ssts for the next sub-level.
                accumulated_size = 0;
                ssts = vec![];
            }
        }

        if !ssts.is_empty() {
            level.push(ssts);
        }

        // The uploader organizes the ssts in decreasing epoch order, so the level needs to
        // be reversed to ensure that the latest epoch is at the top.
        level.reverse();
    }

    overlapping_sstables
}

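/// Returns `true` if `vec_2` is an ordered subsequence of `vec_1`, i.e. every
/// element of `vec_2` appears in `vec_1` in the same relative order.
///
/// For example, `is_ordered_subset(&vec![1, 2, 3], &vec![1, 3])` is `true`,
/// while `is_ordered_subset(&vec![1, 2, 3], &vec![3, 1])` is `false`.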
fn is_ordered_subset<T: PartialEq>(vec_1: &Vec<T>, vec_2: &Vec<T>) -> bool {
    let mut vec_2_iter = vec_2.iter().peekable();
    for item in vec_1 {
        if vec_2_iter.peek() == Some(&item) {
            vec_2_iter.next();
        }
    }

    vec_2_iter.peek().is_none()
}