1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_common::config::default::compaction_config;
21use risingwave_common::system_param::reader::SystemParamsRead;
22use risingwave_hummock_sdk::change_log::ChangeLogDelta;
23use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
24use risingwave_hummock_sdk::sstable_info::SstableInfo;
25use risingwave_hummock_sdk::table_stats::{
26 PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map,
27};
28use risingwave_hummock_sdk::table_watermark::TableWatermarks;
29use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
30use risingwave_hummock_sdk::{
31 CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
32};
33use risingwave_pb::hummock::CompactionConfig;
34use risingwave_pb::hummock::compact_task::{self};
35use sea_orm::TransactionTrait;
36
37use crate::hummock::error::{Error, Result};
38use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
39use crate::hummock::manager::transaction::{
40 HummockVersionStatsTransaction, HummockVersionTransaction,
41};
42use crate::hummock::manager::versioning::Versioning;
43use crate::hummock::metrics_utils::{
44 get_or_create_local_table_stat, trigger_epoch_stat, trigger_local_table_stat, trigger_sst_stat,
45};
46use crate::hummock::model::CompactionGroup;
47use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
48use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot;
49use crate::hummock::{
50 HummockManager, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
51};
52
/// A set of newly created state tables that are registered together.
///
/// For every value of this type passed in `CommitEpochInfo::new_table_fragment_infos`,
/// `HummockManager::commit_epoch` allocates one new compaction group and maps all of
/// `table_ids` to it.
pub struct NewTableFragmentInfo {
    /// Ids of the tables to register. `commit_epoch` fails if any of them already exists in
    /// the state table info of the latest version.
    pub table_ids: HashSet<TableId>,
}
56
/// Everything `HummockManager::commit_epoch` needs in order to commit one epoch.
#[derive(Default)]
pub struct CommitEpochInfo {
    /// SSTs to commit, each carrying its own per-table statistics.
    pub sstables: Vec<LocalSstableInfo>,
    /// Per-table watermarks, forwarded unchanged to `pre_commit_epoch`.
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    /// SST object id -> Hummock context id. Only consumed by `commit_epoch_sanity_check`;
    /// presumably the context that produced the object (confirm against the caller).
    pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
    /// Newly created tables; one new compaction group is created per entry.
    pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
    /// Per-table change log deltas, forwarded unchanged to `pre_commit_epoch`.
    pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
    /// `table_id` -> `committed_epoch`. Must not be empty: `commit_epoch` asserts on it.
    pub tables_to_commit: HashMap<TableId, u64>,
}
67
68impl HummockManager {
    /// Commits the epoch described by `commit_info` into the current Hummock version.
    ///
    /// The steps, in order:
    /// 1. Take the versioning write lock and run `commit_epoch_sanity_check`.
    /// 2. Aggregate the table statistics of all committed SSTs.
    /// 3. Create one new compaction group per `NewTableFragmentInfo` and register its tables.
    /// 4. Assign (and, where needed, split) the SSTs to compaction groups and cut them into
    ///    sub levels.
    /// 5. Apply everything to a version transaction, update version stats and metrics, write
    ///    the time travel metadata and commit all of it through one meta store transaction.
    /// 6. After releasing the versioning lock, trigger compaction, throughput collection and
    ///    write-limit updates for the modified compaction groups.
    ///
    /// Returns `Ok(())` without doing anything when `disable_commit_epochs` is set.
    ///
    /// # Errors
    ///
    /// Propagates errors from the sanity check, id allocation, new table registration, the
    /// catalog lookup of time travel tables and the meta store transaction.
    ///
    /// # Panics
    ///
    /// Panics if `tables_to_commit` is empty, or if a modified compaction group has no
    /// compaction config.
    pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> {
        let CommitEpochInfo {
            mut sstables,
            new_table_watermarks,
            sst_to_context,
            new_table_fragment_infos,
            change_log_delta,
            tables_to_commit,
        } = commit_info;
        // The write guard is held until the explicit `drop(versioning_guard)` near the end.
        let mut versioning_guard = self.versioning.write().await;
        let _timer = start_measure_real_process_timer!(self, "commit_epoch");
        // Committing is switched off: report success without touching the version.
        if versioning_guard.disable_commit_epochs {
            return Ok(());
        }

        assert!(!tables_to_commit.is_empty());

        let versioning: &mut Versioning = &mut versioning_guard;
        self.commit_epoch_sanity_check(
            &tables_to_commit,
            &sstables,
            &sst_to_context,
            &versioning.current_version,
        )
        .await?;

        // Sum up the per-table statistics of every SST that is being committed.
        let mut table_stats_change = PbTableStatsMap::default();
        for s in &mut sstables {
            add_prost_table_stats_map(
                &mut table_stats_change,
                &to_prost_table_stats_map(s.table_stats.clone()),
            );
        }

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            Some(&self.table_committed_epoch_notifiers),
            &self.metrics,
        );

        let state_table_info = &version.latest_version().state_table_info;
        let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
        let mut new_table_ids = HashMap::new();
        let mut new_compaction_groups = Vec::new();
        // Both stay `None` unless at least one new table fragment has to be registered.
        let mut compaction_group_manager_txn = None;
        let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

        // Create one new compaction group per new table fragment.
        for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
            // The first iteration takes the compaction group manager write lock, starts an
            // owned transaction from it and remembers the default config; later iterations
            // reuse both. Note that the tuple binding shadows the outer
            // `compaction_group_config` for the rest of the loop body.
            let (compaction_group_manager, compaction_group_config) =
                if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
                    (
                        compaction_group_manager,
                        (*compaction_group_config
                            .as_ref()
                            .expect("must be set with compaction_group_manager_txn"))
                        .clone(),
                    )
                } else {
                    let compaction_group_manager_guard =
                        self.compaction_group_manager.write().await;
                    let new_compaction_group_config =
                        compaction_group_manager_guard.default_compaction_config();
                    compaction_group_config = Some(new_compaction_group_config.clone());
                    (
                        compaction_group_manager_txn.insert(
                            CompactionGroupManager::start_owned_compaction_groups_txn(
                                compaction_group_manager_guard,
                            ),
                        ),
                        new_compaction_group_config,
                    )
                };
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            let new_compaction_group = CompactionGroup {
                group_id: new_compaction_group_id,
                compaction_config: compaction_group_config.clone(),
            };

            new_compaction_groups.push(new_compaction_group.clone());
            compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);

            // Fails if any of the tables already exists in the latest version.
            on_handle_add_new_table(
                state_table_info,
                &table_ids,
                new_compaction_group_id,
                &mut table_compaction_group_mapping,
                &mut new_table_ids,
            )?;
        }

        // Group the SSTs by compaction group, splitting those that span several groups.
        let commit_sstables = self
            .correct_commit_ssts(sstables, &table_compaction_group_mapping)
            .await?;

        let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
        let mut group_id_to_config = HashMap::new();
        // Look the configs up in the pending transaction when one was started above (it holds
        // the groups inserted in the loop); otherwise take a read lock on the manager.
        if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .get(cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        } else {
            let compaction_group_manager = self.compaction_group_manager.read().await;
            for cg_id in &modified_compaction_groups {
                let compaction_group = compaction_group_manager
                    .try_get_compaction_group_config(*cg_id)
                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
                    .compaction_config();
                group_id_to_config.insert(*cg_id, compaction_group);
            }
        }

        let group_id_to_sub_levels =
            rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);

        let time_travel_delta = version.pre_commit_epoch(
            &tables_to_commit,
            new_compaction_groups,
            group_id_to_sub_levels,
            &new_table_ids,
            new_table_watermarks,
            change_log_delta,
        );
        if should_mark_next_time_travel_version_snapshot(&time_travel_delta) {
            // Saturate the counter so that the `>=` comparison further down takes the
            // snapshot branch within this very call.
            versioning.time_travel_snapshot_interval_counter = u64::MAX;
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        add_prost_table_stats_map(&mut version_stats.table_stats, &table_stats_change);
        // When the purge reports `true`, the exported and locally cached metrics are reset.
        if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version()) {
            self.metrics.version_stats.reset();
            versioning.local_metrics.clear();
        }

        trigger_local_table_stat(
            &self.metrics,
            &mut versioning.local_metrics,
            &version_stats,
            &table_stats_change,
        );
        for (table_id, stats) in &table_stats_change {
            // Tables whose stats did not change contribute nothing to write throughput.
            if stats.total_key_size == 0
                && stats.total_value_size == 0
                && stats.total_key_count == 0
            {
                continue;
            }
            // Clamp at zero so that the cast to `u64` below never sees a negative value.
            let stats_value = std::cmp::max(0, stats.total_key_size + stats.total_value_size);
            let table_metrics = get_or_create_local_table_stat(
                &self.metrics,
                *table_id,
                &mut versioning.local_metrics,
            );
            table_metrics.inc_write_throughput(stats_value as u64);
        }
        // A full version is only handed to the time travel writer once the counter has
        // reached the configured interval; otherwise the counter is advanced by one.
        let mut time_travel_version = None;
        if versioning.time_travel_snapshot_interval_counter
            >= self.env.opts.hummock_time_travel_snapshot_interval
        {
            versioning.time_travel_snapshot_interval_counter = 0;
            time_travel_version = Some(version.latest_version());
        } else {
            versioning.time_travel_snapshot_interval_counter = versioning
                .time_travel_snapshot_interval_counter
                .saturating_add(1);
        }
        // Lazily yields `(table_id, compaction_group_id, committed_epoch)` for every mapped
        // table that is part of this commit.
        let time_travel_tables_to_commit =
            table_compaction_group_mapping
                .iter()
                .filter_map(|(table_id, cg_id)| {
                    tables_to_commit
                        .get(table_id)
                        .map(|committed_epoch| (table_id, cg_id, *committed_epoch))
                });
        let time_travel_table_ids: HashSet<_> = self
            .metadata_manager
            .catalog_controller
            .list_time_travel_table_ids()
            .await
            .map_err(|e| Error::Internal(e.into()))?
            .into_iter()
            .map(|id| id.try_into().unwrap())
            .collect();
        // The time travel metadata is written through `txn`, which is then handed to
        // `commit_multi_var_with_provided_txn!` together with the in-memory transactions.
        // NOTE(review): presumably this makes all of them commit atomically; confirm in the
        // macro definition.
        let mut txn = self.env.meta_store_ref().conn.begin().await?;
        let version_snapshot_sst_ids = self
            .write_time_travel_metadata(
                &txn,
                time_travel_version,
                time_travel_delta,
                time_travel_table_ids,
                &versioning.last_time_travel_snapshot_sst_ids,
                time_travel_tables_to_commit,
            )
            .await?;
        commit_multi_var_with_provided_txn!(
            txn,
            version,
            version_stats,
            compaction_group_manager_txn
        )?;
        // Only remember the snapshot's SST ids after the commit above has succeeded.
        if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
            versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
        }

        for compaction_group_id in &modified_compaction_groups {
            trigger_sst_stat(
                &self.metrics,
                None,
                &versioning.current_version,
                *compaction_group_id,
            );
        }
        trigger_epoch_stat(&self.metrics, &versioning.current_version);

        // Release the versioning write lock before the follow-up work below.
        drop(versioning_guard);

        // Compaction requests and throughput collection are skipped in deterministic
        // compaction tests.
        if !self.env.opts.compaction_deterministic_test {
            for id in &modified_compaction_groups {
                self.try_send_compaction_request(*id, compact_task::TaskType::Dynamic);
            }
            if !table_stats_change.is_empty() {
                self.collect_table_write_throughput(table_stats_change)
                    .await;
            }
        }
        if !modified_compaction_groups.is_empty() {
            self.try_update_write_limits(&modified_compaction_groups)
                .await;
        }
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(())
    }
