1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::bail;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::meta::default::compaction_config;
22use risingwave_common::system_param::reader::SystemParamsRead;
23use risingwave_hummock_sdk::change_log::ChangeLogDelta;
24use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
25use risingwave_hummock_sdk::sstable_info::SstableInfo;
26use risingwave_hummock_sdk::table_stats::{
27 PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map,
28};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
32use risingwave_hummock_sdk::{
33 CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
34};
35use risingwave_pb::hummock::{CompactionConfig, compact_task};
36use sea_orm::TransactionTrait;
37
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
40use crate::hummock::manager::transaction::{
41 HummockVersionStatsTransaction, HummockVersionTransaction,
42};
43use crate::hummock::manager::versioning::Versioning;
44use crate::hummock::metrics_utils::{
45 get_or_create_local_table_stat, trigger_epoch_stat, trigger_local_table_stat, trigger_sst_stat,
46};
47use crate::hummock::model::CompactionGroup;
48use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
49use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
50use crate::hummock::{
51 HummockManager, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
52};
53
54pub struct NewTableFragmentInfo {
55 pub table_ids: HashSet<TableId>,
56}
57
/// Everything produced by one epoch that `HummockManager::commit_epoch` applies to the
/// current Hummock version.
#[derive(Default)]
pub struct CommitEpochInfo {
    /// Newly written SSTs. Their per-table stats are added to the version stats, and each
    /// SST is split per compaction group before being committed.
    pub sstables: Vec<LocalSstableInfo>,
    /// New table watermarks, forwarded unchanged into the version delta.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Context id associated with each committed SST object. Within `commit_epoch` it is
    /// only passed to `commit_epoch_sanity_check`.
    pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
    /// Newly created fragments; each entry gets its own new compaction group.
    pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
    /// Per-table change-log deltas, forwarded into the version delta.
    pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
    /// Per-table vector-index deltas, forwarded into the version delta.
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
    /// Epoch to commit for each table. `commit_epoch` asserts this is not empty.
    pub tables_to_commit: HashMap<TableId, u64>,

