1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::bail;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::meta::default::compaction_config;
22use risingwave_common::system_param::reader::SystemParamsRead;
23use risingwave_hummock_sdk::change_log::ChangeLogDelta;
24use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
25use risingwave_hummock_sdk::sstable_info::SstableInfo;
26use risingwave_hummock_sdk::table_stats::{
27 PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map,
28};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
32use risingwave_hummock_sdk::{
33 CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
34};
35use risingwave_pb::hummock::{CompactionConfig, compact_task};
36use sea_orm::TransactionTrait;
37
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
40use crate::hummock::manager::transaction::{
41 HummockVersionStatsTransaction, HummockVersionTransaction,
42};
43use crate::hummock::manager::versioning::Versioning;
44use crate::hummock::metrics_utils::{
45 get_or_create_local_table_stat, trigger_epoch_stat, trigger_local_table_stat, trigger_sst_stat,
46};
47use crate::hummock::model::CompactionGroup;
48use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
49use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
50use crate::hummock::{
51 HummockManager, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
52};
53
54pub struct NewTableFragmentInfo {
55 pub table_ids: HashSet<TableId>,
56}
57
58#[derive(Default)]
59pub struct CommitEpochInfo {
60 pub sstables: Vec<LocalSstableInfo>,
61 pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
62 pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
63 pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
64 pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
65 pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
66 pub tables_to_commit: HashMap<TableId, u64>,
68
69 pub truncate_tables: HashSet<TableId>,
70}
71
72impl HummockManager {
73 pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> {
76 let CommitEpochInfo {
77 mut sstables,
78 new_table_watermarks,
79 sst_to_context,
80 new_table_fragment_infos,
81 change_log_delta,
82 vector_index_delta,
83 tables_to_commit,
84 truncate_tables,
85 } = commit_info;
86 let mut versioning_guard = self.versioning.write().await;
87 let _timer = start_measure_real_process_timer!(self, "commit_epoch");
88 if versioning_guard.disable_commit_epochs {
90 return Ok(());
91 }
92
93 assert!(!tables_to_commit.is_empty());
94
95 let versioning: &mut Versioning = &mut versioning_guard;
96 self.commit_epoch_sanity_check(
97 &tables_to_commit,
98 &sstables,
99 &sst_to_context,
100 &versioning.current_version,
101 )
102 .await?;
103
104 let mut table_stats_change = PbTableStatsMap::default();
106 for s in &mut sstables {
107 add_prost_table_stats_map(
108 &mut table_stats_change,
109 &to_prost_table_stats_map(s.table_stats.clone()),
110 );
111 }
112
113 let mut version = HummockVersionTransaction::new(
114 &mut versioning.current_version,
115 &mut versioning.hummock_version_deltas,
116 self.env.notification_manager(),
117 Some(&self.table_committed_epoch_notifiers),
118 &self.metrics,
119 );
120
121 let state_table_info = &version.latest_version().state_table_info;
122 let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
123 let mut new_table_ids = HashMap::new();
124 let mut new_compaction_groups = Vec::new();
125 let mut compaction_group_manager_txn = None;
126 let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;
127
128 for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
130 let (compaction_group_manager, compaction_group_config) =
131 if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
132 (
133 compaction_group_manager,
134 (*compaction_group_config
135 .as_ref()
136 .expect("must be set with compaction_group_manager_txn"))
137 .clone(),
138 )
139 } else {
140 let compaction_group_manager_guard =
141 self.compaction_group_manager.write().await;
142 let new_compaction_group_config =
143 compaction_group_manager_guard.default_compaction_config();
144 compaction_group_config = Some(new_compaction_group_config.clone());
145 (
146 compaction_group_manager_txn.insert(
147 CompactionGroupManager::start_owned_compaction_groups_txn(
148 compaction_group_manager_guard,
149 ),
150 ),
151 new_compaction_group_config,
152 )
153 };
154 let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
155 let new_compaction_group = CompactionGroup {
156 group_id: new_compaction_group_id,
157 compaction_config: compaction_group_config.clone(),
158 };
159
160 new_compaction_groups.push(new_compaction_group.clone());
161 compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);
162
163 on_handle_add_new_table(
164 state_table_info,
165 &table_ids,
166 new_compaction_group_id,
167 &mut table_compaction_group_mapping,
168 &mut new_table_ids,
169 )?;
170 }
171
172 let commit_sstables = self
173 .correct_commit_ssts(sstables, &table_compaction_group_mapping)
174 .await?;
175
176 let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
177 let mut group_id_to_config = HashMap::new();
179 if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
180 for cg_id in &modified_compaction_groups {
181 let compaction_group = compaction_group_manager
182 .get(cg_id)
183 .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
184 .compaction_config();
185 group_id_to_config.insert(*cg_id, compaction_group);
186 }
187 } else {
188 let compaction_group_manager = self.compaction_group_manager.read().await;
189 for cg_id in &modified_compaction_groups {
190 let compaction_group = compaction_group_manager
191 .try_get_compaction_group_config(*cg_id)
192 .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
193 .compaction_config();
194 group_id_to_config.insert(*cg_id, compaction_group);
195 }
196 }
197
198 let group_id_to_sub_levels =
199 rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);
200
201 let mut group_id_to_truncate_tables: HashMap<u64, HashSet<TableId>> = HashMap::new();
203 for table_id in &truncate_tables {
204 if let Some(compaction_group_id) = table_compaction_group_mapping.get(table_id) {
205 group_id_to_truncate_tables
206 .entry(*compaction_group_id)
207 .or_default()
208 .insert(*table_id);
209 } else {
210 bail!(
211 "table {} doesn't belong to any compaction group, skip truncating",
212 table_id
213 );
214 }
215 }
216
217 let time_travel_delta = version.pre_commit_epoch(
218 &tables_to_commit,
219 new_compaction_groups,
220 group_id_to_sub_levels,
221 &new_table_ids,
222 new_table_watermarks,
223 change_log_delta,
224 vector_index_delta,
225 group_id_to_truncate_tables,
226 );
227
228 if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
229 versioning.time_travel_snapshot_interval_counter = u64::MAX;
231 }
232
233 let mut version_stats = HummockVersionStatsTransaction::new(
235 &mut versioning.version_stats,
236 self.env.notification_manager(),
237 );
238 add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
239 if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version()) {
240 self.metrics.version_stats.reset();
241 versioning.local_metrics.clear();
242 }
243
244 trigger_local_table_stat(
245 &self.metrics,
246 &mut versioning.local_metrics,
247 &version_stats,
248 &table_stats_change,
249 );
250 for (table_id, stats) in &table_stats_change {
251 if stats.total_key_size == 0
252 && stats.total_value_size == 0
253 && stats.total_key_count == 0
254 {
255 continue;
256 }
257 let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
258 let table_metrics = get_or_create_local_table_stat(
259 &self.metrics,
260 *table_id,
261 &mut versioning.local_metrics,
262 );
263 table_metrics.inc_write_throughput(stats_value as u64);
264 }
265 let mut time_travel_version = None;
266 if versioning.time_travel_snapshot_interval_counter
267 >= self.env.opts.hummock_time_travel_snapshot_interval
268 {
269 versioning.time_travel_snapshot_interval_counter = 0;
270 time_travel_version = Some(version.latest_version());
271 } else {
272 versioning.time_travel_snapshot_interval_counter = versioning
273 .time_travel_snapshot_interval_counter
274 .saturating_add(1);
275 }
276 let time_travel_tables_to_commit =
277 table_compaction_group_mapping
278 .iter()
279 .filter_map(|(table_id, cg_id)| {
280 tables_to_commit
281 .get(table_id)
282 .map(|committed_epoch| (table_id, cg_id, *committed_epoch))
283 });
284 let time_travel_table_ids: HashSet<_> = self
285 .metadata_manager
286 .catalog_controller
287 .list_time_travel_table_ids()
288 .await
289 .map_err(|e| Error::Internal(e.into()))?
290 .into_iter()
291 .collect();
292 let mut txn = self.env.meta_store_ref().conn.begin().await?;
293 let version_snapshot_sst_ids = self
294 .write_time_travel_metadata(
295 &txn,
296 time_travel_version,
297 time_travel_delta,
298 time_travel_table_ids,
299 &versioning.last_time_travel_snapshot_sst_ids,
300 time_travel_tables_to_commit,
301 )
302 .await?;
303 commit_multi_var_with_provided_txn!(
304 txn,
305 version,
306 version_stats,
307 compaction_group_manager_txn
308 )?;
309 if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
310 versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
311 }
312
313 for compaction_group_id in &modified_compaction_groups {
314 trigger_sst_stat(
315 &self.metrics,
316 None,
317 &versioning.current_version,
318 *compaction_group_id,
319 );
320 }
321 trigger_epoch_stat(&self.metrics, &versioning.current_version);
322
323 drop(versioning_guard);
324
325 if !self.env.opts.compaction_deterministic_test {
327 for id in &modified_compaction_groups {
329 self.try_send_compaction_request(*id, compact_task::TaskType::Dynamic);
330 }
331 if !table_stats_change.is_empty() {
332 self.collect_table_write_throughput(table_stats_change)
333 .await;
334 }
335 }
336 if !modified_compaction_groups.is_empty() {
337 self.try_update_write_limits(&modified_compaction_groups)
338 .await;
339 }
340 #[cfg(test)]
341 {
342 self.check_state_consistency().await;
343 }
344 Ok(())
345 }
346
347 async fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
348 let params = self.env.system_params_reader().await;
349 let barrier_interval_ms = params.barrier_interval_ms() as u64;
350 let checkpoint_secs = std::cmp::max(
351 1,
352 params.checkpoint_frequency() * barrier_interval_ms / 1000,
353 );
354
355 let mut table_throughput_statistic_manager =
356 self.table_write_throughput_statistic_manager.write();
357 let timestamp = chrono::Utc::now().timestamp();
358
359 for (table_id, stat) in table_stats {
360 let throughput = ((stat.total_value_size + stat.total_key_size) as f64
361 / checkpoint_secs as f64) as u64;
362 table_throughput_statistic_manager.add_table_throughput_with_ts(
363 table_id.into(),
364 throughput,
365 timestamp,
366 );
367 }
368 }
369
370 async fn correct_commit_ssts(
371 &self,
372 sstables: Vec<LocalSstableInfo>,
373 table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
374 ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
375 let mut new_sst_id_number = 0;
376 let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
377 let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
378 for commit_sst in sstables {
379 let mut group_table_ids: BTreeMap<u64, Vec<TableId>> = BTreeMap::new();
380 for table_id in &commit_sst.sst_info.table_ids {
381 match table_compaction_group_mapping.get(table_id) {
382 Some(cg_id_from_meta) => {
383 group_table_ids
384 .entry(*cg_id_from_meta)
385 .or_default()
386 .push(*table_id);
387 }
388 None => {
389 tracing::warn!(
390 %table_id,
391 object_id = %commit_sst.sst_info.object_id,
392 "table doesn't belong to any compaction group",
393 );
394 }
395 }
396 }
397
398 new_sst_id_number += group_table_ids.len() * 2; sst_to_cg_vec.push((commit_sst, group_table_ids));
400 }
401
402 let mut new_sst_id = next_sstable_id(&self.env, new_sst_id_number).await?;
406 let mut commit_sstables: BTreeMap<u64, Vec<SstableInfo>> = BTreeMap::new();
407
408 for (mut sst, group_table_ids) in sst_to_cg_vec {
409 let len = group_table_ids.len();
410 for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
411 if sst.sst_info.table_ids == match_ids {
412 assert!(
414 index == len - 1,
415 "SST should be the last key in the group {} index {} len {}",
416 group_id,
417 index,
418 len
419 );
420 commit_sstables
421 .entry(group_id)
422 .or_default()
423 .push(sst.sst_info);
424 break;
425 }
426
427 let origin_sst_size = sst.sst_info.sst_size;
428 let new_sst_size = match_ids
429 .iter()
430 .map(|id| {
431 let stat = sst.table_stats.get(id).unwrap();
432 stat.total_compressed_size
433 })
434 .sum();
435
436 if new_sst_size == 0 {
437 tracing::warn!(
438 id = %sst.sst_info.sst_id,
439 object_id = %sst.sst_info.object_id,
440 match_ids = ?match_ids,
441 "Sstable doesn't contain any data for tables",
442 );
443 }
444
445 let old_sst_size = origin_sst_size.saturating_sub(new_sst_size);
446 if old_sst_size == 0 {
447 tracing::warn!(
448 id = %sst.sst_info.sst_id,
449 object_id = %sst.sst_info.object_id,
450 match_ids = ?match_ids,
451 origin_sst_size = origin_sst_size,
452 new_sst_size = new_sst_size,
453 "Sstable doesn't contain any data for tables",
454 );
455 }
456 let (modified_sst_info, branch_sst) = split_sst_with_table_ids(
457 &sst.sst_info,
458 &mut new_sst_id,
459 old_sst_size,
460 new_sst_size,
461 match_ids,
462 );
463 sst.sst_info = modified_sst_info;
464
465 commit_sstables
466 .entry(group_id)
467 .or_default()
468 .push(branch_sst);
469 }
470 }
471
472 for ssts in commit_sstables.values() {
474 let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
475 assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
476 }
477
478 Ok(commit_sstables)
479 }
480}
481
482fn on_handle_add_new_table(
483 state_table_info: &HummockVersionStateTableInfo,
484 table_ids: impl IntoIterator<Item = &TableId>,
485 compaction_group_id: CompactionGroupId,
486 table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
487 new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
488) -> Result<()> {
489 for table_id in table_ids {
490 if let Some(info) = state_table_info.info().get(table_id) {
491 return Err(Error::CompactionGroup(format!(
492 "table {} already exist {:?}",
493 table_id, info,
494 )));
495 }
496 table_compaction_group_mapping.insert(*table_id, compaction_group_id);
497 new_table_ids.insert(*table_id, compaction_group_id);
498 }
499
500 Ok(())
501}
502
503fn rewrite_commit_sstables_to_sub_level(
506 commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
507 group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
508) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
509 let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
510 for (group_id, inserted_table_infos) in commit_sstables {
511 let config = group_id_to_config
512 .get(&group_id)
513 .expect("compaction group should exist");
514
515 let mut accumulated_size = 0;
516 let mut ssts = vec![];
517 let sub_level_size_limit = config
518 .max_overlapping_level_size
519 .unwrap_or(compaction_config::max_overlapping_level_size());
520
521 let level = overlapping_sstables.entry(group_id).or_default();
522
523 for sst in inserted_table_infos {
524 accumulated_size += sst.sst_size;
525 ssts.push(sst);
526 if accumulated_size > sub_level_size_limit {
527 level.push(ssts);
528
529 accumulated_size = 0;
531 ssts = vec![];
532 }
533 }
534
535 if !ssts.is_empty() {
536 level.push(ssts);
537 }
538
539 level.reverse();
541 }
542
543 overlapping_sstables
544}
545
/// Returns `true` if `vec_2` is an ordered subsequence of `vec_1`.
///
/// Every element of `vec_2` must occur in `vec_1`, in the same relative order, though
/// not necessarily contiguously. An empty `vec_2` is a subsequence of any `vec_1`.
///
/// Both arguments are slices; `&Vec<T>` call sites keep working through deref coercion.
fn is_ordered_subset<T: PartialEq>(vec_1: &[T], vec_2: &[T]) -> bool {
    // `candidates` is shared across all searches, so each element of `vec_2` is looked
    // up strictly after the position where the previous one was matched. The scan
    // stops at the first element that cannot be found.
    let mut candidates = vec_1.iter();
    vec_2
        .iter()
        .all(|wanted| candidates.any(|candidate| candidate == wanted))
}