risingwave_meta/hummock/manager/commit_epoch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::config::meta::default::compaction_config;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map,
};
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
};
use risingwave_pb::hummock::{CompactionConfig, compact_task};
use sea_orm::TransactionTrait;

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
use crate::hummock::manager::transaction::{
    HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
    get_or_create_local_table_stat, trigger_epoch_stat, trigger_local_table_stat, trigger_sst_stat,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
use crate::hummock::{
    HummockManager, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
};

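/// Tables of a newly created table fragment. Each `NewTableFragmentInfo` gets a dedicated,
/// newly created compaction group on commit.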
pub struct NewTableFragmentInfo {
    pub table_ids: HashSet<TableId>,
}

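/// All the metadata changes to be committed in one `commit_epoch` call.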
#[derive(Default)]
pub struct CommitEpochInfo {
    pub sstables: Vec<LocalSstableInfo>,
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
    pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
    pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
    /// `table_id` -> `committed_epoch`
    pub tables_to_commit: HashMap<TableId, u64>,

    pub truncate_tables: HashSet<TableId>,
}

impl HummockManager {
    /// Caller should ensure that `epoch` is greater than the `committed_epoch` of each table in
    /// `tables_to_commit`, unless the table is newly added via `new_table_fragment_infos`.
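    ///
    /// At a high level, this method (1) sanity-checks the commit, (2) creates new compaction
    /// groups for tables added via `new_table_fragment_infos`, (3) assigns (and, when an SST
    /// spans multiple groups, splits) the committed SSTs to their compaction groups, (4) applies
    /// the resulting version delta together with time-travel metadata and stats in a single meta
    /// store transaction, and (5) updates metrics and triggers compaction.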
    pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> {
        let CommitEpochInfo {
            mut sstables,
            new_table_watermarks,
            sst_to_context,
            new_table_fragment_infos,
            change_log_delta,
            vector_index_delta,
            tables_to_commit,
            truncate_tables,
        } = commit_info;
        let mut versioning_guard = self.versioning.write().await;
        let _timer = start_measure_real_process_timer!(self, "commit_epoch");
        // Prevent committing new epochs if this flag is set.
        if versioning_guard.disable_commit_epochs {
            return Ok(());
        }

        assert!(!tables_to_commit.is_empty());

        let versioning: &mut Versioning = &mut versioning_guard;
        self.commit_epoch_sanity_check(
            &tables_to_commit,
            &sstables,
            &sst_to_context,
            &versioning.current_version,
        )
        .await?;

        // Consume and aggregate table stats.
        let mut table_stats_change = PbTableStatsMap::default();
        for s in &mut sstables {
            add_prost_table_stats_map(
                &mut table_stats_change,
                &to_prost_table_stats_map(s.table_stats.clone()),
            );
        }

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            Some(&self.table_committed_epoch_notifiers),
            &self.metrics,
        );

        let state_table_info = &version.latest_version().state_table_info;
        let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
        let mut new_table_ids = HashMap::new();
        let mut new_compaction_groups = Vec::new();
        let mut compaction_group_manager_txn = None;
        let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

        // Add new tables, creating a new compaction group for each new table fragment.
        for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
            let (compaction_group_manager, compaction_group_config) =
                if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
                    (
                        compaction_group_manager,
                        (*compaction_group_config
                            .as_ref()
                            .expect("must be set with compaction_group_manager_txn"))
                        .clone(),
                    )
                } else {
                    let compaction_group_manager_guard =
                        self.compaction_group_manager.write().await;
                    let new_compaction_group_config =
                        compaction_group_manager_guard.default_compaction_config();
                    compaction_group_config = Some(new_compaction_group_config.clone());
                    (
                        compaction_group_manager_txn.insert(
                            CompactionGroupManager::start_owned_compaction_groups_txn(
                                compaction_group_manager_guard,
                            ),
                        ),
                        new_compaction_group_config,
                    )
                };
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            let new_compaction_group = CompactionGroup {
                group_id: new_compaction_group_id,
                compaction_config: compaction_group_config.clone(),
            };

            new_compaction_groups.push(new_compaction_group.clone());
            compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);

            on_handle_add_new_table(
                state_table_info,
                &table_ids,
                new_compaction_group_id,
                &mut table_compaction_group_mapping,
                &mut new_table_ids,
            )?;
        }

        let commit_sstables = self
            .correct_commit_ssts(sstables, &table_compaction_group_mapping)
            .await?;

        let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
        // Fill in the compaction config of each modified compaction group.
        let mut group_id_to_config = HashMap::new();
        if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .get(cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        } else {
            let compaction_group_manager = self.compaction_group_manager.read().await;
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .try_get_compaction_group_config(*cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        }

        let group_id_to_sub_levels =
            rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);

        // Map each table to truncate to its compaction group.
        let mut group_id_to_truncate_tables: HashMap<CompactionGroupId, HashSet<TableId>> =
            HashMap::new();
        for table_id in &truncate_tables {
            if let Some(compaction_group_id) = table_compaction_group_mapping.get(table_id) {
                group_id_to_truncate_tables
                    .entry(*compaction_group_id)
                    .or_default()
                    .insert(*table_id);
            } else {
                bail!(
                    "table {} doesn't belong to any compaction group, unable to truncate it",
                    table_id
                );
            }
        }

        let time_travel_delta = version.pre_commit_epoch(
            &tables_to_commit,
            new_compaction_groups,
            group_id_to_sub_levels,
            &new_table_ids,
            new_table_watermarks,
            change_log_delta,
            vector_index_delta,
            group_id_to_truncate_tables,
        );

        if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
            // Unable to invoke `mark_next_time_travel_version_snapshot` because `versioning` is
            // already mutably borrowed.
            versioning.time_travel_snapshot_interval_counter = u64::MAX;
        }

        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
        if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version()) {
            self.metrics.version_stats.reset();
            versioning.local_metrics.clear();
        }

        trigger_local_table_stat(
            &self.metrics,
            &mut versioning.local_metrics,
            &version_stats,
            &table_stats_change,
        );
        for (table_id, stats) in &table_stats_change {
            if stats.total_key_size == 0
                && stats.total_value_size == 0
                && stats.total_key_count == 0
            {
                continue;
            }
            let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
            let table_metrics = get_or_create_local_table_stat(
                &self.metrics,
                *table_id,
                &mut versioning.local_metrics,
            );
            table_metrics.inc_write_throughput(stats_value as u64);
        }
        let mut time_travel_version = None;
        if versioning.time_travel_snapshot_interval_counter
            >= self.env.opts.hummock_time_travel_snapshot_interval
        {
            versioning.time_travel_snapshot_interval_counter = 0;
            time_travel_version = Some(version.latest_version());
        } else {
            versioning.time_travel_snapshot_interval_counter = versioning
                .time_travel_snapshot_interval_counter
                .saturating_add(1);
        }
        let time_travel_tables_to_commit =
            table_compaction_group_mapping
                .iter()
                .filter_map(|(table_id, cg_id)| {
                    tables_to_commit
                        .get(table_id)
                        .map(|committed_epoch| (table_id, cg_id, *committed_epoch))
                });
        let time_travel_table_ids: HashSet<_> = self
            .metadata_manager
            .catalog_controller
            .list_time_travel_table_ids()
            .await
            .map_err(|e| Error::Internal(e.into()))?
            .into_iter()
            .collect();
        let mut txn = self.env.meta_store_ref().conn.begin().await?;
        let version_snapshot_sst_ids = self
            .write_time_travel_metadata(
                &txn,
                time_travel_version,
                time_travel_delta,
                time_travel_table_ids,
                &versioning.last_time_travel_snapshot_sst_ids,
                time_travel_tables_to_commit,
            )
            .await?;
        commit_multi_var_with_provided_txn!(
            txn,
            version,
            version_stats,
            compaction_group_manager_txn
        )?;
        if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
            versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
        }

        for compaction_group_id in &modified_compaction_groups {
            trigger_sst_stat(
                &self.metrics,
                None,
                &versioning.current_version,
                *compaction_group_id,
            );
        }
        trigger_epoch_stat(&self.metrics, &versioning.current_version);

        drop(versioning_guard);

        // Don't trigger compactions if deterministic compaction is enabled.
        if !self.env.opts.compaction_deterministic_test {
            // commit_epoch may contain SSTs from any compaction group
            for id in &modified_compaction_groups {
                self.try_send_compaction_request(*id, compact_task::TaskType::Dynamic);
            }
            if !table_stats_change.is_empty() {
                self.collect_table_write_throughput(table_stats_change)
                    .await;
            }
        }
        if !modified_compaction_groups.is_empty() {
            self.try_update_write_limits(&modified_compaction_groups)
                .await;
        }
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(())
    }

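    /// Converts the per-table stats of this commit into write-throughput samples (bytes per
    /// second), based on the checkpoint interval derived from the system params.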
    async fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
        let params = self.env.system_params_reader().await;
        let barrier_interval_ms = params.barrier_interval_ms() as u64;
        let checkpoint_secs = std::cmp::max(
            1,
            params.checkpoint_frequency() * barrier_interval_ms / 1000,
        );

        let mut table_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.write();
        let timestamp = chrono::Utc::now().timestamp();

        for (table_id, stat) in table_stats {
            let throughput = ((stat.total_value_size + stat.total_key_size) as f64
                / checkpoint_secs as f64) as u64;
            table_throughput_statistic_manager.add_table_throughput_with_ts(
                table_id.into(),
                throughput,
                timestamp,
            );
        }
    }

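    /// Assigns each committed SST to the compaction group(s) of the tables it contains. An SST
    /// that spans multiple compaction groups is split into per-group branches via
    /// `split_sst_with_table_ids`, with the sizes apportioned by per-table stats.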
    async fn correct_commit_ssts(
        &self,
        sstables: Vec<LocalSstableInfo>,
        table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
    ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
        let mut new_sst_id_number = 0;
        let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
        let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
        for commit_sst in sstables {
            let mut group_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
            for table_id in &commit_sst.sst_info.table_ids {
                match table_compaction_group_mapping.get(table_id) {
                    Some(cg_id_from_meta) => {
                        group_table_ids
                            .entry(*cg_id_from_meta)
                            .or_default()
                            .push(*table_id);
                    }
                    None => {
                        tracing::warn!(
                            %table_id,
                            object_id = %commit_sst.sst_info.object_id,
                            "table doesn't belong to any compaction group",
                        );
                    }
                }
            }

            // Each call to `split_sst_with_table_ids` splits an SST into two parts and
            // consumes 2 new SST ids.
            new_sst_id_number += group_table_ids.len() * 2;
            sst_to_cg_vec.push((commit_sst, group_table_ids));
        }

        // Generate new SST ids for each compaction group.
        // `next_sstable_id` updates the global SST id counter and reserves the new ids,
        // so we reserve all the ids we may need before splitting the SSTs.
        let mut new_sst_id = next_sstable_id(&self.env, new_sst_id_number).await?;
        let mut commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>> = BTreeMap::new();

        for (mut sst, group_table_ids) in sst_to_cg_vec {
            let len = group_table_ids.len();
            for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
                if sst.sst_info.table_ids == match_ids {
                    // If the SST contains exactly the tables of this group, no split is needed,
                    // and it must be the last entry of the (sorted) map.
                    assert!(
                        index == len - 1,
                        "SST should be the last key in the group {} index {} len {}",
                        group_id,
                        index,
                        len
                    );
                    commit_sstables
                        .entry(group_id)
                        .or_default()
                        .push(sst.sst_info);
                    break;
                }

                let origin_sst_size = sst.sst_info.sst_size;
                let new_sst_size = match_ids
                    .iter()
                    .map(|id| {
                        let stat = sst.table_stats.get(id).unwrap();
                        stat.total_compressed_size
                    })
                    .sum();

                if new_sst_size == 0 {
                    tracing::warn!(
                        id = %sst.sst_info.sst_id,
                        object_id = %sst.sst_info.object_id,
                        match_ids = ?match_ids,
                        "Sstable doesn't contain any data for the matched tables",
                    );
                }

                let old_sst_size = origin_sst_size.saturating_sub(new_sst_size);
                if old_sst_size == 0 {
                    tracing::warn!(
                        id = %sst.sst_info.sst_id,
                        object_id = %sst.sst_info.object_id,
                        match_ids = ?match_ids,
                        origin_sst_size = origin_sst_size,
                        new_sst_size = new_sst_size,
                        "Sstable doesn't contain any data for the remaining tables",
                    );
                }
                // Split the SST: `branch_sst` carries `match_ids` into this group, while the
                // modified SST keeps the remaining tables (and size) for the following groups.
                let (modified_sst_info, branch_sst) = split_sst_with_table_ids(
                    &sst.sst_info,
                    &mut new_sst_id,
                    old_sst_size,
                    new_sst_size,
                    match_ids,
                );
                sst.sst_info = modified_sst_info;

                commit_sstables
                    .entry(group_id)
                    .or_default()
                    .push(branch_sst);
            }
        }

        // Order check: within each group, the committed SSTs must preserve the original
        // commit order.
        for ssts in commit_sstables.values() {
            let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
            assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
        }

        Ok(commit_sstables)
    }
}

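/// Registers the tables of a new table fragment to the given compaction group, returning an
/// error if any of the tables already exists in the current version.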
fn on_handle_add_new_table(
    state_table_info: &HummockVersionStateTableInfo,
    table_ids: impl IntoIterator<Item = &TableId>,
    compaction_group_id: CompactionGroupId,
    table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
    new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
) -> Result<()> {
    for table_id in table_ids {
        if let Some(info) = state_table_info.info().get(table_id) {
            return Err(Error::CompactionGroup(format!(
                "table {} already exists: {:?}",
                table_id, info,
            )));
        }
        table_compaction_group_mapping.insert(*table_id, compaction_group_id);
        new_table_ids.insert(*table_id, compaction_group_id);
    }

    Ok(())
}

/// Rewrites the commit sstables to sub-levels based on the compaction group config.
/// The type of `compaction_group_manager_txn` is too complex to be used in the function
/// signature, so we pass a `HashMap` of the group configs instead.
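///
/// For example (illustrative): with `max_overlapping_level_size = 5` and commit SSTs of sizes
/// `[3, 3, 3]` arriving in decreasing epoch order, the SSTs are batched into `[[3, 3], [3]]`
/// and then reversed to `[[3], [3, 3]]`, so that the batch holding the latest epoch ends up
/// last, i.e. at the top of the overlapping level.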
fn rewrite_commit_sstables_to_sub_level(
    commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
    group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
    let mut overlapping_sstables: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> =
        BTreeMap::new();
    for (group_id, inserted_table_infos) in commit_sstables {
        let config = group_id_to_config
            .get(&group_id)
            .expect("compaction group should exist");

        let mut accumulated_size = 0;
        let mut ssts = vec![];
        let sub_level_size_limit = config
            .max_overlapping_level_size
            .unwrap_or(compaction_config::max_overlapping_level_size());

        let level = overlapping_sstables.entry(group_id).or_default();

        for sst in inserted_table_infos {
            accumulated_size += sst.sst_size;
            ssts.push(sst);
            if accumulated_size > sub_level_size_limit {
                level.push(ssts);

                // Reset the accumulated size and ssts.
                accumulated_size = 0;
                ssts = vec![];
            }
        }

        if !ssts.is_empty() {
            level.push(ssts);
        }

        // The uploader organizes the ssts in decreasing epoch order, so the level needs to be
        // reversed to ensure that the latest epoch is at the top.
        level.reverse();
    }

    overlapping_sstables
}

fn is_ordered_subset<T: PartialEq>(vec_1: &[T], vec_2: &[T]) -> bool {
    let mut vec_2_iter = vec_2.iter().peekable();
    for item in vec_1 {
        if vec_2_iter.peek() == Some(&item) {
            vec_2_iter.next();
        }
    }

    vec_2_iter.peek().is_none()
}
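
// Minimal illustrative sanity checks for `is_ordered_subset`; a sketch that exercises only the
// helper above.
#[cfg(test)]
mod is_ordered_subset_tests {
    use super::is_ordered_subset;

    #[test]
    fn ordered_subset_basics() {
        // The empty sequence is an ordered subset of anything.
        assert!(is_ordered_subset(&[1, 2, 3], &[]));
        // Elements must appear in the same relative order as in the superset.
        assert!(is_ordered_subset(&[1, 2, 3, 4], &[2, 4]));
        assert!(!is_ordered_subset(&[1, 2, 3, 4], &[4, 2]));
        // An element missing from the superset fails the check.
        assert!(!is_ordered_subset(&[1, 2, 3], &[5]));
    }
}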
555}