1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::bail;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::meta::default::compaction_config;
22use risingwave_common::system_param::reader::SystemParamsRead;
23use risingwave_hummock_sdk::change_log::ChangeLogDelta;
24use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
25use risingwave_hummock_sdk::sstable_info::SstableInfo;
26use risingwave_hummock_sdk::table_stats::{
27 PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map,
28};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
32use risingwave_hummock_sdk::{
33 CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
34};
35use risingwave_pb::hummock::{CompactionConfig, compact_task};
36use sea_orm::TransactionTrait;
37
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
40use crate::hummock::manager::transaction::{
41 HummockVersionStatsTransaction, HummockVersionTransaction,
42};
43use crate::hummock::manager::versioning::Versioning;
44use crate::hummock::metrics_utils::{
45 get_or_create_local_table_stat, trigger_epoch_stat, trigger_local_table_stat, trigger_sst_stat,
46};
47use crate::hummock::model::CompactionGroup;
48use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
49use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
50use crate::hummock::{
51 HummockManager, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
52};
53
/// The tables of one newly added table fragment.
///
/// `HummockManager::commit_epoch` creates one new compaction group for every
/// `NewTableFragmentInfo` it receives and maps all of its `table_ids` to that group.
pub struct NewTableFragmentInfo {
    /// Ids of the new tables; none of them may already exist in the current version's
    /// state table info, otherwise `commit_epoch` fails.
    pub table_ids: HashSet<TableId>,
}
57
/// Everything `HummockManager::commit_epoch` needs to commit one epoch.
#[derive(Default)]
pub struct CommitEpochInfo {
    /// SSTs to commit. Their per-table stats are aggregated into the version stats.
    pub sstables: Vec<LocalSstableInfo>,
    /// New table watermarks, forwarded as-is to `pre_commit_epoch`.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    /// SST object id -> hummock context id; only consumed by the commit sanity check.
    pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
    /// Newly added table fragments; each one gets its own new compaction group.
    pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
    /// Change log deltas per table, forwarded as-is to `pre_commit_epoch`.
    pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
    /// Vector index deltas per table, forwarded as-is to `pre_commit_epoch`.
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
    /// `table_id` -> `committed_epoch`. Must not be empty (asserted by `commit_epoch`).
    pub tables_to_commit: HashMap<TableId, u64>,

