1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::bail;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::meta::default::compaction_config;
22use risingwave_common::system_param::reader::SystemParamsRead;
23use risingwave_hummock_sdk::change_log::ChangeLogDelta;
24use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
25use risingwave_hummock_sdk::sstable_info::SstableInfo;
26use risingwave_hummock_sdk::table_stats::{
27 PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map,
28};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
32use risingwave_hummock_sdk::{
33 CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
34};
35use risingwave_pb::hummock::{CompactionConfig, compact_task};
36use sea_orm::TransactionTrait;
37
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
40use crate::hummock::manager::transaction::{
41 HummockVersionStatsTransaction, HummockVersionTransaction,
42};
43use crate::hummock::manager::versioning::Versioning;
44use crate::hummock::metrics_utils::{
45 get_or_create_local_table_stat, trigger_epoch_stat, trigger_local_table_stat, trigger_sst_stat,
46};
47use crate::hummock::model::CompactionGroup;
48use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
49use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
50use crate::hummock::{
51 HummockManager, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
52};
53
/// Tables introduced together by one new table fragment.
///
/// `HummockManager::commit_epoch` allocates one new compaction group per
/// `NewTableFragmentInfo` and assigns every table in `table_ids` to it. The commit fails if
/// any of these tables already exists in the current version's state table info.
pub struct NewTableFragmentInfo {
    /// Ids of the tables to register under the newly created compaction group.
    pub table_ids: HashSet<TableId>,
}
57
/// Everything `HummockManager::commit_epoch` needs to commit one epoch.
#[derive(Default)]
pub struct CommitEpochInfo {
    /// SSTs written for this epoch. Their per-table stats are aggregated into the version
    /// stats, and the SSTs are split per compaction group before being committed.
    pub sstables: Vec<LocalSstableInfo>,
    /// New table watermarks, forwarded to `pre_commit_epoch`.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Hummock context id associated with each SST object. Only passed to the commit sanity
    /// check.
    pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
    /// Table fragments whose tables each receive a newly created compaction group.
    pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
    /// Per-table change-log deltas, forwarded to `pre_commit_epoch`.
    pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
    /// Per-table vector-index deltas, forwarded to `pre_commit_epoch`.
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
    /// Committed epoch of each table being committed. `commit_epoch` asserts that this map
    /// is non-empty.
    pub tables_to_commit: HashMap<TableId, u64>,

