1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_common::config::default::compaction_config;
21use risingwave_common::system_param::reader::SystemParamsRead;
22use risingwave_hummock_sdk::change_log::ChangeLogDelta;
23use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
24use risingwave_hummock_sdk::sstable_info::SstableInfo;
25use risingwave_hummock_sdk::table_stats::{
26 PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map,
27};
28use risingwave_hummock_sdk::table_watermark::TableWatermarks;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
31use risingwave_hummock_sdk::{
32 CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
33};
34use risingwave_pb::hummock::{CompactionConfig, compact_task};
35use sea_orm::TransactionTrait;
36
37use crate::hummock::error::{Error, Result};
38use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
39use crate::hummock::manager::transaction::{
40 HummockVersionStatsTransaction, HummockVersionTransaction,
41};
42use crate::hummock::manager::versioning::Versioning;
43use crate::hummock::metrics_utils::{
44 get_or_create_local_table_stat, trigger_epoch_stat, trigger_local_table_stat, trigger_sst_stat,
45};
46use crate::hummock::model::CompactionGroup;
47use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
48use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
49use crate::hummock::{
50 HummockManager, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
51};
52
/// Tables of a newly created table fragment that are committed for the first time.
///
/// `HummockManager::commit_epoch` creates one new compaction group (using the default
/// compaction config) per `NewTableFragmentInfo` and assigns all of `table_ids` to it.
pub struct NewTableFragmentInfo {
    /// Tables to register; `commit_epoch` fails with `Error::CompactionGroup` if any of them
    /// already exists in the current version's state table info.
    pub table_ids: HashSet<TableId>,
}
56
/// Everything `HummockManager::commit_epoch` needs to commit one epoch.
#[derive(Default)]
pub struct CommitEpochInfo {
    /// SSTs to commit. Their per-table stats are added to the version stats, and each SST is
    /// split/assigned to the compaction groups of the tables it contains.
    pub sstables: Vec<LocalSstableInfo>,
    /// New table watermarks, forwarded to `pre_commit_epoch`.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Passed to `commit_epoch_sanity_check` together with `sstables`; presumably maps each SST
    /// object to the context that produced it — TODO confirm.
    pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
    /// Newly created table fragments; each one gets a fresh compaction group.
    pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
    /// Per-table change-log deltas, forwarded to `pre_commit_epoch`.
    pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
    /// Per-table vector index deltas, forwarded to `pre_commit_epoch`.
    pub vector_index_delta: HashMap<TableId, VectorIndexDelta>,
    /// Tables to commit, mapped to the epoch they are committed at. Must not be empty
    /// (`commit_epoch` asserts this).
    pub tables_to_commit: HashMap<TableId, u64>,
}
68
69impl HummockManager {
70 pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> {
73 let CommitEpochInfo {
74 mut sstables,
75 new_table_watermarks,
76 sst_to_context,
77 new_table_fragment_infos,
78 change_log_delta,
79 vector_index_delta,
80 tables_to_commit,
81 } = commit_info;
82 let mut versioning_guard = self.versioning.write().await;
83 let _timer = start_measure_real_process_timer!(self, "commit_epoch");
84 if versioning_guard.disable_commit_epochs {
86 return Ok(());
87 }
88
89 assert!(!tables_to_commit.is_empty());
90
91 let versioning: &mut Versioning = &mut versioning_guard;
92 self.commit_epoch_sanity_check(
93 &tables_to_commit,
94 &sstables,
95 &sst_to_context,
96 &versioning.current_version,
97 )
98 .await?;
99
100 let mut table_stats_change = PbTableStatsMap::default();
102 for s in &mut sstables {
103 add_prost_table_stats_map(
104 &mut table_stats_change,
105 &to_prost_table_stats_map(s.table_stats.clone()),
106 );
107 }
108
109 let mut version = HummockVersionTransaction::new(
110 &mut versioning.current_version,
111 &mut versioning.hummock_version_deltas,
112 self.env.notification_manager(),
113 Some(&self.table_committed_epoch_notifiers),
114 &self.metrics,
115 );
116
117 let state_table_info = &version.latest_version().state_table_info;
118 let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
119 let mut new_table_ids = HashMap::new();
120 let mut new_compaction_groups = Vec::new();
121 let mut compaction_group_manager_txn = None;
122 let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;
123
124 for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
126 let (compaction_group_manager, compaction_group_config) =
127 if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
128 (
129 compaction_group_manager,
130 (*compaction_group_config
131 .as_ref()
132 .expect("must be set with compaction_group_manager_txn"))
133 .clone(),
134 )
135 } else {
136 let compaction_group_manager_guard =
137 self.compaction_group_manager.write().await;
138 let new_compaction_group_config =
139 compaction_group_manager_guard.default_compaction_config();
140 compaction_group_config = Some(new_compaction_group_config.clone());
141 (
142 compaction_group_manager_txn.insert(
143 CompactionGroupManager::start_owned_compaction_groups_txn(
144 compaction_group_manager_guard,
145 ),
146 ),
147 new_compaction_group_config,
148 )
149 };
150 let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
151 let new_compaction_group = CompactionGroup {
152 group_id: new_compaction_group_id,
153 compaction_config: compaction_group_config.clone(),
154 };
155
156 new_compaction_groups.push(new_compaction_group.clone());
157 compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);
158
159 on_handle_add_new_table(
160 state_table_info,
161 &table_ids,
162 new_compaction_group_id,
163 &mut table_compaction_group_mapping,
164 &mut new_table_ids,
165 )?;
166 }
167
168 let commit_sstables = self
169 .correct_commit_ssts(sstables, &table_compaction_group_mapping)
170 .await?;
171
172 let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
173 let mut group_id_to_config = HashMap::new();
175 if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
176 for cg_id in &modified_compaction_groups {
177 let compaction_group = compaction_group_manager
178 .get(cg_id)
179 .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
180 .compaction_config();
181 group_id_to_config.insert(*cg_id, compaction_group);
182 }
183 } else {
184 let compaction_group_manager = self.compaction_group_manager.read().await;
185 for cg_id in &modified_compaction_groups {
186 let compaction_group = compaction_group_manager
187 .try_get_compaction_group_config(*cg_id)
188 .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
189 .compaction_config();
190 group_id_to_config.insert(*cg_id, compaction_group);
191 }
192 }
193
194 let group_id_to_sub_levels =
195 rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);
196
197 let time_travel_delta = version.pre_commit_epoch(
198 &tables_to_commit,
199 new_compaction_groups,
200 group_id_to_sub_levels,
201 &new_table_ids,
202 new_table_watermarks,
203 change_log_delta,
204 vector_index_delta,
205 );
206 if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
207 versioning.time_travel_snapshot_interval_counter = u64::MAX;
209 }
210
211 let mut version_stats = HummockVersionStatsTransaction::new(
213 &mut versioning.version_stats,
214 self.env.notification_manager(),
215 );
216 add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
217 if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version()) {
218 self.metrics.version_stats.reset();
219 versioning.local_metrics.clear();
220 }
221
222 trigger_local_table_stat(
223 &self.metrics,
224 &mut versioning.local_metrics,
225 &version_stats,
226 &table_stats_change,
227 );
228 for (table_id, stats) in &table_stats_change {
229 if stats.total_key_size == 0
230 && stats.total_value_size == 0
231 && stats.total_key_count == 0
232 {
233 continue;
234 }
235 let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
236 let table_metrics = get_or_create_local_table_stat(
237 &self.metrics,
238 *table_id,
239 &mut versioning.local_metrics,
240 );
241 table_metrics.inc_write_throughput(stats_value as u64);
242 }
243 let mut time_travel_version = None;
244 if versioning.time_travel_snapshot_interval_counter
245 >= self.env.opts.hummock_time_travel_snapshot_interval
246 {
247 versioning.time_travel_snapshot_interval_counter = 0;
248 time_travel_version = Some(version.latest_version());
249 } else {
250 versioning.time_travel_snapshot_interval_counter = versioning
251 .time_travel_snapshot_interval_counter
252 .saturating_add(1);
253 }
254 let time_travel_tables_to_commit =
255 table_compaction_group_mapping
256 .iter()
257 .filter_map(|(table_id, cg_id)| {
258 tables_to_commit
259 .get(table_id)
260 .map(|committed_epoch| (table_id, cg_id, *committed_epoch))
261 });
262 let time_travel_table_ids: HashSet<_> = self
263 .metadata_manager
264 .catalog_controller
265 .list_time_travel_table_ids()
266 .await
267 .map_err(|e| Error::Internal(e.into()))?
268 .into_iter()
269 .map(|id| id.try_into().unwrap())
270 .collect();
271 let mut txn = self.env.meta_store_ref().conn.begin().await?;
272 let version_snapshot_sst_ids = self
273 .write_time_travel_metadata(
274 &txn,
275 time_travel_version,
276 time_travel_delta,
277 time_travel_table_ids,
278 &versioning.last_time_travel_snapshot_sst_ids,
279 time_travel_tables_to_commit,
280 )
281 .await?;
282 commit_multi_var_with_provided_txn!(
283 txn,
284 version,
285 version_stats,
286 compaction_group_manager_txn
287 )?;
288 if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
289 versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
290 }
291
292 for compaction_group_id in &modified_compaction_groups {
293 trigger_sst_stat(
294 &self.metrics,
295 None,
296 &versioning.current_version,
297 *compaction_group_id,
298 );
299 }
300 trigger_epoch_stat(&self.metrics, &versioning.current_version);
301
302 drop(versioning_guard);
303
304 if !self.env.opts.compaction_deterministic_test {
306 for id in &modified_compaction_groups {
308 self.try_send_compaction_request(*id, compact_task::TaskType::Dynamic);
309 }
310 if !table_stats_change.is_empty() {
311 self.collect_table_write_throughput(table_stats_change)
312 .await;
313 }
314 }
315 if !modified_compaction_groups.is_empty() {
316 self.try_update_write_limits(&modified_compaction_groups)
317 .await;
318 }
319 #[cfg(test)]
320 {
321 self.check_state_consistency().await;
322 }
323 Ok(())
324 }
325
326 async fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
327 let params = self.env.system_params_reader().await;
328 let barrier_interval_ms = params.barrier_interval_ms() as u64;
329 let checkpoint_secs = std::cmp::max(
330 1,
331 params.checkpoint_frequency() * barrier_interval_ms / 1000,
332 );
333
334 let mut table_throughput_statistic_manager =
335 self.table_write_throughput_statistic_manager.write();
336 let timestamp = chrono::Utc::now().timestamp();
337
338 for (table_id, stat) in table_stats {
339 let throughput = ((stat.total_value_size + stat.total_key_size) as f64
340 / checkpoint_secs as f64) as u64;
341 table_throughput_statistic_manager
342 .add_table_throughput_with_ts(table_id, throughput, timestamp);
343 }
344 }
345
346 async fn correct_commit_ssts(
347 &self,
348 sstables: Vec<LocalSstableInfo>,
349 table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
350 ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
351 let mut new_sst_id_number = 0;
352 let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
353 let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
354 for commit_sst in sstables {
355 let mut group_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
356 for table_id in &commit_sst.sst_info.table_ids {
357 match table_compaction_group_mapping.get(&TableId::new(*table_id)) {
358 Some(cg_id_from_meta) => {
359 group_table_ids
360 .entry(*cg_id_from_meta)
361 .or_default()
362 .push(*table_id);
363 }
364 None => {
365 tracing::warn!(
366 table_id = *table_id,
367 object_id = %commit_sst.sst_info.object_id,
368 "table doesn't belong to any compaction group",
369 );
370 }
371 }
372 }
373
374 new_sst_id_number += group_table_ids.len() * 2; sst_to_cg_vec.push((commit_sst, group_table_ids));
376 }
377
378 let mut new_sst_id = next_sstable_id(&self.env, new_sst_id_number).await?;
382 let mut commit_sstables: BTreeMap<u64, Vec<SstableInfo>> = BTreeMap::new();
383
384 for (mut sst, group_table_ids) in sst_to_cg_vec {
385 let len = group_table_ids.len();
386 for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
387 if sst.sst_info.table_ids == match_ids {
388 assert!(
390 index == len - 1,
391 "SST should be the last key in the group {} index {} len {}",
392 group_id,
393 index,
394 len
395 );
396 commit_sstables
397 .entry(group_id)
398 .or_default()
399 .push(sst.sst_info);
400 break;
401 }
402
403 let origin_sst_size = sst.sst_info.sst_size;
404 let new_sst_size = match_ids
405 .iter()
406 .map(|id| {
407 let stat = sst.table_stats.get(id).unwrap();
408 stat.total_compressed_size
409 })
410 .sum();
411
412 if new_sst_size == 0 {
413 tracing::warn!(
414 id = %sst.sst_info.sst_id,
415 object_id = %sst.sst_info.object_id,
416 match_ids = ?match_ids,
417 "Sstable doesn't contain any data for tables",
418 );
419 }
420
421 let old_sst_size = origin_sst_size.saturating_sub(new_sst_size);
422 if old_sst_size == 0 {
423 tracing::warn!(
424 id = %sst.sst_info.sst_id,
425 object_id = %sst.sst_info.object_id,
426 match_ids = ?match_ids,
427 origin_sst_size = origin_sst_size,
428 new_sst_size = new_sst_size,
429 "Sstable doesn't contain any data for tables",
430 );
431 }
432 let (modified_sst_info, branch_sst) = split_sst_with_table_ids(
433 &sst.sst_info,
434 &mut new_sst_id,
435 old_sst_size,
436 new_sst_size,
437 match_ids,
438 );
439 sst.sst_info = modified_sst_info;
440
441 commit_sstables
442 .entry(group_id)
443 .or_default()
444 .push(branch_sst);
445 }
446 }
447
448 for ssts in commit_sstables.values() {
450 let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
451 assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
452 }
453
454 Ok(commit_sstables)
455 }
456}
457
458fn on_handle_add_new_table(
459 state_table_info: &HummockVersionStateTableInfo,
460 table_ids: impl IntoIterator<Item = &TableId>,
461 compaction_group_id: CompactionGroupId,
462 table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
463 new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
464) -> Result<()> {
465 for table_id in table_ids {
466 if let Some(info) = state_table_info.info().get(table_id) {
467 return Err(Error::CompactionGroup(format!(
468 "table {} already exist {:?}",
469 table_id.table_id, info,
470 )));
471 }
472 table_compaction_group_mapping.insert(*table_id, compaction_group_id);
473 new_table_ids.insert(*table_id, compaction_group_id);
474 }
475
476 Ok(())
477}
478
479fn rewrite_commit_sstables_to_sub_level(
482 commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
483 group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
484) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
485 let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
486 for (group_id, inserted_table_infos) in commit_sstables {
487 let config = group_id_to_config
488 .get(&group_id)
489 .expect("compaction group should exist");
490
491 let mut accumulated_size = 0;
492 let mut ssts = vec![];
493 let sub_level_size_limit = config
494 .max_overlapping_level_size
495 .unwrap_or(compaction_config::max_overlapping_level_size());
496
497 let level = overlapping_sstables.entry(group_id).or_default();
498
499 for sst in inserted_table_infos {
500 accumulated_size += sst.sst_size;
501 ssts.push(sst);
502 if accumulated_size > sub_level_size_limit {
503 level.push(ssts);
504
505 accumulated_size = 0;
507 ssts = vec![];
508 }
509 }
510
511 if !ssts.is_empty() {
512 level.push(ssts);
513 }
514
515 level.reverse();
517 }
518
519 overlapping_sstables
520}
521
/// Returns `true` if `vec_2` is a subsequence of `vec_1`.
///
/// Every element of `vec_2` must appear in `vec_1` in the same relative order, though not
/// necessarily contiguously. Matching is greedy and runs in `O(vec_1.len() + vec_2.len())`.
/// An empty `vec_2` is always a subsequence. Duplicates in `vec_2` each need their own match
/// in `vec_1`.
///
/// Takes slices rather than `&Vec<T>`, so callers may pass vectors, arrays or sub-slices.
fn is_ordered_subset<T: PartialEq>(vec_1: &[T], vec_2: &[T]) -> bool {
    // `remaining` only moves forward, so each needle is searched for strictly after the
    // position where the previous needle matched.
    let mut remaining = vec_1.iter();
    vec_2
        .iter()
        .all(|wanted| remaining.any(|candidate| candidate == wanted))
}