    /// Tables to truncate. Each must already belong to a compaction group, otherwise
    /// `commit_epoch` returns an error.
    pub truncate_tables: HashSet<TableId>,
}
71
72impl HummockManager {
    /// Commits the SSTs and metadata described by `commit_info` as a new Hummock version.
    ///
    /// While holding the write lock on `self.versioning`, this method:
    /// 1. sanity-checks the input and aggregates the per-SST table stats;
    /// 2. creates one new compaction group per entry of `new_table_fragment_infos`;
    /// 3. assigns every SST to its compaction group(s) via `correct_commit_ssts` and cuts
    ///    each group's SSTs into sub-levels;
    /// 4. builds the version delta via `pre_commit_epoch`, applies the table stats change to
    ///    the version stats and the local metrics, and writes time-travel metadata;
    /// 5. hands the version, the version stats and the compaction group changes to
    ///    `commit_multi_var_with_provided_txn!` together with the meta store transaction.
    ///
    /// After the lock is released it requests dynamic compaction for every modified group
    /// and records table write throughput (both skipped when
    /// `compaction_deterministic_test` is set), and refreshes the write limits.
    ///
    /// Returns `Ok(())` without committing anything if `disable_commit_epochs` is set.
    ///
    /// # Errors
    /// Returns an error if the sanity check fails, if id allocation or any meta store /
    /// catalog operation fails, if a new table id already exists in the current version, or
    /// if a table in `truncate_tables` does not belong to any compaction group.
    ///
    /// # Panics
    /// Panics if `tables_to_commit` is empty, or if a compaction group that received SSTs
    /// has no known compaction config.
    pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> {
        let CommitEpochInfo {
            mut sstables,
            new_table_watermarks,
            sst_to_context,
            new_table_fragment_infos,
            change_log_delta,
            vector_index_delta,
            tables_to_commit,
            truncate_tables,
        } = commit_info;
        // The versioning write lock is held until the explicit `drop` near the end.
        let mut versioning_guard = self.versioning.write().await;
        // NOTE(review): presumably records processing time while `_timer` is alive — confirm
        // against the macro definition.
        let _timer = start_measure_real_process_timer!(self, "commit_epoch");
        // Silently skip the commit when committing new epochs is disabled.
        if versioning_guard.disable_commit_epochs {
            return Ok(());
        }

        assert!(!tables_to_commit.is_empty());

        let versioning: &mut Versioning = &mut versioning_guard;
        self.commit_epoch_sanity_check(
            &tables_to_commit,
            &sstables,
            &sst_to_context,
            &versioning.current_version,
        )
        .await?;

        // Aggregate the table stats of all committed SSTs into a single change map.
        let mut table_stats_change = PbTableStatsMap::default();
        for s in &mut sstables {
            add_prost_table_stats_map(
                &mut table_stats_change,
                &to_prost_table_stats_map(s.table_stats.clone()),
            );
        }

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            Some(&self.table_committed_epoch_notifiers),
            &self.metrics,
        );

        let state_table_info = &version.latest_version().state_table_info;
        let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
        let mut new_table_ids = HashMap::new();
        let mut new_compaction_groups = Vec::new();
        // Both are populated lazily on the first new table fragment and then reused.
        let mut compaction_group_manager_txn = None;
        let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

        // Create one new compaction group per new table fragment and register its tables.
        for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
            let (compaction_group_manager, compaction_group_config) =
                if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
                    // Transaction already started by an earlier iteration: reuse it and
                    // the default config captured back then.
                    (
                        compaction_group_manager,
                        (*compaction_group_config
                            .as_ref()
                            .expect("must be set with compaction_group_manager_txn"))
                        .clone(),
                    )
                } else {
                    // First new fragment: take the compaction group manager write lock and
                    // move the guard into an owned transaction.
                    let compaction_group_manager_guard =
                        self.compaction_group_manager.write().await;
                    let new_compaction_group_config =
                        compaction_group_manager_guard.default_compaction_config();
                    compaction_group_config = Some(new_compaction_group_config.clone());
                    (
                        compaction_group_manager_txn.insert(
                            CompactionGroupManager::start_owned_compaction_groups_txn(
                                compaction_group_manager_guard,
                            ),
                        ),
                        new_compaction_group_config,
                    )
                };
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            let new_compaction_group = CompactionGroup {
                group_id: new_compaction_group_id,
                compaction_config: compaction_group_config.clone(),
            };

            new_compaction_groups.push(new_compaction_group.clone());
            compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);

            on_handle_add_new_table(
                state_table_info,
                &table_ids,
                new_compaction_group_id,
                &mut table_compaction_group_mapping,
                &mut new_table_ids,
            )?;
        }

        // Assign (and, where needed, split) the SSTs to their compaction groups.
        let commit_sstables = self
            .correct_commit_ssts(sstables, &table_compaction_group_mapping)
            .await?;

        let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
        // Resolve the compaction config of every group that receives SSTs. When a
        // transaction was started above it is the source of the configs (it holds the
        // manager's write guard); otherwise the manager is read directly.
        let mut group_id_to_config = HashMap::new();
        if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .get(cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        } else {
            let compaction_group_manager = self.compaction_group_manager.read().await;
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .try_get_compaction_group_config(*cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        }

        let group_id_to_sub_levels =
            rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);

        // Group the tables to truncate by their compaction group; a table without a group
        // aborts the whole commit.
        let mut group_id_to_truncate_tables: HashMap<CompactionGroupId, HashSet<TableId>> =
            HashMap::new();
        for table_id in &truncate_tables {
            if let Some(compaction_group_id) = table_compaction_group_mapping.get(table_id) {
                group_id_to_truncate_tables
                    .entry(*compaction_group_id)
                    .or_default()
                    .insert(*table_id);
            } else {
                bail!(
                    "table {} doesn't belong to any compaction group, skip truncating",
                    table_id
                );
            }
        }

        let time_travel_delta = version.pre_commit_epoch(
            &tables_to_commit,
            new_compaction_groups,
            group_id_to_sub_levels,
            &new_table_ids,
            new_table_watermarks,
            change_log_delta,
            vector_index_delta,
            group_id_to_truncate_tables,
        );

        if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
            // Saturate the counter so the interval check below is guaranteed to pass and a
            // time-travel snapshot is taken for this version.
            versioning.time_travel_snapshot_interval_counter = u64::MAX;
        }

        // Apply the aggregated stats change to the version stats.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
        // When the purge reports a change, reset the stats metrics and the cached local
        // metrics.
        if purge_prost_table_stats(
            &mut version_stats.table_stats,
            version.latest_version(),
            &truncate_tables,
        ) {
            self.metrics.version_stats.reset();
            versioning.local_metrics.clear();
        }

        trigger_local_table_stat(
            &self.metrics,
            &mut versioning.local_metrics,
            &version_stats,
            &table_stats_change,
        );
        // Report write throughput for every table whose stats actually changed.
        for (table_id, stats) in &table_stats_change {
            if stats.total_key_size == 0
                && stats.total_value_size == 0
                && stats.total_key_count == 0
            {
                continue;
            }
            // Clamp at zero before the unsigned cast below.
            let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
            let table_metrics = get_or_create_local_table_stat(
                &self.metrics,
                *table_id,
                &mut versioning.local_metrics,
            );
            table_metrics.inc_write_throughput(stats_value as u64);
        }
        // Pass the full version to the time-travel writer only once the counter has reached
        // the configured interval; otherwise just advance the counter.
        let mut time_travel_version = None;
        if versioning.time_travel_snapshot_interval_counter
            >= self.env.opts.hummock_time_travel_snapshot_interval
        {
            versioning.time_travel_snapshot_interval_counter = 0;
            time_travel_version = Some(version.latest_version());
        } else {
            versioning.time_travel_snapshot_interval_counter = versioning
                .time_travel_snapshot_interval_counter
                .saturating_add(1);
        }
        // Lazily yields (table id, compaction group id, committed epoch) for every mapped
        // table that is committed in this call.
        let time_travel_tables_to_commit =
            table_compaction_group_mapping
                .iter()
                .filter_map(|(table_id, cg_id)| {
                    tables_to_commit
                        .get(table_id)
                        .map(|committed_epoch| (table_id, cg_id, *committed_epoch))
                });
        let time_travel_table_ids: HashSet<_> = self
            .metadata_manager
            .catalog_controller
            .list_time_travel_table_ids()
            .await
            .map_err(|e| Error::Internal(e.into()))?
            .into_iter()
            .collect();
        // The time-travel metadata is written with the same transaction that is then passed
        // to `commit_multi_var_with_provided_txn!` below.
        let mut txn = self.env.meta_store_ref().conn.begin().await?;
        let version_snapshot_sst_ids = self
            .write_time_travel_metadata(
                &txn,
                time_travel_version,
                time_travel_delta,
                time_travel_table_ids,
                &versioning.last_time_travel_snapshot_sst_ids,
                time_travel_tables_to_commit,
            )
            .await?;
        commit_multi_var_with_provided_txn!(
            txn,
            version,
            version_stats,
            compaction_group_manager_txn
        )?;
        // Only remember the returned SST ids after the commit above has succeeded.
        if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
            versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
        }

        // Refresh metrics from the now-committed current version.
        for compaction_group_id in &modified_compaction_groups {
            trigger_sst_stat(
                &self.metrics,
                None,
                &versioning.current_version,
                *compaction_group_id,
            );
        }
        trigger_epoch_stat(&self.metrics, &versioning.current_version);

        // Release the versioning lock before the follow-up work below.
        drop(versioning_guard);

        // Compaction requests and throughput collection are skipped in deterministic
        // compaction tests.
        if !self.env.opts.compaction_deterministic_test {
            for id in &modified_compaction_groups {
                self.try_send_compaction_request(*id, compact_task::TaskType::Dynamic);
            }
            if !table_stats_change.is_empty() {
                self.collect_table_write_throughput(table_stats_change)
                    .await;
            }
        }
        if !modified_compaction_groups.is_empty() {
            self.try_update_write_limits(&modified_compaction_groups)
                .await;
        }
        // Test builds additionally verify state consistency after every commit.
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(())
    }