    /// Tables to truncate in this commit. They are bucketed by compaction group for
    /// `pre_commit_epoch` and also passed to `purge_prost_table_stats`. Every table must
    /// already belong to a compaction group, otherwise the commit returns an error.
    pub truncate_tables: HashSet<TableId>,
}
71
72impl HummockManager {
73 pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> {
76 let CommitEpochInfo {
77 mut sstables,
78 new_table_watermarks,
79 sst_to_context,
80 new_table_fragment_infos,
81 change_log_delta,
82 vector_index_delta,
83 tables_to_commit,
84 truncate_tables,
85 } = commit_info;
86 let mut versioning_guard = self.versioning.write().await;
87 let _timer = start_measure_real_process_timer!(self, "commit_epoch");
88 if versioning_guard.disable_commit_epochs {
90 return Ok(());
91 }
92
93 assert!(!tables_to_commit.is_empty());
94
95 let versioning: &mut Versioning = &mut versioning_guard;
96 self.commit_epoch_sanity_check(
97 &tables_to_commit,
98 &sstables,
99 &sst_to_context,
100 &versioning.current_version,
101 )
102 .await?;
103
104 let mut table_stats_change = PbTableStatsMap::default();
106 for s in &mut sstables {
107 add_prost_table_stats_map(
108 &mut table_stats_change,
109 &to_prost_table_stats_map(s.table_stats.clone()),
110 );
111 }
112
113 let table_change_log_object_ids_before_commit = versioning
114 .table_change_log
115 .values()
116 .flat_map(|l| l.get_object_ids())
117 .collect::<HashSet<_>>();
118
119 let mut version = HummockVersionTransaction::new(
120 &mut versioning.current_version,
121 &mut versioning.hummock_version_deltas,
122 &mut versioning.table_change_log,
123 self.env.notification_manager(),
124 Some(&self.table_committed_epoch_notifiers),
125 &self.metrics,
126 &self.env.opts,
127 );
128
129 let state_table_info = &version.latest_version().state_table_info;
130 let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
131 let mut new_table_ids = HashMap::new();
132 let mut new_compaction_groups = Vec::new();
133 let mut compaction_group_manager_txn = None;
134 let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;
135
136 for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
138 let (compaction_group_manager, compaction_group_config) =
139 if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
140 (
141 compaction_group_manager,
142 (*compaction_group_config
143 .as_ref()
144 .expect("must be set with compaction_group_manager_txn"))
145 .clone(),
146 )
147 } else {
148 let compaction_group_manager_guard =
149 self.compaction_group_manager.write().await;
150 let new_compaction_group_config =
151 compaction_group_manager_guard.default_compaction_config();
152 compaction_group_config = Some(new_compaction_group_config.clone());
153 (
154 compaction_group_manager_txn.insert(
155 CompactionGroupManager::start_owned_compaction_groups_txn(
156 compaction_group_manager_guard,
157 ),
158 ),
159 new_compaction_group_config,
160 )
161 };
162 let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
163 let new_compaction_group = CompactionGroup {
164 group_id: new_compaction_group_id,
165 compaction_config: compaction_group_config.clone(),
166 };
167
168 new_compaction_groups.push(new_compaction_group.clone());
169 compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);
170
171 on_handle_add_new_table(
172 state_table_info,
173 &table_ids,
174 new_compaction_group_id,
175 &mut table_compaction_group_mapping,
176 &mut new_table_ids,
177 )?;
178 }
179
180 let commit_sstables = self
181 .correct_commit_ssts(sstables, &table_compaction_group_mapping)
182 .await?;
183
184 let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
185 let mut group_id_to_config = HashMap::new();
187 if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
188 for cg_id in &modified_compaction_groups {
189 let compaction_group = compaction_group_manager
190 .get(cg_id)
191 .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
192 .compaction_config();
193 group_id_to_config.insert(*cg_id, compaction_group);
194 }
195 } else {
196 let compaction_group_manager = self.compaction_group_manager.read().await;
197 for cg_id in &modified_compaction_groups {
198 let compaction_group = compaction_group_manager
199 .try_get_compaction_group_config(*cg_id)
200 .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
201 .compaction_config();
202 group_id_to_config.insert(*cg_id, compaction_group);
203 }
204 }
205
206 let group_id_to_sub_levels =
207 rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);
208
209 let mut group_id_to_truncate_tables: HashMap<CompactionGroupId, HashSet<TableId>> =
211 HashMap::new();
212 for table_id in &truncate_tables {
213 if let Some(compaction_group_id) = table_compaction_group_mapping.get(table_id) {
214 group_id_to_truncate_tables
215 .entry(*compaction_group_id)
216 .or_default()
217 .insert(*table_id);
218 } else {
219 bail!(
220 "table {} doesn't belong to any compaction group, skip truncating",
221 table_id
222 );
223 }
224 }
225
226 let time_travel_delta = version.pre_commit_epoch(
227 &tables_to_commit,
228 new_compaction_groups,
229 group_id_to_sub_levels,
230 &new_table_ids,
231 new_table_watermarks,
232 change_log_delta,
233 vector_index_delta,
234 group_id_to_truncate_tables,
235 );
236
237 if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
238 versioning.time_travel_snapshot_interval_counter = u64::MAX;
240 }
241
242 let mut version_stats = HummockVersionStatsTransaction::new(
244 &mut versioning.version_stats,
245 self.env.notification_manager(),
246 );
247 add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
248 if purge_prost_table_stats(
249 &mut version_stats.table_stats,
250 version.latest_version(),
251 &truncate_tables,
252 ) {
253 self.metrics.version_stats.reset();
254 versioning.local_metrics.clear();
255 }
256
257 trigger_local_table_stat(
258 &self.metrics,
259 &mut versioning.local_metrics,
260 &version_stats,
261 &table_stats_change,
262 );
263 for (table_id, stats) in &table_stats_change {
264 if stats.total_key_size == 0
265 && stats.total_value_size == 0
266 && stats.total_key_count == 0
267 {
268 continue;
269 }
270 let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
271 let table_metrics = get_or_create_local_table_stat(
272 &self.metrics,
273 *table_id,
274 &mut versioning.local_metrics,
275 );
276 table_metrics.inc_write_throughput(stats_value as u64);
277 }
278 let mut time_travel_version = None;
279 if versioning.time_travel_snapshot_interval_counter
280 >= self.env.opts.hummock_time_travel_snapshot_interval
281 {
282 versioning.time_travel_snapshot_interval_counter = 0;
283 time_travel_version = Some(version.latest_version());
284 } else {
285 versioning.time_travel_snapshot_interval_counter = versioning
286 .time_travel_snapshot_interval_counter
287 .saturating_add(1);
288 }
289 let time_travel_tables_to_commit =
290 table_compaction_group_mapping
291 .iter()
292 .filter_map(|(table_id, cg_id)| {
293 tables_to_commit
294 .get(table_id)
295 .map(|committed_epoch| (table_id, cg_id, *committed_epoch))
296 });
297 let time_travel_table_ids: HashSet<_> = self
298 .metadata_manager
299 .catalog_controller
300 .list_time_travel_table_ids()
301 .await
302 .map_err(|e| Error::Internal(e.into()))?
303 .into_iter()
304 .collect();
305 let mut txn = self.env.meta_store_ref().conn.begin().await?;
306 let version_snapshot_sst_ids = self
307 .write_time_travel_metadata(
308 &txn,
309 time_travel_version,
310 time_travel_delta,
311 time_travel_table_ids,
312 &versioning.last_time_travel_snapshot_sst_ids,
313 time_travel_tables_to_commit,
314 )
315 .await?;
316 commit_multi_var_with_provided_txn!(
317 txn,
318 version,
319 version_stats,
320 compaction_group_manager_txn
321 )?;
322 if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
323 versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
324 }
325
326 for compaction_group_id in &modified_compaction_groups {
327 trigger_sst_stat(
328 &self.metrics,
329 None,
330 &versioning.current_version,
331 *compaction_group_id,
332 );
333 }
334 trigger_epoch_stat(&self.metrics, &versioning.current_version);
335 let table_change_log_object_ids_after_commit = versioning
336 .table_change_log
337 .values()
338 .flat_map(|l| l.get_object_ids())
339 .collect::<HashSet<_>>();
340 drop(versioning_guard);
341 let may_delete_object_ids =
342 &table_change_log_object_ids_before_commit - &table_change_log_object_ids_after_commit;
343 self.gc_manager
344 .add_may_delete_object_ids(may_delete_object_ids.into_iter());
345
346 if !self.env.opts.compaction_deterministic_test {
348 for id in &modified_compaction_groups {
350 self.try_send_compaction_request(*id, compact_task::TaskType::Dynamic);
351 }
352 if !table_stats_change.is_empty() {
353 self.collect_table_write_throughput(table_stats_change)
354 .await;
355 }
356 }
357 if !modified_compaction_groups.is_empty() {
358 self.try_update_write_limits(&modified_compaction_groups)
359 .await;
360 }
361 #[cfg(test)]
362 {
363 self.check_state_consistency().await;
364 }
365 Ok(())
366 }
367
368 async fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
369 let params = self.env.system_params_reader().await;
370 let barrier_interval_ms = params.barrier_interval_ms() as u64;
371 let checkpoint_secs = std::cmp::max(
372 1,
373 params.checkpoint_frequency() * barrier_interval_ms / 1000,
374 );
375
376 let mut table_throughput_statistic_manager =
377 self.table_write_throughput_statistic_manager.write();
378 let timestamp = chrono::Utc::now().timestamp();
379
380 for (table_id, stat) in table_stats {
381 let throughput = ((stat.total_value_size + stat.total_key_size) as f64
382 / checkpoint_secs as f64) as u64;
383 table_throughput_statistic_manager
384 .add_table_throughput_with_ts(table_id, throughput, timestamp);
385 }
386 }
387
388 async fn correct_commit_ssts(
389 &self,
390 sstables: Vec<LocalSstableInfo>,
391 table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
392 ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
393 let mut new_sst_id_number = 0;
394 let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
395 let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
396 for commit_sst in sstables {
397 let mut group_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
398 for table_id in &commit_sst.sst_info.table_ids {
399 match table_compaction_group_mapping.get(table_id) {
400 Some(cg_id_from_meta) => {
401 group_table_ids
402 .entry(*cg_id_from_meta)
403 .or_default()
404 .push(*table_id);
405 }
406 None => {
407 tracing::warn!(
408 %table_id,
409 object_id = %commit_sst.sst_info.object_id,
410 "table doesn't belong to any compaction group",
411 );
412 }
413 }
414 }
415
416 new_sst_id_number += group_table_ids.len() * 2; sst_to_cg_vec.push((commit_sst, group_table_ids));
418 }
419
420 let mut new_sst_id = next_sstable_id(&self.env, new_sst_id_number).await?;
424 let mut commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>> = BTreeMap::new();
425
426 for (mut sst, group_table_ids) in sst_to_cg_vec {
427 let len = group_table_ids.len();
428 for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
429 if sst.sst_info.table_ids == match_ids {
430 assert!(
432 index == len - 1,
433 "SST should be the last key in the group {} index {} len {}",
434 group_id,
435 index,
436 len
437 );
438 commit_sstables
439 .entry(group_id)
440 .or_default()
441 .push(sst.sst_info);
442 break;
443 }
444
445 let origin_sst_size = sst.sst_info.sst_size;
446 let new_sst_size = match_ids
447 .iter()
448 .map(|id| {
449 let stat = sst.table_stats.get(id).unwrap();
450 stat.total_compressed_size
451 })
452 .sum();
453
454 if new_sst_size == 0 {
455 tracing::warn!(
456 id = %sst.sst_info.sst_id,
457 object_id = %sst.sst_info.object_id,
458 match_ids = ?match_ids,
459 "Sstable doesn't contain any data for tables",
460 );
461 }
462
463 let old_sst_size = origin_sst_size.saturating_sub(new_sst_size);
464 if old_sst_size == 0 {
465 tracing::warn!(
466 id = %sst.sst_info.sst_id,
467 object_id = %sst.sst_info.object_id,
468 match_ids = ?match_ids,
469 origin_sst_size = origin_sst_size,
470 new_sst_size = new_sst_size,
471 "Sstable doesn't contain any data for tables",
472 );
473 }
474 let (modified_sst_info, branch_sst) = split_sst_with_table_ids(
475 &sst.sst_info,
476 &mut new_sst_id,
477 old_sst_size,
478 new_sst_size,
479 match_ids,
480 );
481 sst.sst_info = modified_sst_info;
482
483 commit_sstables
484 .entry(group_id)
485 .or_default()
486 .push(branch_sst);
487 }
488 }
489
490 for ssts in commit_sstables.values() {
492 let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
493 assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
494 }
495
496 Ok(commit_sstables)
497 }
498}
499
500fn on_handle_add_new_table(
501 state_table_info: &HummockVersionStateTableInfo,
502 table_ids: impl IntoIterator<Item = &TableId>,
503 compaction_group_id: CompactionGroupId,
504 table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
505 new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
506) -> Result<()> {
507 for table_id in table_ids {
508 if let Some(info) = state_table_info.info().get(table_id) {
509 return Err(Error::CompactionGroup(format!(
510 "table {} already exist {:?}",
511 table_id, info,
512 )));
513 }
514 table_compaction_group_mapping.insert(*table_id, compaction_group_id);
515 new_table_ids.insert(*table_id, compaction_group_id);
516 }
517
518 Ok(())
519}
520
521fn rewrite_commit_sstables_to_sub_level(
524 commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
525 group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
526) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
527 let mut overlapping_sstables: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> =
528 BTreeMap::new();
529 for (group_id, inserted_table_infos) in commit_sstables {
530 let config = group_id_to_config
531 .get(&group_id)
532 .expect("compaction group should exist");
533
534 let mut accumulated_size = 0;
535 let mut ssts = vec![];
536 let sub_level_size_limit = config
537 .max_overlapping_level_size
538 .unwrap_or(compaction_config::max_overlapping_level_size());
539
540 let level = overlapping_sstables.entry(group_id).or_default();
541
542 for sst in inserted_table_infos {
543 accumulated_size += sst.sst_size;
544 ssts.push(sst);
545 if accumulated_size > sub_level_size_limit {
546 level.push(ssts);
547
548 accumulated_size = 0;
550 ssts = vec![];
551 }
552 }
553
554 if !ssts.is_empty() {
555 level.push(ssts);
556 }
557
558 level.reverse();
560 }
561
562 overlapping_sstables
563}
564
/// Returns `true` if every element of `vec_2` appears in `vec_1` in the same relative order.
///
/// In other words, this checks that `vec_2` is a (not necessarily contiguous) subsequence of
/// `vec_1`. An empty `vec_2` is a subsequence of anything. Greedy left-most matching is
/// sufficient for this test. At most `vec_1.len()` comparisons are made, and the scan stops
/// early as soon as the answer is known.
///
/// Accepts slices, so `&Vec<T>`, arrays and sub-slices can all be passed.
fn is_ordered_subset<T: PartialEq>(vec_1: &[T], vec_2: &[T]) -> bool {
    let mut haystack = vec_1.iter();
    // `any` consumes `haystack` up to and including the match, so each needle must be found
    // strictly after the previous one.
    vec_2
        .iter()
        .all(|needle| haystack.any(|candidate| candidate == needle))
}