risingwave_meta/hummock/manager/
commit_epoch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::bail;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::meta::default::compaction_config;
22use risingwave_common::system_param::reader::SystemParamsRead;
23use risingwave_hummock_sdk::change_log::ChangeLogDelta;
24use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
25use risingwave_hummock_sdk::sstable_info::SstableInfo;
26use risingwave_hummock_sdk::table_stats::{
27    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map,
28};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
32use risingwave_hummock_sdk::{
33    CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
34};
35use risingwave_pb::hummock::{CompactionConfig, compact_task};
36use sea_orm::TransactionTrait;
37
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
40use crate::hummock::manager::transaction::{
41    HummockVersionStatsTransaction, HummockVersionTransaction,
42};
43use crate::hummock::manager::versioning::Versioning;
44use crate::hummock::metrics_utils::{
45    get_or_create_local_table_stat, trigger_epoch_stat, trigger_local_table_stat, trigger_sst_stat,
46};
47use crate::hummock::model::CompactionGroup;
48use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
49use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
50use crate::hummock::{
51    HummockManager, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
52};
53
/// Describes a newly created table fragment whose state tables must be registered
/// during `commit_epoch`. Each instance results in a brand-new compaction group
/// that all of its `table_ids` are assigned to.
pub struct NewTableFragmentInfo {
    // State table IDs belonging to the new fragment; must not already exist in
    // the current version's `state_table_info`.
    pub table_ids: HashSet<TableId>,
}
57
/// All inputs needed to commit an epoch to the hummock version.
/// Consumed by [`HummockManager::commit_epoch`].
#[derive(Default)]
pub struct CommitEpochInfo {
    // SSTs flushed by compute nodes for this commit, with per-table stats attached.
    pub sstables: Vec<LocalSstableInfo>,
    // Newly reported table watermarks, applied to the version delta.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    // Maps each SST object to the worker context that produced it; used by the
    // commit sanity check.
    pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
    // Table fragments created in this epoch; each gets a fresh compaction group.
    pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
    // Per-table change log deltas forwarded into the version delta.
    pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
    // Per-table vector index deltas forwarded into the version delta.
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
    /// `table_id` -> `committed_epoch`
    pub tables_to_commit: HashMap<TableId, u64>,

    // Tables to truncate in this commit; each must belong to a compaction group,
    // otherwise the commit fails.
    pub truncate_tables: HashSet<TableId>,
}
71
impl HummockManager {
    /// Commits an epoch: turns the flushed SSTs, watermarks, change logs and vector
    /// index deltas in `commit_info` into a new hummock version delta, persists it
    /// together with time-travel metadata in a single meta-store transaction, then
    /// updates metrics and triggers compaction for the modified groups.
    ///
    /// Caller should ensure `epoch` > `committed_epoch` of `tables_to_commit`
    /// if tables are not newly added via `new_table_fragment_info`
    pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> {
        let CommitEpochInfo {
            mut sstables,
            new_table_watermarks,
            sst_to_context,
            new_table_fragment_infos,
            change_log_delta,
            vector_index_delta,
            tables_to_commit,
            truncate_tables,
        } = commit_info;
        // Hold the versioning write lock for the whole commit.
        let mut versioning_guard = self.versioning.write().await;
        let _timer = start_measure_real_process_timer!(self, "commit_epoch");
        // Prevent commit new epochs if this flag is set
        if versioning_guard.disable_commit_epochs {
            return Ok(());
        }

        // A commit without any table to commit is a caller bug.
        assert!(!tables_to_commit.is_empty());

        let versioning: &mut Versioning = &mut versioning_guard;
        self.commit_epoch_sanity_check(
            &tables_to_commit,
            &sstables,
            &sst_to_context,
            &versioning.current_version,
        )
        .await?;

        // Consume and aggregate table stats.
        let mut table_stats_change = PbTableStatsMap::default();
        for s in &mut sstables {
            add_prost_table_stats_map(
                &mut table_stats_change,
                &to_prost_table_stats_map(s.table_stats.clone()),
            );
        }

        // All version mutations below are staged in this transaction and only take
        // effect when committed via `commit_multi_var_with_provided_txn!`.
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            Some(&self.table_committed_epoch_notifiers),
            &self.metrics,
        );

        let state_table_info = &version.latest_version().state_table_info;
        let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
        let mut new_table_ids = HashMap::new();
        let mut new_compaction_groups = Vec::new();
        let mut compaction_group_manager_txn = None;
        let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