351
352 async fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
353 let params = self.env.system_params_reader().await;
354 let barrier_interval_ms = params.barrier_interval_ms() as u64;
355 let checkpoint_secs = std::cmp::max(
356 1,
357 params.checkpoint_frequency() * barrier_interval_ms / 1000,
358 );
359
360 let mut table_throughput_statistic_manager =
361 self.table_write_throughput_statistic_manager.write();
362 let timestamp = chrono::Utc::now().timestamp();
363
364 for (table_id, stat) in table_stats {
365 let throughput = ((stat.total_value_size + stat.total_key_size) as f64
366 / checkpoint_secs as f64) as u64;
367 table_throughput_statistic_manager
368 .add_table_throughput_with_ts(table_id, throughput, timestamp);
369 }
370 }
371
372 async fn correct_commit_ssts(
373 &self,
374 sstables: Vec<LocalSstableInfo>,
375 table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
376 ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
377 let mut new_sst_id_number = 0;
378 let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
379 let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
380 for commit_sst in sstables {
381 let mut group_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
382 for table_id in &commit_sst.sst_info.table_ids {
383 match table_compaction_group_mapping.get(table_id) {
384 Some(cg_id_from_meta) => {
385 group_table_ids
386 .entry(*cg_id_from_meta)
387 .or_default()
388 .push(*table_id);
389 }
390 None => {
391 tracing::warn!(
392 %table_id,
393 object_id = %commit_sst.sst_info.object_id,
394 "table doesn't belong to any compaction group",
395 );
396 }
397 }
398 }
399
400 new_sst_id_number += group_table_ids.len() * 2; sst_to_cg_vec.push((commit_sst, group_table_ids));
402 }
403
404 let mut new_sst_id = next_sstable_id(&self.env, new_sst_id_number).await?;
408 let mut commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>> = BTreeMap::new();
409
410 for (mut sst, group_table_ids) in sst_to_cg_vec {
411 let len = group_table_ids.len();
412 for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
413 if sst.sst_info.table_ids == match_ids {
414 assert!(
416 index == len - 1,
417 "SST should be the last key in the group {} index {} len {}",
418 group_id,
419 index,
420 len
421 );
422 commit_sstables
423 .entry(group_id)
424 .or_default()
425 .push(sst.sst_info);
426 break;
427 }
428
429 let origin_sst_size = sst.sst_info.sst_size;
430 let new_sst_size = match_ids
431 .iter()
432 .map(|id| {
433 let stat = sst.table_stats.get(id).unwrap();
434 stat.total_compressed_size
435 })
436 .sum();
437
438 if new_sst_size == 0 {
439 tracing::warn!(
440 id = %sst.sst_info.sst_id,
441 object_id = %sst.sst_info.object_id,
442 match_ids = ?match_ids,
443 "Sstable doesn't contain any data for tables",
444 );
445 }
446
447 let old_sst_size = origin_sst_size.saturating_sub(new_sst_size);
448 if old_sst_size == 0 {
449 tracing::warn!(
450 id = %sst.sst_info.sst_id,
451 object_id = %sst.sst_info.object_id,
452 match_ids = ?match_ids,
453 origin_sst_size = origin_sst_size,
454 new_sst_size = new_sst_size,
455 "Sstable doesn't contain any data for tables",
456 );
457 }
458 let (modified_sst_info, branch_sst) = split_sst_with_table_ids(
459 &sst.sst_info,
460 &mut new_sst_id,
461 old_sst_size,
462 new_sst_size,
463 match_ids,
464 );
465 sst.sst_info = modified_sst_info;
466
467 commit_sstables
468 .entry(group_id)
469 .or_default()
470 .push(branch_sst);
471 }
472 }
473
474 for ssts in commit_sstables.values() {
476 let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
477 assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
478 }
479
480 Ok(commit_sstables)
481 }
482}
483
484fn on_handle_add_new_table(
485 state_table_info: &HummockVersionStateTableInfo,
486 table_ids: impl IntoIterator<Item = &TableId>,
487 compaction_group_id: CompactionGroupId,
488 table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
489 new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
490) -> Result<()> {
491 for table_id in table_ids {
492 if let Some(info) = state_table_info.info().get(table_id) {
493 return Err(Error::CompactionGroup(format!(
494 "table {} already exist {:?}",
495 table_id, info,
496 )));
497 }
498 table_compaction_group_mapping.insert(*table_id, compaction_group_id);
499 new_table_ids.insert(*table_id, compaction_group_id);
500 }
501
502 Ok(())
503}
504
505fn rewrite_commit_sstables_to_sub_level(
508 commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
509 group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
510) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
511 let mut overlapping_sstables: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> =
512 BTreeMap::new();
513 for (group_id, inserted_table_infos) in commit_sstables {
514 let config = group_id_to_config
515 .get(&group_id)
516 .expect("compaction group should exist");
517
518 let mut accumulated_size = 0;
519 let mut ssts = vec![];
520 let sub_level_size_limit = config
521 .max_overlapping_level_size
522 .unwrap_or(compaction_config::max_overlapping_level_size());
523
524 let level = overlapping_sstables.entry(group_id).or_default();
525
526 for sst in inserted_table_infos {
527 accumulated_size += sst.sst_size;
528 ssts.push(sst);
529 if accumulated_size > sub_level_size_limit {
530 level.push(ssts);
531
532 accumulated_size = 0;
534 ssts = vec![];
535 }
536 }
537
538 if !ssts.is_empty() {
539 level.push(ssts);
540 }
541
542 level.reverse();
544 }
545
546 overlapping_sstables
547}
548
/// Returns `true` if `vec_2` is a subsequence of `vec_1`, i.e. all elements of `vec_2`
/// appear in `vec_1` in the same relative order (not necessarily contiguously).
///
/// An empty `vec_2` is a subsequence of every `vec_1`. Duplicates in `vec_2` must be matched
/// by distinct elements of `vec_1`.
///
/// Takes slices rather than `&Vec<T>`; existing `&Vec<T>` call sites keep working through
/// deref coercion.
fn is_ordered_subset<T: PartialEq>(vec_1: &[T], vec_2: &[T]) -> bool {
    // `candidates` is shared across all `any` calls, so each wanted element resumes the
    // scan of `vec_1` right after the previous match.
    let mut candidates = vec_1.iter();
    vec_2
        .iter()
        .all(|wanted| candidates.any(|candidate| candidate == wanted))
}