322
323 async fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
324 let params = self.env.system_params_reader().await;
325 let barrier_interval_ms = params.barrier_interval_ms() as u64;
326 let checkpoint_secs = std::cmp::max(
327 1,
328 params.checkpoint_frequency() * barrier_interval_ms / 1000,
329 );
330
331 let mut table_throughput_statistic_manager =
332 self.table_write_throughput_statistic_manager.write();
333 let timestamp = chrono::Utc::now().timestamp();
334
335 for (table_id, stat) in table_stats {
336 let throughput = ((stat.total_value_size + stat.total_key_size) as f64
337 / checkpoint_secs as f64) as u64;
338 table_throughput_statistic_manager
339 .add_table_throughput_with_ts(table_id, throughput, timestamp);
340 }
341 }
342
343 async fn correct_commit_ssts(
344 &self,
345 sstables: Vec<LocalSstableInfo>,
346 table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
347 ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
348 let mut new_sst_id_number = 0;
349 let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
350 let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
351 for commit_sst in sstables {
352 let mut group_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
353 for table_id in &commit_sst.sst_info.table_ids {
354 match table_compaction_group_mapping.get(&TableId::new(*table_id)) {
355 Some(cg_id_from_meta) => {
356 group_table_ids
357 .entry(*cg_id_from_meta)
358 .or_default()
359 .push(*table_id);
360 }
361 None => {
362 tracing::warn!(
363 table_id = *table_id,
364 object_id = commit_sst.sst_info.object_id,
365 "table doesn't belong to any compaction group",
366 );
367 }
368 }
369 }
370
371 new_sst_id_number += group_table_ids.len() * 2; sst_to_cg_vec.push((commit_sst, group_table_ids));
373 }
374
375 let mut new_sst_id = next_sstable_object_id(&self.env, new_sst_id_number).await?;
379 let mut commit_sstables: BTreeMap<u64, Vec<SstableInfo>> = BTreeMap::new();
380
381 for (mut sst, group_table_ids) in sst_to_cg_vec {
382 let len = group_table_ids.len();
383 for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
384 if sst.sst_info.table_ids == match_ids {
385 assert!(
387 index == len - 1,
388 "SST should be the last key in the group {} index {} len {}",
389 group_id,
390 index,
391 len
392 );
393 commit_sstables
394 .entry(group_id)
395 .or_default()
396 .push(sst.sst_info);
397 break;
398 }
399
400 let origin_sst_size = sst.sst_info.sst_size;
401 let new_sst_size = match_ids
402 .iter()
403 .map(|id| {
404 let stat = sst.table_stats.get(id).unwrap();
405 stat.total_compressed_size
406 })
407 .sum();
408
409 if new_sst_size == 0 {
410 tracing::warn!(
411 id = sst.sst_info.sst_id,
412 object_id = sst.sst_info.object_id,
413 match_ids = ?match_ids,
414 "Sstable doesn't contain any data for tables",
415 );
416 }
417
418 let old_sst_size = origin_sst_size.saturating_sub(new_sst_size);
419 if old_sst_size == 0 {
420 tracing::warn!(
421 id = sst.sst_info.sst_id,
422 object_id = sst.sst_info.object_id,
423 match_ids = ?match_ids,
424 origin_sst_size = origin_sst_size,
425 new_sst_size = new_sst_size,
426 "Sstable doesn't contain any data for tables",
427 );
428 }
429 let (modified_sst_info, branch_sst) = split_sst_with_table_ids(
430 &sst.sst_info,
431 &mut new_sst_id,
432 old_sst_size,
433 new_sst_size,
434 match_ids,
435 );
436 sst.sst_info = modified_sst_info;
437
438 commit_sstables
439 .entry(group_id)
440 .or_default()
441 .push(branch_sst);
442 }
443 }
444
445 for ssts in commit_sstables.values() {
447 let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
448 assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
449 }
450
451 Ok(commit_sstables)
452 }
453}
454
455fn on_handle_add_new_table(
456 state_table_info: &HummockVersionStateTableInfo,
457 table_ids: impl IntoIterator<Item = &TableId>,
458 compaction_group_id: CompactionGroupId,
459 table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
460 new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
461) -> Result<()> {
462 for table_id in table_ids {
463 if let Some(info) = state_table_info.info().get(table_id) {
464 return Err(Error::CompactionGroup(format!(
465 "table {} already exist {:?}",
466 table_id.table_id, info,
467 )));
468 }
469 table_compaction_group_mapping.insert(*table_id, compaction_group_id);
470 new_table_ids.insert(*table_id, compaction_group_id);
471 }
472
473 Ok(())
474}
475
476fn rewrite_commit_sstables_to_sub_level(
479 commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
480 group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
481) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
482 let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
483 for (group_id, inserted_table_infos) in commit_sstables {
484 let config = group_id_to_config
485 .get(&group_id)
486 .expect("compaction group should exist");
487
488 let mut accumulated_size = 0;
489 let mut ssts = vec![];
490 let sub_level_size_limit = config
491 .max_overlapping_level_size
492 .unwrap_or(compaction_config::max_overlapping_level_size());
493
494 let level = overlapping_sstables.entry(group_id).or_default();
495
496 for sst in inserted_table_infos {
497 accumulated_size += sst.sst_size;
498 ssts.push(sst);
499 if accumulated_size > sub_level_size_limit {
500 level.push(ssts);
501
502 accumulated_size = 0;
504 ssts = vec![];
505 }
506 }
507
508 if !ssts.is_empty() {
509 level.push(ssts);
510 }
511
512 level.reverse();
514 }
515
516 overlapping_sstables
517}
518
/// Returns `true` when `vec_2` is an ordered subsequence of `vec_1`.
///
/// That is, every element of `vec_2` can be found in `vec_1` in the same relative order,
/// with other elements of `vec_1` allowed in between. An empty `vec_2` is a subsequence of
/// anything.
///
/// The parameters are slices, so existing callers passing `&Vec<u64>` keep working through
/// deref coercion.
fn is_ordered_subset(vec_1: &[u64], vec_2: &[u64]) -> bool {
    // `candidates` is shared across all `any` calls, so each search resumes right after the
    // previous match and the relative order is enforced.
    let mut candidates = vec_1.iter();
    vec_2
        .iter()
        .all(|wanted| candidates.any(|candidate| candidate == wanted))
}