    /// Tables to truncate. Each must be mapped to a compaction group (existing, or created
    /// in the same commit), otherwise the commit fails.
    pub truncate_tables: HashSet<TableId>,
}
71
72impl HummockManager {
73 pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> {
76 let CommitEpochInfo {
77 mut sstables,
78 new_table_watermarks,
79 sst_to_context,
80 new_table_fragment_infos,
81 change_log_delta,
82 vector_index_delta,
83 tables_to_commit,
84 truncate_tables,
85 } = commit_info;
86 let mut versioning_guard = self.versioning.write().await;
87 let _timer = start_measure_real_process_timer!(self, "commit_epoch");
88 if versioning_guard.disable_commit_epochs {
90 return Ok(());
91 }
92
93 assert!(!tables_to_commit.is_empty());
94
95 let versioning: &mut Versioning = &mut versioning_guard;
96 self.commit_epoch_sanity_check(
97 &tables_to_commit,
98 &sstables,
99 &sst_to_context,
100 &versioning.current_version,
101 )
102 .await?;
103
104 let mut table_stats_change = PbTableStatsMap::default();
106 for s in &mut sstables {
107 add_prost_table_stats_map(
108 &mut table_stats_change,
109 &to_prost_table_stats_map(s.table_stats.clone()),
110 );
111 }
112
113 let mut version = HummockVersionTransaction::new(
114 &mut versioning.current_version,
115 &mut versioning.hummock_version_deltas,
116 self.env.notification_manager(),
117 Some(&self.table_committed_epoch_notifiers),
118 &self.metrics,
119 );
120
121 let state_table_info = &version.latest_version().state_table_info;
122 let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
123 let mut new_table_ids = HashMap::new();
124 let mut new_compaction_groups = Vec::new();
125 let mut compaction_group_manager_txn = None;
126 let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;
127
128 for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
130 let (compaction_group_manager, compaction_group_config) =
131 if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
132 (
133 compaction_group_manager,
134 (*compaction_group_config
135 .as_ref()
136 .expect("must be set with compaction_group_manager_txn"))
137 .clone(),
138 )
139 } else {
140 let compaction_group_manager_guard =
141 self.compaction_group_manager.write().await;
142 let new_compaction_group_config =
143 compaction_group_manager_guard.default_compaction_config();
144 compaction_group_config = Some(new_compaction_group_config.clone());
145 (
146 compaction_group_manager_txn.insert(
147 CompactionGroupManager::start_owned_compaction_groups_txn(
148 compaction_group_manager_guard,
149 ),
150 ),
151 new_compaction_group_config,
152 )
153 };
154 let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
155 let new_compaction_group = CompactionGroup {
156 group_id: new_compaction_group_id,
157 compaction_config: compaction_group_config.clone(),
158 };
159
160 new_compaction_groups.push(new_compaction_group.clone());
161 compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);
162
163 on_handle_add_new_table(
164 state_table_info,
165 &table_ids,
166 new_compaction_group_id,
167 &mut table_compaction_group_mapping,
168 &mut new_table_ids,
169 )?;
170 }
171
172 let commit_sstables = self
173 .correct_commit_ssts(sstables, &table_compaction_group_mapping)
174 .await?;
175
176 let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
177 let mut group_id_to_config = HashMap::new();
179 if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
180 for cg_id in &modified_compaction_groups {
181 let compaction_group = compaction_group_manager
182 .get(cg_id)
183 .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
184 .compaction_config();
185 group_id_to_config.insert(*cg_id, compaction_group);
186 }
187 } else {
188 let compaction_group_manager = self.compaction_group_manager.read().await;
189 for cg_id in &modified_compaction_groups {
190 let compaction_group = compaction_group_manager
191 .try_get_compaction_group_config(*cg_id)
192 .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
193 .compaction_config();
194 group_id_to_config.insert(*cg_id, compaction_group);
195 }
196 }
197
198 let group_id_to_sub_levels =
199 rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);
200
201 let mut group_id_to_truncate_tables: HashMap<u64, Vec<TableId>> = HashMap::new();
203 for table_id in &truncate_tables {
204 if let Some(compaction_group_id) = table_compaction_group_mapping.get(table_id) {
205 group_id_to_truncate_tables
206 .entry(*compaction_group_id)
207 .or_default()
208 .push(*table_id);
209 } else {
210 bail!(
211 "table {} doesn't belong to any compaction group, skip truncating",
212 table_id
213 );
214 }
215 }
216
217 let time_travel_delta = version.pre_commit_epoch(
218 &tables_to_commit,
219 new_compaction_groups,
220 group_id_to_sub_levels,
221 &new_table_ids,
222 new_table_watermarks,
223 change_log_delta,
224 vector_index_delta,
225 group_id_to_truncate_tables,
226 );
227
228 if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
229 versioning.time_travel_snapshot_interval_counter = u64::MAX;
231 }
232
233 let mut version_stats = HummockVersionStatsTransaction::new(
235 &mut versioning.version_stats,
236 self.env.notification_manager(),
237 );
238 add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
239 if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version()) {
240 self.metrics.version_stats.reset();
241 versioning.local_metrics.clear();
242 }
243
244 trigger_local_table_stat(
245 &self.metrics,
246 &mut versioning.local_metrics,
247 &version_stats,
248 &table_stats_change,
249 );
250 for (table_id, stats) in &table_stats_change {
251 if stats.total_key_size == 0
252 && stats.total_value_size == 0
253 && stats.total_key_count == 0
254 {
255 continue;
256 }
257 let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
258 let table_metrics = get_or_create_local_table_stat(
259 &self.metrics,
260 *table_id,
261 &mut versioning.local_metrics,
262 );
263 table_metrics.inc_write_throughput(stats_value as u64);
264 }
265 let mut time_travel_version = None;
266 if versioning.time_travel_snapshot_interval_counter
267 >= self.env.opts.hummock_time_travel_snapshot_interval
268 {
269 versioning.time_travel_snapshot_interval_counter = 0;
270 time_travel_version = Some(version.latest_version());
271 } else {
272 versioning.time_travel_snapshot_interval_counter = versioning
273 .time_travel_snapshot_interval_counter
274 .saturating_add(1);
275 }
276 let time_travel_tables_to_commit =
277 table_compaction_group_mapping
278 .iter()
279 .filter_map(|(table_id, cg_id)| {
280 tables_to_commit
281 .get(table_id)
282 .map(|committed_epoch| (table_id, cg_id, *committed_epoch))
283 });
284 let time_travel_table_ids: HashSet<_> = self
285 .metadata_manager
286 .catalog_controller
287 .list_time_travel_table_ids()
288 .await
289 .map_err(|e| Error::Internal(e.into()))?
290 .into_iter()
291 .map(|id| id.try_into().unwrap())
292 .collect();
293 let mut txn = self.env.meta_store_ref().conn.begin().await?;
294 let version_snapshot_sst_ids = self
295 .write_time_travel_metadata(
296 &txn,
297 time_travel_version,
298 time_travel_delta,
299 time_travel_table_ids,
300 &versioning.last_time_travel_snapshot_sst_ids,
301 time_travel_tables_to_commit,
302 )
303 .await?;
304 commit_multi_var_with_provided_txn!(
305 txn,
306 version,
307 version_stats,
308 compaction_group_manager_txn
309 )?;
310 if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
311 versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
312 }
313
314 for compaction_group_id in &modified_compaction_groups {
315 trigger_sst_stat(
316 &self.metrics,
317 None,
318 &versioning.current_version,
319 *compaction_group_id,
320 );
321 }
322 trigger_epoch_stat(&self.metrics, &versioning.current_version);
323
324 drop(versioning_guard);
325
326 if !self.env.opts.compaction_deterministic_test {
328 for id in &modified_compaction_groups {
330 self.try_send_compaction_request(*id, compact_task::TaskType::Dynamic);
331 }
332 if !table_stats_change.is_empty() {
333 self.collect_table_write_throughput(table_stats_change)
334 .await;
335 }
336 }
337 if !modified_compaction_groups.is_empty() {
338 self.try_update_write_limits(&modified_compaction_groups)
339 .await;
340 }
341 #[cfg(test)]
342 {
343 self.check_state_consistency().await;
344 }
345 Ok(())
346 }
347
348 async fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
349 let params = self.env.system_params_reader().await;
350 let barrier_interval_ms = params.barrier_interval_ms() as u64;
351 let checkpoint_secs = std::cmp::max(
352 1,
353 params.checkpoint_frequency() * barrier_interval_ms / 1000,
354 );
355
356 let mut table_throughput_statistic_manager =
357 self.table_write_throughput_statistic_manager.write();
358 let timestamp = chrono::Utc::now().timestamp();
359
360 for (table_id, stat) in table_stats {
361 let throughput = ((stat.total_value_size + stat.total_key_size) as f64
362 / checkpoint_secs as f64) as u64;
363 table_throughput_statistic_manager
364 .add_table_throughput_with_ts(table_id, throughput, timestamp);
365 }
366 }
367
368 async fn correct_commit_ssts(
369 &self,
370 sstables: Vec<LocalSstableInfo>,
371 table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
372 ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
373 let mut new_sst_id_number = 0;
374 let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
375 let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
376 for commit_sst in sstables {
377 let mut group_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
378 for table_id in &commit_sst.sst_info.table_ids {
379 match table_compaction_group_mapping.get(&TableId::new(*table_id)) {
380 Some(cg_id_from_meta) => {
381 group_table_ids
382 .entry(*cg_id_from_meta)
383 .or_default()
384 .push(*table_id);
385 }
386 None => {
387 tracing::warn!(
388 table_id = *table_id,
389 object_id = %commit_sst.sst_info.object_id,
390 "table doesn't belong to any compaction group",
391 );
392 }
393 }
394 }
395
396 new_sst_id_number += group_table_ids.len() * 2; sst_to_cg_vec.push((commit_sst, group_table_ids));
398 }
399
400 let mut new_sst_id = next_sstable_id(&self.env, new_sst_id_number).await?;
404 let mut commit_sstables: BTreeMap<u64, Vec<SstableInfo>> = BTreeMap::new();
405
406 for (mut sst, group_table_ids) in sst_to_cg_vec {
407 let len = group_table_ids.len();
408 for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
409 if sst.sst_info.table_ids == match_ids {
410 assert!(
412 index == len - 1,
413 "SST should be the last key in the group {} index {} len {}",
414 group_id,
415 index,
416 len
417 );
418 commit_sstables
419 .entry(group_id)
420 .or_default()
421 .push(sst.sst_info);
422 break;
423 }
424
425 let origin_sst_size = sst.sst_info.sst_size;
426 let new_sst_size = match_ids
427 .iter()
428 .map(|id| {
429 let stat = sst.table_stats.get(id).unwrap();
430 stat.total_compressed_size
431 })
432 .sum();
433
434 if new_sst_size == 0 {
435 tracing::warn!(
436 id = %sst.sst_info.sst_id,
437 object_id = %sst.sst_info.object_id,
438 match_ids = ?match_ids,
439 "Sstable doesn't contain any data for tables",
440 );
441 }
442
443 let old_sst_size = origin_sst_size.saturating_sub(new_sst_size);
444 if old_sst_size == 0 {
445 tracing::warn!(
446 id = %sst.sst_info.sst_id,
447 object_id = %sst.sst_info.object_id,
448 match_ids = ?match_ids,
449 origin_sst_size = origin_sst_size,
450 new_sst_size = new_sst_size,
451 "Sstable doesn't contain any data for tables",
452 );
453 }
454 let (modified_sst_info, branch_sst) = split_sst_with_table_ids(
455 &sst.sst_info,
456 &mut new_sst_id,
457 old_sst_size,
458 new_sst_size,
459 match_ids,
460 );
461 sst.sst_info = modified_sst_info;
462
463 commit_sstables
464 .entry(group_id)
465 .or_default()
466 .push(branch_sst);
467 }
468 }
469
470 for ssts in commit_sstables.values() {
472 let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
473 assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
474 }
475
476 Ok(commit_sstables)
477 }
478}
479
480fn on_handle_add_new_table(
481 state_table_info: &HummockVersionStateTableInfo,
482 table_ids: impl IntoIterator<Item = &TableId>,
483 compaction_group_id: CompactionGroupId,
484 table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
485 new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
486) -> Result<()> {
487 for table_id in table_ids {
488 if let Some(info) = state_table_info.info().get(table_id) {
489 return Err(Error::CompactionGroup(format!(
490 "table {} already exist {:?}",
491 table_id.table_id, info,
492 )));
493 }
494 table_compaction_group_mapping.insert(*table_id, compaction_group_id);
495 new_table_ids.insert(*table_id, compaction_group_id);
496 }
497
498 Ok(())
499}
500
501fn rewrite_commit_sstables_to_sub_level(
504 commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
505 group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
506) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
507 let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
508 for (group_id, inserted_table_infos) in commit_sstables {
509 let config = group_id_to_config
510 .get(&group_id)
511 .expect("compaction group should exist");
512
513 let mut accumulated_size = 0;
514 let mut ssts = vec![];
515 let sub_level_size_limit = config
516 .max_overlapping_level_size
517 .unwrap_or(compaction_config::max_overlapping_level_size());
518
519 let level = overlapping_sstables.entry(group_id).or_default();
520
521 for sst in inserted_table_infos {
522 accumulated_size += sst.sst_size;
523 ssts.push(sst);
524 if accumulated_size > sub_level_size_limit {
525 level.push(ssts);
526
527 accumulated_size = 0;
529 ssts = vec![];
530 }
531 }
532
533 if !ssts.is_empty() {
534 level.push(ssts);
535 }
536
537 level.reverse();
539 }
540
541 overlapping_sstables
542}
543
/// Returns whether `vec_2` is a subsequence of `vec_1`.
///
/// That is, every element of `vec_2` appears in `vec_1` in the same relative order, not
/// necessarily contiguously. An empty `vec_2` is a subsequence of any `vec_1`, and each
/// element of `vec_1` can match at most one element of `vec_2`.
///
/// Takes slices, so existing `&Vec<T>` arguments still work via deref coercion.
fn is_ordered_subset<T: PartialEq>(vec_1: &[T], vec_2: &[T]) -> bool {
    let mut wanted = vec_2.iter();
    let mut next_wanted = wanted.next();
    for item in vec_1 {
        match next_wanted {
            // Everything in `vec_2` has been matched; the rest of `vec_1` is irrelevant.
            None => return true,
            Some(expected) if expected == item => next_wanted = wanted.next(),
            Some(_) => {}
        }
    }
    next_wanted.is_none()
}