risingwave_meta/hummock/manager/
commit_epoch.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::bail;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::meta::default::compaction_config;
22use risingwave_common::system_param::reader::SystemParamsRead;
23use risingwave_hummock_sdk::change_log::ChangeLogDelta;
24use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
25use risingwave_hummock_sdk::sstable_info::SstableInfo;
26use risingwave_hummock_sdk::table_stats::{
27    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map,
28};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
32use risingwave_hummock_sdk::{
33    CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
34};
35use risingwave_pb::hummock::{CompactionConfig, compact_task};
36use sea_orm::TransactionTrait;
37
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
40use crate::hummock::manager::transaction::{
41    HummockVersionStatsTransaction, HummockVersionTransaction,
42};
43use crate::hummock::manager::versioning::Versioning;
44use crate::hummock::metrics_utils::{
45    get_or_create_local_table_stat, trigger_epoch_stat, trigger_local_table_stat, trigger_sst_stat,
46};
47use crate::hummock::model::CompactionGroup;
48use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
49use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
50use crate::hummock::{
51    HummockManager, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
52};
53
/// Tables newly added by one entry of `CommitEpochInfo::new_table_fragment_infos`.
///
/// In `HummockManager::commit_epoch`, each `NewTableFragmentInfo` gets its own newly created
/// compaction group, and every table in `table_ids` is registered in that group.
pub struct NewTableFragmentInfo {
    /// Ids of the newly added tables. If any of them already exists in the current version's
    /// state table info, the commit fails with an error.
    pub table_ids: HashSet<TableId>,
}
57
/// Input of `HummockManager::commit_epoch`: everything that is committed into the Hummock
/// version by a single call.
#[derive(Default)]
pub struct CommitEpochInfo {
    /// SSTs to commit. `commit_epoch` aggregates their table stats and then assigns them to
    /// compaction groups, splitting an SST when its tables span several groups.
    pub sstables: Vec<LocalSstableInfo>,
    /// Table watermarks to add, keyed by table; forwarded to the version pre-commit step.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Context that each committed SST object belongs to; only consumed by the commit sanity
    /// check. NOTE(review): presumably the context is the worker that uploaded the SST — confirm.
    pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
    /// Tables created in this commit. `commit_epoch` creates one new compaction group per entry
    /// and registers the entry's tables in it.
    pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
    /// Change-log delta per table; forwarded to the version pre-commit step.
    pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
    /// Vector-index delta per table; forwarded to the version pre-commit step.
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
    /// `table_id` -> `committed_epoch`
    ///
    /// `commit_epoch` asserts that this map is non-empty.
    pub tables_to_commit: HashMap<TableId, u64>,

