risingwave_meta/hummock/manager/commit_epoch.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::config::meta::default::compaction_config;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map,
};
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
};
use risingwave_pb::hummock::{CompactionConfig, compact_task};
use sea_orm::TransactionTrait;

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
use crate::hummock::manager::transaction::{
    HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
    get_or_create_local_table_stat, trigger_epoch_stat, trigger_local_table_stat, trigger_sst_stat,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
use crate::hummock::{
    HummockManager, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
};

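/// Table ids of a newly created table fragment. `commit_epoch` creates a
/// dedicated new compaction group for each `NewTableFragmentInfo`.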
pub struct NewTableFragmentInfo {
    pub table_ids: HashSet<TableId>,
}

#[derive(Default)]
pub struct CommitEpochInfo {
    pub sstables: Vec<LocalSstableInfo>,
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
    pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
    pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
    /// `table_id` -> `committed_epoch`
    pub tables_to_commit: HashMap<TableId, u64>,

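    /// Tables to truncate in this commit: their stats are purged and the
    /// truncation is recorded per compaction group in the version delta.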
    pub truncate_tables: HashSet<TableId>,
}

impl HummockManager {
    /// The caller should ensure `epoch` > `committed_epoch` for each table in
    /// `tables_to_commit`, unless the table is newly added via
    /// `new_table_fragment_infos`.
    pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> {
        let CommitEpochInfo {
            mut sstables,
            new_table_watermarks,
            sst_to_context,
            new_table_fragment_infos,
            change_log_delta,
            vector_index_delta,
            tables_to_commit,
            truncate_tables,
        } = commit_info;
        let mut versioning_guard = self.versioning.write().await;
        let _timer = start_measure_real_process_timer!(self, "commit_epoch");
        // Prevent committing new epochs if this flag is set.
        if versioning_guard.disable_commit_epochs {
            return Ok(());
        }

        assert!(!tables_to_commit.is_empty());

        let versioning: &mut Versioning = &mut versioning_guard;
        self.commit_epoch_sanity_check(
            &tables_to_commit,
            &sstables,
            &sst_to_context,
            &versioning.current_version,
        )
        .await?;

        // Consume and aggregate table stats.
        let mut table_stats_change = PbTableStatsMap::default();
        for s in &mut sstables {
            add_prost_table_stats_map(
                &mut table_stats_change,
                &to_prost_table_stats_map(s.table_stats.clone()),
            );
        }

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            Some(&self.table_committed_epoch_notifiers),
            &self.metrics,
        );

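        // Build the current table -> compaction group mapping; entries for
        // newly added tables are appended below.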
        let state_table_info = &version.latest_version().state_table_info;
        let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
        let mut new_table_ids = HashMap::new();
        let mut new_compaction_groups = Vec::new();
        let mut compaction_group_manager_txn = None;
        let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

        // Assign a newly created compaction group to each new table fragment.
        for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
            let (compaction_group_manager, compaction_group_config) =
                if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
                    (
                        compaction_group_manager,
                        (*compaction_group_config
                            .as_ref()
                            .expect("must be set with compaction_group_manager_txn"))
                        .clone(),
                    )
                } else {
                    let compaction_group_manager_guard =
                        self.compaction_group_manager.write().await;
                    let new_compaction_group_config =
                        compaction_group_manager_guard.default_compaction_config();
                    compaction_group_config = Some(new_compaction_group_config.clone());
                    (
                        compaction_group_manager_txn.insert(
                            CompactionGroupManager::start_owned_compaction_groups_txn(
                                compaction_group_manager_guard,
                            ),
                        ),
                        new_compaction_group_config,
                    )
                };
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            let new_compaction_group = CompactionGroup {
                group_id: new_compaction_group_id,
                compaction_config: compaction_group_config.clone(),
            };

            new_compaction_groups.push(new_compaction_group.clone());
            compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);

            on_handle_add_new_table(
                state_table_info,
                &table_ids,
                new_compaction_group_id,
                &mut table_compaction_group_mapping,
                &mut new_table_ids,
            )?;
        }

        let commit_sstables = self
            .correct_commit_ssts(sstables, &table_compaction_group_mapping)
            .await?;

        let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
        // Fill in the compaction config for each modified compaction group.
        let mut group_id_to_config = HashMap::new();
        if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .get(cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        } else {
            let compaction_group_manager = self.compaction_group_manager.read().await;
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .try_get_compaction_group_config(*cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        }

        let group_id_to_sub_levels =
            rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);

        // Build the mapping from compaction group id to the tables to truncate.
        let mut group_id_to_truncate_tables: HashMap<CompactionGroupId, HashSet<TableId>> =
            HashMap::new();
        for table_id in &truncate_tables {
            if let Some(compaction_group_id) = table_compaction_group_mapping.get(table_id) {
                group_id_to_truncate_tables
                    .entry(*compaction_group_id)
                    .or_default()
                    .insert(*table_id);
            } else {
                bail!(
                    "table {} doesn't belong to any compaction group, cannot truncate it",
                    table_id
                );
            }
        }

        let time_travel_delta = version.pre_commit_epoch(
            &tables_to_commit,
            new_compaction_groups,
            group_id_to_sub_levels,
            &new_table_ids,
            new_table_watermarks,
            change_log_delta,
            vector_index_delta,
            group_id_to_truncate_tables,
        );

        if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
            // We can't invoke `mark_next_time_travel_version_snapshot` here because
            // `versioning` is already mutably borrowed.
            versioning.time_travel_snapshot_interval_counter = u64::MAX;
        }

        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
        if purge_prost_table_stats(
            &mut version_stats.table_stats,
            version.latest_version(),
            &truncate_tables,
        ) {
            self.metrics.version_stats.reset();
            versioning.local_metrics.clear();
        }

        trigger_local_table_stat(
            &self.metrics,
            &mut versioning.local_metrics,
            &version_stats,
            &table_stats_change,
        );
        for (table_id, stats) in &table_stats_change {
            if stats.total_key_size == 0
                && stats.total_value_size == 0
                && stats.total_key_count == 0
            {
                continue;
            }
            let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
            let table_metrics = get_or_create_local_table_stat(
                &self.metrics,
                *table_id,
                &mut versioning.local_metrics,
            );
            table_metrics.inc_write_throughput(stats_value as u64);
        }
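        // Persist a full time travel snapshot every
        // `hummock_time_travel_snapshot_interval` commits; in between, only
        // deltas are written.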
        let mut time_travel_version = None;
        if versioning.time_travel_snapshot_interval_counter
            >= self.env.opts.hummock_time_travel_snapshot_interval
        {
            versioning.time_travel_snapshot_interval_counter = 0;
            time_travel_version = Some(version.latest_version());
        } else {
            versioning.time_travel_snapshot_interval_counter = versioning
                .time_travel_snapshot_interval_counter
                .saturating_add(1);
        }
        let time_travel_tables_to_commit =
            table_compaction_group_mapping
                .iter()
                .filter_map(|(table_id, cg_id)| {
                    tables_to_commit
                        .get(table_id)
                        .map(|committed_epoch| (table_id, cg_id, *committed_epoch))
                });
        let time_travel_table_ids: HashSet<_> = self
            .metadata_manager
            .catalog_controller
            .list_time_travel_table_ids()
            .await
            .map_err(|e| Error::Internal(e.into()))?
            .into_iter()
            .collect();
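        // Write time travel metadata and commit the version delta, version
        // stats and compaction group changes atomically in one meta store
        // transaction.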
        let mut txn = self.env.meta_store_ref().conn.begin().await?;
        let version_snapshot_sst_ids = self
            .write_time_travel_metadata(
                &txn,
                time_travel_version,
                time_travel_delta,
                time_travel_table_ids,
                &versioning.last_time_travel_snapshot_sst_ids,
                time_travel_tables_to_commit,
            )
            .await?;
        commit_multi_var_with_provided_txn!(
            txn,
            version,
            version_stats,
            compaction_group_manager_txn
        )?;
        if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
            versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
        }

        for compaction_group_id in &modified_compaction_groups {
            trigger_sst_stat(
                &self.metrics,
                None,
                &versioning.current_version,
                *compaction_group_id,
            );
        }
        trigger_epoch_stat(&self.metrics, &versioning.current_version);

        drop(versioning_guard);

        // Don't trigger compactions if deterministic compaction is enabled.
        if !self.env.opts.compaction_deterministic_test {
            // commit_epoch may contain SSTs from any compaction group
            for id in &modified_compaction_groups {
                self.try_send_compaction_request(*id, compact_task::TaskType::Dynamic);
            }
            if !table_stats_change.is_empty() {
                self.collect_table_write_throughput(table_stats_change)
                    .await;
            }
        }
        if !modified_compaction_groups.is_empty() {
            self.try_update_write_limits(&modified_compaction_groups)
                .await;
        }
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(())
    }

    async fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
        let params = self.env.system_params_reader().await;
        let barrier_interval_ms = params.barrier_interval_ms() as u64;
        let checkpoint_secs = std::cmp::max(
            1,
            params.checkpoint_frequency() * barrier_interval_ms / 1000,
        );

        let mut table_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.write();
        let timestamp = chrono::Utc::now().timestamp();

        for (table_id, stat) in table_stats {
            let throughput = ((stat.total_value_size + stat.total_key_size) as f64
                / checkpoint_secs as f64) as u64;
            table_throughput_statistic_manager
                .add_table_throughput_with_ts(table_id, throughput, timestamp);
        }
    }

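    /// Splits each commit SST by compaction group so that every returned
    /// `SstableInfo` serves the tables of exactly one group; SSTs spanning
    /// multiple groups are branched via `split_sst_with_table_ids`.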
    async fn correct_commit_ssts(
        &self,
        sstables: Vec<LocalSstableInfo>,
        table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
    ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
        let mut new_sst_id_number = 0;
        let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
        let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
        for commit_sst in sstables {
            let mut group_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
            for table_id in &commit_sst.sst_info.table_ids {
                match table_compaction_group_mapping.get(table_id) {
                    Some(cg_id_from_meta) => {
                        group_table_ids
                            .entry(*cg_id_from_meta)
                            .or_default()
                            .push(*table_id);
                    }
                    None => {
                        tracing::warn!(
                            %table_id,
                            object_id = %commit_sst.sst_info.object_id,
                            "table doesn't belong to any compaction group",
                        );
                    }
                }
            }

            // `split_sst_with_table_ids` splits an SST into two parts and consumes 2 SST ids.
            new_sst_id_number += group_table_ids.len() * 2;
            sst_to_cg_vec.push((commit_sst, group_table_ids));
        }

        // Generate new SST ids for each compaction group.
        // `next_sstable_id` updates the global SST id and reserves the new SST ids,
        // so we fetch the new SST ids first and then split the SSTs.
        let mut new_sst_id = next_sstable_id(&self.env, new_sst_id_number).await?;
        let mut commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>> = BTreeMap::new();

        for (mut sst, group_table_ids) in sst_to_cg_vec {
            let len = group_table_ids.len();
            for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
                if sst.sst_info.table_ids == match_ids {
                    // An SST whose table ids exactly match the remaining table ids
                    // must be the last entry of the group mapping.
                    assert!(
                        index == len - 1,
                        "SST should be the last key in the group {} index {} len {}",
                        group_id,
                        index,
                        len
                    );
                    commit_sstables
                        .entry(group_id)
                        .or_default()
                        .push(sst.sst_info);
                    break;
                }

                let origin_sst_size = sst.sst_info.sst_size;
                let new_sst_size = match_ids
                    .iter()
                    .map(|id| {
                        let stat = sst.table_stats.get(id).unwrap();
                        stat.total_compressed_size
                    })
                    .sum();

                if new_sst_size == 0 {
                    tracing::warn!(
                        id = %sst.sst_info.sst_id,
                        object_id = %sst.sst_info.object_id,
                        match_ids = ?match_ids,
                        "Sstable doesn't contain any data for the matched tables",
                    );
                }

                let old_sst_size = origin_sst_size.saturating_sub(new_sst_size);
                if old_sst_size == 0 {
                    tracing::warn!(
                        id = %sst.sst_info.sst_id,
                        object_id = %sst.sst_info.object_id,
                        match_ids = ?match_ids,
                        origin_sst_size = origin_sst_size,
                        new_sst_size = new_sst_size,
                        "Sstable doesn't contain any data for the remaining tables",
                    );
                }
                let (modified_sst_info, branch_sst) = split_sst_with_table_ids(
                    &sst.sst_info,
                    &mut new_sst_id,
                    old_sst_size,
                    new_sst_size,
                    match_ids,
                );
                sst.sst_info = modified_sst_info;

                commit_sstables
                    .entry(group_id)
                    .or_default()
                    .push(branch_sst);
            }
        }

        // Order check: within each group, the committed SSTs must preserve the
        // original commit order of their object ids.
        for ssts in commit_sstables.values() {
            let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
            assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
        }

        Ok(commit_sstables)
    }
}

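/// Registers each table in `table_ids` to `compaction_group_id`, returning an
/// error if a table is already tracked in `state_table_info`.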
fn on_handle_add_new_table(
    state_table_info: &HummockVersionStateTableInfo,
    table_ids: impl IntoIterator<Item = &TableId>,
    compaction_group_id: CompactionGroupId,
    table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
    new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
) -> Result<()> {
    for table_id in table_ids {
        if let Some(info) = state_table_info.info().get(table_id) {
            return Err(Error::CompactionGroup(format!(
                "table {} already exists: {:?}",
                table_id, info,
            )));
        }
        table_compaction_group_mapping.insert(*table_id, compaction_group_id);
        new_table_ids.insert(*table_id, compaction_group_id);
    }

    Ok(())
}

/// Rewrite the commit sstables to sub-levels based on the compaction group config.
/// The type of `compaction_group_manager_txn` is too complex to be used in the
/// function signature, so a `HashMap` is used instead.
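///
/// SSTs within each group are chunked greedily into sub-levels: once the
/// accumulated `sst_size` exceeds `max_overlapping_level_size`, a new sub-level
/// is started. For example, with a limit of 100 and SST sizes [60, 60, 60], the
/// sub-levels are [[60, 60], [60]] before the final reversal.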
fn rewrite_commit_sstables_to_sub_level(
    commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
    group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
    let mut overlapping_sstables: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> =
        BTreeMap::new();
    for (group_id, inserted_table_infos) in commit_sstables {
        let config = group_id_to_config
            .get(&group_id)
            .expect("compaction group should exist");

        let mut accumulated_size = 0;
        let mut ssts = vec![];
        let sub_level_size_limit = config
            .max_overlapping_level_size
            .unwrap_or(compaction_config::max_overlapping_level_size());

        let level = overlapping_sstables.entry(group_id).or_default();

        for sst in inserted_table_infos {
            accumulated_size += sst.sst_size;
            ssts.push(sst);
            if accumulated_size > sub_level_size_limit {
                level.push(ssts);

                // Reset the accumulated size and ssts.
                accumulated_size = 0;
                ssts = vec![];
            }
        }

        if !ssts.is_empty() {
            level.push(ssts);
        }

        // The uploader organizes the ssts in decreasing epoch order, so the level
        // needs to be reversed to ensure that the latest epoch is at the top.
        level.reverse();
    }

    overlapping_sstables
}

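/// Returns true iff `vec_2` is an ordered subsequence of `vec_1`.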
fn is_ordered_subset<T: PartialEq>(vec_1: &[T], vec_2: &[T]) -> bool {
    let mut vec_2_iter = vec_2.iter().peekable();
    for item in vec_1 {
        if vec_2_iter.peek() == Some(&item) {
            vec_2_iter.next();
        }
    }

    vec_2_iter.peek().is_none()
}
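
// A minimal illustrative test for `is_ordered_subset`, added as a sketch; the
// module name and cases are hypothetical, not part of the original file.
#[cfg(test)]
mod is_ordered_subset_tests {
    use super::is_ordered_subset;

    #[test]
    fn test_is_ordered_subset() {
        // `vec_2` must appear within `vec_1` in the same relative order.
        assert!(is_ordered_subset(&[1, 2, 3, 4], &[2, 4]));
        // The empty sequence is a subsequence of anything.
        assert!(is_ordered_subset(&[1, 2, 3], &[]));
        // Out-of-order elements are rejected.
        assert!(!is_ordered_subset(&[1, 2, 3], &[3, 2]));
        // Elements absent from `vec_1` are rejected.
        assert!(!is_ordered_subset(&[1, 2], &[5]));
    }
}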