        // Add new table
        for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
            // Lazily take the compaction group manager write lock and start an owned
            // txn on the first new fragment; later fragments reuse the txn and the
            // cached default config.
            let (compaction_group_manager, compaction_group_config) =
                if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
                    (
                        compaction_group_manager,
                        (*compaction_group_config
                            .as_ref()
                            .expect("must be set with compaction_group_manager_txn"))
                        .clone(),
                    )
                } else {
                    let compaction_group_manager_guard =
                        self.compaction_group_manager.write().await;
                    let new_compaction_group_config =
                        compaction_group_manager_guard.default_compaction_config();
                    compaction_group_config = Some(new_compaction_group_config.clone());
                    (
                        compaction_group_manager_txn.insert(
                            CompactionGroupManager::start_owned_compaction_groups_txn(
                                compaction_group_manager_guard,
                            ),
                        ),
                        new_compaction_group_config,
                    )
                };
            // Every new table fragment gets a dedicated compaction group.
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            let new_compaction_group = CompactionGroup {
                group_id: new_compaction_group_id,
                compaction_config: compaction_group_config.clone(),
            };

            new_compaction_groups.push(new_compaction_group.clone());
            compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);

            // Register the new tables into the mapping; errors if a table already exists.
            on_handle_add_new_table(
                state_table_info,
                &table_ids,
                new_compaction_group_id,
                &mut table_compaction_group_mapping,
                &mut new_table_ids,
            )?;
        }

        // Assign each commit SST to its compaction group(s), splitting SSTs that
        // span multiple groups.
        let commit_sstables = self
            .correct_commit_ssts(sstables, &table_compaction_group_mapping)
            .await?;

        let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
        // fill compaction_groups
        let mut group_id_to_config = HashMap::new();
        if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
            // New groups were created above: read configs through the pending txn.
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .get(cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        } else {
            // No new groups: a read lock on the manager is sufficient.
            let compaction_group_manager = self.compaction_group_manager.read().await;
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .try_get_compaction_group_config(*cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        }

        // Pack per-group SSTs into overlapping sub-levels, bounded by each group's
        // max overlapping level size.
        let group_id_to_sub_levels =
            rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);

        // build group_id to truncate tables
        let mut group_id_to_truncate_tables: HashMap<u64, Vec<TableId>> = HashMap::new();
        for table_id in &truncate_tables {
            if let Some(compaction_group_id) = table_compaction_group_mapping.get(table_id) {
                group_id_to_truncate_tables
                    .entry(*compaction_group_id)
                    .or_default()
                    .push(*table_id);
            } else {
                // Truncating a table that belongs to no group fails the whole commit.
                bail!(
                    "table {} doesn't belong to any compaction group, skip truncating",
                    table_id
                );
            }
        }

        // Stage the version delta for this commit; the returned delta is also used
        // for time-travel bookkeeping below.
        let time_travel_delta = version.pre_commit_epoch(
            &tables_to_commit,
            new_compaction_groups,
            group_id_to_sub_levels,
            &new_table_ids,
            new_table_watermarks,
            change_log_delta,
            vector_index_delta,
            group_id_to_truncate_tables,
        );

        if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
            // Unable to invoke mark_next_time_travel_version_snapshot because versioning is already mutable borrowed.
            versioning.time_travel_snapshot_interval_counter = u64::MAX;
        }

        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
        if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version()) {
            // Stats for dropped tables were purged; reset metrics to avoid stale series.
            self.metrics.version_stats.reset();
            versioning.local_metrics.clear();
        }

        trigger_local_table_stat(
            &self.metrics,
            &mut versioning.local_metrics,
            &version_stats,
            &table_stats_change,
        );
        for (table_id, stats) in &table_stats_change {
            if stats.total_key_size == 0
                && stats.total_value_size == 0
                && stats.total_key_count == 0
            {
                continue;
            }
            // Clamp negative stat deltas to 0 before the cast to u64.
            let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
            let table_metrics = get_or_create_local_table_stat(
                &self.metrics,
                *table_id,
                &mut versioning.local_metrics,
            );
            table_metrics.inc_write_throughput(stats_value as u64);
        }
        // Take a full time-travel version snapshot every
        // `hummock_time_travel_snapshot_interval` commits; otherwise only the delta
        // is recorded.
        let mut time_travel_version = None;
        if versioning.time_travel_snapshot_interval_counter
            >= self.env.opts.hummock_time_travel_snapshot_interval
        {
            versioning.time_travel_snapshot_interval_counter = 0;
            time_travel_version = Some(version.latest_version());
        } else {
            versioning.time_travel_snapshot_interval_counter = versioning
                .time_travel_snapshot_interval_counter
                .saturating_add(1);
        }
        let time_travel_tables_to_commit =
            table_compaction_group_mapping
                .iter()
                .filter_map(|(table_id, cg_id)| {
                    tables_to_commit
                        .get(table_id)
                        .map(|committed_epoch| (table_id, cg_id, *committed_epoch))
                });
        let time_travel_table_ids: HashSet<_> = self
            .metadata_manager
            .catalog_controller
            .list_time_travel_table_ids()
            .await
            .map_err(|e| Error::Internal(e.into()))?
            .into_iter()
            .map(|id| id.try_into().unwrap())
            .collect();
        // Time-travel metadata and the version delta are persisted in one meta-store
        // transaction so they cannot diverge.
        let mut txn = self.env.meta_store_ref().conn.begin().await?;
        let version_snapshot_sst_ids = self
            .write_time_travel_metadata(
                &txn,
                time_travel_version,
                time_travel_delta,
                time_travel_table_ids,
                &versioning.last_time_travel_snapshot_sst_ids,
                time_travel_tables_to_commit,
            )
            .await?;
        commit_multi_var_with_provided_txn!(
            txn,
            version,
            version_stats,
            compaction_group_manager_txn
        )?;
        // Only update the in-memory snapshot SST IDs after the txn has committed.
        if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
            versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
        }

        for compaction_group_id in &modified_compaction_groups {
            trigger_sst_stat(
                &self.metrics,
                None,
                &versioning.current_version,
                *compaction_group_id,
            );
        }
        trigger_epoch_stat(&self.metrics, &versioning.current_version);

        // Release the versioning lock before the post-commit work below.
        drop(versioning_guard);

        // Don't trigger compactions if we enable deterministic compaction
        if !self.env.opts.compaction_deterministic_test {
            // commit_epoch may contains SSTs from any compaction group
            for id in &modified_compaction_groups {
                self.try_send_compaction_request(*id, compact_task::TaskType::Dynamic);
            }
            if !table_stats_change.is_empty() {
                self.collect_table_write_throughput(table_stats_change)
                    .await;
            }
        }
        if !modified_compaction_groups.is_empty() {
            self.try_update_write_limits(&modified_compaction_groups)
                .await;
        }
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(())
    }

    /// Records per-table write throughput into the statistic manager, normalizing
    /// the per-commit byte count by the checkpoint interval (in seconds, at least 1).
    async fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
        let params = self.env.system_params_reader().await;
        let barrier_interval_ms = params.barrier_interval_ms() as u64;
        let checkpoint_secs = std::cmp::max(
            1,
            params.checkpoint_frequency() * barrier_interval_ms / 1000,
        );

        let mut table_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.write();
        let timestamp = chrono::Utc::now().timestamp();

        for (table_id, stat) in table_stats {
            // throughput ~ bytes written per second over the checkpoint interval
            let throughput = ((stat.total_value_size + stat.total_key_size) as f64
                / checkpoint_secs as f64) as u64;
            table_throughput_statistic_manager
                .add_table_throughput_with_ts(table_id, throughput, timestamp);
        }
    }

    /// Distributes the commit SSTs to their compaction groups according to
    /// `table_compaction_group_mapping`. An SST containing tables from multiple
    /// groups is split with `split_sst_with_table_ids` so each group only receives
    /// its own tables' data. Within each group the original object-id order of
    /// `sstables` is preserved (asserted at the end).
    async fn correct_commit_ssts(
        &self,
        sstables: Vec<LocalSstableInfo>,
        table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
    ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
        let mut new_sst_id_number = 0;
        let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
        let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
        for commit_sst in sstables {
            // group id -> table ids of this SST that live in that group
            let mut group_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
            for table_id in &commit_sst.sst_info.table_ids {
                match table_compaction_group_mapping.get(&TableId::new(*table_id)) {
                    Some(cg_id_from_meta) => {
                        group_table_ids
                            .entry(*cg_id_from_meta)
                            .or_default()
                            .push(*table_id);
                    }
                    None => {
                        // Unmapped tables are skipped (their data is simply not committed).
                        tracing::warn!(
                            table_id = *table_id,
                            object_id = %commit_sst.sst_info.object_id,
                            "table doesn't belong to any compaction group",
                        );
                    }
                }
            }

            new_sst_id_number += group_table_ids.len() * 2; // `split_sst` will split the SST into two parts and consume 2 SST IDs
            sst_to_cg_vec.push((commit_sst, group_table_ids));
        }

        // Generate new SST IDs for each compaction group
        // `next_sstable_object_id` will update the global SST ID and reserve the new SST IDs
        // So we need to get the new SST ID first and then split the SSTs
        let mut new_sst_id = next_sstable_id(&self.env, new_sst_id_number).await?;
        let mut commit_sstables: BTreeMap<u64, Vec<SstableInfo>> = BTreeMap::new();

        for (mut sst, group_table_ids) in sst_to_cg_vec {
            let len = group_table_ids.len();
            for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
                if sst.sst_info.table_ids == match_ids {
                    // The SST contains all the tables in the group should be last key
                    assert!(
                        index == len - 1,
                        "SST should be the last key in the group {} index {} len {}",
                        group_id,
                        index,
                        len
                    );
                    // No split needed: the remaining SST belongs wholly to this group.
                    commit_sstables
                        .entry(group_id)
                        .or_default()
                        .push(sst.sst_info);
                    break;
                }

                // Estimate the size of the branch that moves to this group from the
                // compressed sizes of its tables; the remainder stays in the SST.
                let origin_sst_size = sst.sst_info.sst_size;
                let new_sst_size = match_ids
                    .iter()
                    .map(|id| {
                        let stat = sst.table_stats.get(id).unwrap();
                        stat.total_compressed_size
                    })
                    .sum();

                if new_sst_size == 0 {
                    tracing::warn!(
                        id = %sst.sst_info.sst_id,
                        object_id = %sst.sst_info.object_id,
                        match_ids = ?match_ids,
                        "Sstable doesn't contain any data for tables",
                    );
                }

                let old_sst_size = origin_sst_size.saturating_sub(new_sst_size);
                if old_sst_size == 0 {
                    tracing::warn!(
                        id = %sst.sst_info.sst_id,
                        object_id = %sst.sst_info.object_id,
                        match_ids = ?match_ids,
                        origin_sst_size = origin_sst_size,
                        new_sst_size = new_sst_size,
                        "Sstable doesn't contain any data for tables",
                    );
                }
                let (modified_sst_info, branch_sst) = split_sst_with_table_ids(
                    &sst.sst_info,
                    &mut new_sst_id,
                    old_sst_size,
                    new_sst_size,
                    match_ids,
                );
                // Keep splitting the remainder against the next groups.
                sst.sst_info = modified_sst_info;

                commit_sstables
                    .entry(group_id)
                    .or_default()
                    .push(branch_sst);
            }
        }

        // order check: per group, the SSTs must follow the original commit order.
        for ssts in commit_sstables.values() {
            let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
            assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
        }

        Ok(commit_sstables)
    }
}
479
480fn on_handle_add_new_table(
481    state_table_info: &HummockVersionStateTableInfo,
482    table_ids: impl IntoIterator<Item = &TableId>,
483    compaction_group_id: CompactionGroupId,
484    table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
485    new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
486) -> Result<()> {
487    for table_id in table_ids {
488        if let Some(info) = state_table_info.info().get(table_id) {
489            return Err(Error::CompactionGroup(format!(
490                "table {} already exist {:?}",
491                table_id.table_id, info,
492            )));
493        }
494        table_compaction_group_mapping.insert(*table_id, compaction_group_id);
495        new_table_ids.insert(*table_id, compaction_group_id);
496    }
497
498    Ok(())
499}
500
501/// Rewrite the commit sstables to sub-levels based on the compaction group config.
502/// The type of `compaction_group_manager_txn` is too complex to be used in the function signature. So we use `HashMap` instead.
503fn rewrite_commit_sstables_to_sub_level(
504    commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
505    group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
506) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
507    let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
508    for (group_id, inserted_table_infos) in commit_sstables {
509        let config = group_id_to_config
510            .get(&group_id)
511            .expect("compaction group should exist");
512
513        let mut accumulated_size = 0;
514        let mut ssts = vec![];
515        let sub_level_size_limit = config
516            .max_overlapping_level_size
517            .unwrap_or(compaction_config::max_overlapping_level_size());
518
519        let level = overlapping_sstables.entry(group_id).or_default();
520
521        for sst in inserted_table_infos {
522            accumulated_size += sst.sst_size;
523            ssts.push(sst);
524            if accumulated_size > sub_level_size_limit {
525                level.push(ssts);
526
527                // reset the accumulated size and ssts
528                accumulated_size = 0;
529                ssts = vec![];
530            }
531        }
532
533        if !ssts.is_empty() {
534            level.push(ssts);
535        }
536
537        // The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top.
538        level.reverse();
539    }
540
541    overlapping_sstables
542}
543
/// Returns `true` iff `vec_2` is an ordered subsequence of `vec_1`, i.e. every
/// element of `vec_2` appears in `vec_1` in the same relative order.
///
/// Used as a sanity check that per-group commit SSTs preserve the original
/// object-id order of the committed SSTs.
// Take slices instead of `&Vec<T>` (clippy `ptr_arg`); `&Vec<T>` call sites
// coerce automatically, so this is backward-compatible and more general.
fn is_ordered_subset<T: PartialEq>(vec_1: &[T], vec_2: &[T]) -> bool {
    let mut vec_2_iter = vec_2.iter().peekable();
    for item in vec_1 {
        // Advance the subsequence cursor whenever the next expected element matches.
        if vec_2_iter.peek() == Some(&item) {
            vec_2_iter.next();
        }
    }

    // All of `vec_2` must have been consumed.
    vec_2_iter.peek().is_none()
}