    /// Tables to truncate in this commit. Each one must belong to a compaction group (existing or
    /// created by this commit), otherwise `commit_epoch` returns an error. The set is also passed
    /// to the table-stats purge step.
    pub truncate_tables: HashSet<TableId>,
}
71
72impl HummockManager {
73    /// Caller should ensure `epoch` > `committed_epoch` of `tables_to_commit`
74    /// if tables are not newly added via `new_table_fragment_info`
75    pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> {
76        let CommitEpochInfo {
77            mut sstables,
78            new_table_watermarks,
79            sst_to_context,
80            new_table_fragment_infos,
81            change_log_delta,
82            vector_index_delta,
83            tables_to_commit,
84            truncate_tables,
85        } = commit_info;
86        let mut versioning_guard = self.versioning.write().await;
87        let _timer = start_measure_real_process_timer!(self, "commit_epoch");
88        // Prevent commit new epochs if this flag is set
89        if versioning_guard.disable_commit_epochs {
90            return Ok(());
91        }
92
93        assert!(!tables_to_commit.is_empty());
94
95        let versioning: &mut Versioning = &mut versioning_guard;
96        self.commit_epoch_sanity_check(
97            &tables_to_commit,
98            &sstables,
99            &sst_to_context,
100            &versioning.current_version,
101        )
102        .await?;
103
104        // Consume and aggregate table stats.
105        let mut table_stats_change = PbTableStatsMap::default();
106        for s in &mut sstables {
107            add_prost_table_stats_map(
108                &mut table_stats_change,
109                &to_prost_table_stats_map(s.table_stats.clone()),
110            );
111        }
112
113        let table_change_log_object_ids_before_commit = versioning
114            .table_change_log
115            .values()
116            .flat_map(|l| l.get_object_ids())
117            .collect::<HashSet<_>>();
118
119        let mut version = HummockVersionTransaction::new(
120            &mut versioning.current_version,
121            &mut versioning.hummock_version_deltas,
122            &mut versioning.table_change_log,
123            self.env.notification_manager(),
124            Some(&self.table_committed_epoch_notifiers),
125            &self.metrics,
126            &self.env.opts,
127        );
128
129        let state_table_info = &version.latest_version().state_table_info;
130        let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
131        let mut new_table_ids = HashMap::new();
132        let mut new_compaction_groups = Vec::new();
133        let mut compaction_group_manager_txn = None;
134        let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;
135
136        // Add new table
137        for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
138            let (compaction_group_manager, compaction_group_config) =
139                if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
140                    (
141                        compaction_group_manager,
142                        (*compaction_group_config
143                            .as_ref()
144                            .expect("must be set with compaction_group_manager_txn"))
145                        .clone(),
146                    )
147                } else {
148                    let compaction_group_manager_guard =
149                        self.compaction_group_manager.write().await;
150                    let new_compaction_group_config =
151                        compaction_group_manager_guard.default_compaction_config();
152                    compaction_group_config = Some(new_compaction_group_config.clone());
153                    (
154                        compaction_group_manager_txn.insert(
155                            CompactionGroupManager::start_owned_compaction_groups_txn(
156                                compaction_group_manager_guard,
157                            ),
158                        ),
159                        new_compaction_group_config,
160                    )
161                };
162            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
163            let new_compaction_group = CompactionGroup {
164                group_id: new_compaction_group_id,
165                compaction_config: compaction_group_config.clone(),
166            };
167
168            new_compaction_groups.push(new_compaction_group.clone());
169            compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);
170
171            on_handle_add_new_table(
172                state_table_info,
173                &table_ids,
174                new_compaction_group_id,
175                &mut table_compaction_group_mapping,
176                &mut new_table_ids,
177            )?;
178        }
179
180        let commit_sstables = self
181            .correct_commit_ssts(sstables, &table_compaction_group_mapping)
182            .await?;
183
184        let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
185        // fill compaction_groups
186        let mut group_id_to_config = HashMap::new();
187        if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
188            for cg_id in &modified_compaction_groups {
189                let compaction_group = compaction_group_manager
190                    .get(cg_id)
191                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
192                    .compaction_config();
193                group_id_to_config.insert(*cg_id, compaction_group);
194            }
195        } else {
196            let compaction_group_manager = self.compaction_group_manager.read().await;
197            for cg_id in &modified_compaction_groups {
198                let compaction_group = compaction_group_manager
199                    .try_get_compaction_group_config(*cg_id)
200                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
201                    .compaction_config();
202                group_id_to_config.insert(*cg_id, compaction_group);
203            }
204        }
205
206        let group_id_to_sub_levels =
207            rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);
208
209        // build group_id to truncate tables
210        let mut group_id_to_truncate_tables: HashMap<CompactionGroupId, HashSet<TableId>> =
211            HashMap::new();
212        for table_id in &truncate_tables {
213            if let Some(compaction_group_id) = table_compaction_group_mapping.get(table_id) {
214                group_id_to_truncate_tables
215                    .entry(*compaction_group_id)
216                    .or_default()
217                    .insert(*table_id);
218            } else {
219                bail!(
220                    "table {} doesn't belong to any compaction group, skip truncating",
221                    table_id
222                );
223            }
224        }
225
226        let time_travel_delta = version.pre_commit_epoch(
227            &tables_to_commit,
228            new_compaction_groups,
229            group_id_to_sub_levels,
230            &new_table_ids,
231            new_table_watermarks,
232            change_log_delta,
233            vector_index_delta,
234            group_id_to_truncate_tables,
235        );
236
237        if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
238            // Unable to invoke mark_next_time_travel_version_snapshot because versioning is already mutable borrowed.
239            versioning.time_travel_snapshot_interval_counter = u64::MAX;
240        }
241
242        // Apply stats changes.
243        let mut version_stats = HummockVersionStatsTransaction::new(
244            &mut versioning.version_stats,
245            self.env.notification_manager(),
246        );
247        add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
248        if purge_prost_table_stats(
249            &mut version_stats.table_stats,
250            version.latest_version(),
251            &truncate_tables,
252        ) {
253            self.metrics.version_stats.reset();
254            versioning.local_metrics.clear();
255        }
256
257        trigger_local_table_stat(
258            &self.metrics,
259            &mut versioning.local_metrics,
260            &version_stats,
261            &table_stats_change,
262        );
263        for (table_id, stats) in &table_stats_change {
264            if stats.total_key_size == 0
265                && stats.total_value_size == 0
266                && stats.total_key_count == 0
267            {
268                continue;
269            }
270            let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
271            let table_metrics = get_or_create_local_table_stat(
272                &self.metrics,
273                *table_id,
274                &mut versioning.local_metrics,
275            );
276            table_metrics.inc_write_throughput(stats_value as u64);
277        }
278        let mut time_travel_version = None;
279        if versioning.time_travel_snapshot_interval_counter
280            >= self.env.opts.hummock_time_travel_snapshot_interval
281        {
282            versioning.time_travel_snapshot_interval_counter = 0;
283            time_travel_version = Some(version.latest_version());
284        } else {
285            versioning.time_travel_snapshot_interval_counter = versioning
286                .time_travel_snapshot_interval_counter
287                .saturating_add(1);
288        }
289        let time_travel_tables_to_commit =
290            table_compaction_group_mapping
291                .iter()
292                .filter_map(|(table_id, cg_id)| {
293                    tables_to_commit
294                        .get(table_id)
295                        .map(|committed_epoch| (table_id, cg_id, *committed_epoch))
296                });
297        let time_travel_table_ids: HashSet<_> = self
298            .metadata_manager
299            .catalog_controller
300            .list_time_travel_table_ids()
301            .await
302            .map_err(|e| Error::Internal(e.into()))?
303            .into_iter()
304            .collect();
305        let mut txn = self.env.meta_store_ref().conn.begin().await?;
306        let version_snapshot_sst_ids = self
307            .write_time_travel_metadata(
308                &txn,
309                time_travel_version,
310                time_travel_delta,
311                time_travel_table_ids,
312                &versioning.last_time_travel_snapshot_sst_ids,
313                time_travel_tables_to_commit,
314            )
315            .await?;
316        commit_multi_var_with_provided_txn!(
317            txn,
318            version,
319            version_stats,
320            compaction_group_manager_txn
321        )?;
322        if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
323            versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
324        }
325
326        for compaction_group_id in &modified_compaction_groups {
327            trigger_sst_stat(
328                &self.metrics,
329                None,
330                &versioning.current_version,
331                *compaction_group_id,
332            );
333        }
334        trigger_epoch_stat(&self.metrics, &versioning.current_version);
335        let table_change_log_object_ids_after_commit = versioning
336            .table_change_log
337            .values()
338            .flat_map(|l| l.get_object_ids())
339            .collect::<HashSet<_>>();
340        drop(versioning_guard);
341        let may_delete_object_ids =
342            &table_change_log_object_ids_before_commit - &table_change_log_object_ids_after_commit;
343        self.gc_manager
344            .add_may_delete_object_ids(may_delete_object_ids.into_iter());
345
346        // Don't trigger compactions if we enable deterministic compaction
347        if !self.env.opts.compaction_deterministic_test {
348            // commit_epoch may contains SSTs from any compaction group
349            for id in &modified_compaction_groups {
350                self.try_send_compaction_request(*id, compact_task::TaskType::Dynamic);
351            }
352            if !table_stats_change.is_empty() {
353                self.collect_table_write_throughput(table_stats_change)
354                    .await;
355            }
356        }
357        if !modified_compaction_groups.is_empty() {
358            self.try_update_write_limits(&modified_compaction_groups)
359                .await;
360        }
361        #[cfg(test)]
362        {
363            self.check_state_consistency().await;
364        }
365        Ok(())
366    }
367
368    async fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
369        let params = self.env.system_params_reader().await;
370        let barrier_interval_ms = params.barrier_interval_ms() as u64;
371        let checkpoint_secs = std::cmp::max(
372            1,
373            params.checkpoint_frequency() * barrier_interval_ms / 1000,
374        );
375
376        let mut table_throughput_statistic_manager =
377            self.table_write_throughput_statistic_manager.write();
378        let timestamp = chrono::Utc::now().timestamp();
379
380        for (table_id, stat) in table_stats {
381            let throughput = ((stat.total_value_size + stat.total_key_size) as f64
382                / checkpoint_secs as f64) as u64;
383            table_throughput_statistic_manager
384                .add_table_throughput_with_ts(table_id, throughput, timestamp);
385        }
386    }
387
388    async fn correct_commit_ssts(
389        &self,
390        sstables: Vec<LocalSstableInfo>,
391        table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
392    ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
393        let mut new_sst_id_number = 0;
394        let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
395        let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
396        for commit_sst in sstables {
397            let mut group_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
398            for table_id in &commit_sst.sst_info.table_ids {
399                match table_compaction_group_mapping.get(table_id) {
400                    Some(cg_id_from_meta) => {
401                        group_table_ids
402                            .entry(*cg_id_from_meta)
403                            .or_default()
404                            .push(*table_id);
405                    }
406                    None => {
407                        tracing::warn!(
408                            %table_id,
409                            object_id = %commit_sst.sst_info.object_id,
410                            "table doesn't belong to any compaction group",
411                        );
412                    }
413                }
414            }
415
416            new_sst_id_number += group_table_ids.len() * 2; // `split_sst` will split the SST into two parts and consumer 2 SST IDs
417            sst_to_cg_vec.push((commit_sst, group_table_ids));
418        }
419
420        // Generate new SST IDs for each compaction group
421        // `next_sstable_id` will update the global SST ID and reserve the new SST IDs
422        // So we need to get the new SST ID first and then split the SSTs
423        let mut new_sst_id = next_sstable_id(&self.env, new_sst_id_number).await?;
424        let mut commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>> = BTreeMap::new();
425
426        for (mut sst, group_table_ids) in sst_to_cg_vec {
427            let len = group_table_ids.len();
428            for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
429                if sst.sst_info.table_ids == match_ids {
430                    // The SST contains all the tables in the group should be last key
431                    assert!(
432                        index == len - 1,
433                        "SST should be the last key in the group {} index {} len {}",
434                        group_id,
435                        index,
436                        len
437                    );
438                    commit_sstables
439                        .entry(group_id)
440                        .or_default()
441                        .push(sst.sst_info);
442                    break;
443                }
444
445                let origin_sst_size = sst.sst_info.sst_size;
446                let new_sst_size = match_ids
447                    .iter()
448                    .map(|id| {
449                        let stat = sst.table_stats.get(id).unwrap();
450                        stat.total_compressed_size
451                    })
452                    .sum();
453
454                if new_sst_size == 0 {
455                    tracing::warn!(
456                        id = %sst.sst_info.sst_id,
457                        object_id = %sst.sst_info.object_id,
458                        match_ids = ?match_ids,
459                        "Sstable doesn't contain any data for tables",
460                    );
461                }
462
463                let old_sst_size = origin_sst_size.saturating_sub(new_sst_size);
464                if old_sst_size == 0 {
465                    tracing::warn!(
466                        id = %sst.sst_info.sst_id,
467                        object_id = %sst.sst_info.object_id,
468                        match_ids = ?match_ids,
469                        origin_sst_size = origin_sst_size,
470                        new_sst_size = new_sst_size,
471                        "Sstable doesn't contain any data for tables",
472                    );
473                }
474                let (modified_sst_info, branch_sst) = split_sst_with_table_ids(
475                    &sst.sst_info,
476                    &mut new_sst_id,
477                    old_sst_size,
478                    new_sst_size,
479                    match_ids,
480                );
481                sst.sst_info = modified_sst_info;
482
483                commit_sstables
484                    .entry(group_id)
485                    .or_default()
486                    .push(branch_sst);
487            }
488        }
489
490        // order check
491        for ssts in commit_sstables.values() {
492            let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
493            assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
494        }
495
496        Ok(commit_sstables)
497    }
498}
499
500fn on_handle_add_new_table(
501    state_table_info: &HummockVersionStateTableInfo,
502    table_ids: impl IntoIterator<Item = &TableId>,
503    compaction_group_id: CompactionGroupId,
504    table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
505    new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
506) -> Result<()> {
507    for table_id in table_ids {
508        if let Some(info) = state_table_info.info().get(table_id) {
509            return Err(Error::CompactionGroup(format!(
510                "table {} already exist {:?}",
511                table_id, info,
512            )));
513        }
514        table_compaction_group_mapping.insert(*table_id, compaction_group_id);
515        new_table_ids.insert(*table_id, compaction_group_id);
516    }
517
518    Ok(())
519}
520
521/// Rewrite the commit sstables to sub-levels based on the compaction group config.
522/// The type of `compaction_group_manager_txn` is too complex to be used in the function signature. So we use `HashMap` instead.
523fn rewrite_commit_sstables_to_sub_level(
524    commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
525    group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
526) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
527    let mut overlapping_sstables: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> =
528        BTreeMap::new();
529    for (group_id, inserted_table_infos) in commit_sstables {
530        let config = group_id_to_config
531            .get(&group_id)
532            .expect("compaction group should exist");
533
534        let mut accumulated_size = 0;
535        let mut ssts = vec![];
536        let sub_level_size_limit = config
537            .max_overlapping_level_size
538            .unwrap_or(compaction_config::max_overlapping_level_size());
539
540        let level = overlapping_sstables.entry(group_id).or_default();
541
542        for sst in inserted_table_infos {
543            accumulated_size += sst.sst_size;
544            ssts.push(sst);
545            if accumulated_size > sub_level_size_limit {
546                level.push(ssts);
547
548                // reset the accumulated size and ssts
549                accumulated_size = 0;
550                ssts = vec![];
551            }
552        }
553
554        if !ssts.is_empty() {
555            level.push(ssts);
556        }
557
558        // The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top.
559        level.reverse();
560    }
561
562    overlapping_sstables
563}
564
/// Returns `true` if `vec_2` is a subsequence of `vec_1`.
///
/// Every element of `vec_2` must appear in `vec_1` in the same relative order, though not
/// necessarily contiguously. An empty `vec_2` is a subsequence of any `vec_1`.
///
/// Takes slices, so both `&Vec<T>` (via deref coercion) and plain slices are accepted.
fn is_ordered_subset<T: PartialEq>(vec_1: &[T], vec_2: &[T]) -> bool {
    let mut pending = vec_2.iter().peekable();
    for item in vec_1 {
        // Greedily consume the next expected element whenever it shows up; greedy matching is
        // sufficient for a subsequence test.
        if pending.peek() == Some(&item) {
            pending.next();
        }
    }

    pending.peek().is_none()
}