risingwave_meta/barrier/
utils.rs1use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::TableId;
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20use risingwave_hummock_sdk::table_stats::from_prost_table_stats_map;
21use risingwave_hummock_sdk::table_watermark::{
22 TableWatermarks, merge_multiple_new_table_watermarks,
23};
24use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo};
25use risingwave_meta_model::WorkerId;
26use risingwave_pb::stream_service::BarrierCompleteResponse;
27
28use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
29
30#[expect(clippy::type_complexity)]
31pub(super) fn collect_resp_info(
32 resps: Vec<BarrierCompleteResponse>,
33) -> (
34 HashMap<HummockSstableObjectId, u32>,
35 Vec<LocalSstableInfo>,
36 HashMap<TableId, TableWatermarks>,
37 Vec<SstableInfo>,
38) {
39 let mut sst_to_worker: HashMap<HummockSstableObjectId, u32> = HashMap::new();
40 let mut synced_ssts: Vec<LocalSstableInfo> = vec![];
41 let mut table_watermarks = Vec::with_capacity(resps.len());
42 let mut old_value_ssts = Vec::with_capacity(resps.len());
43
44 for resp in resps {
45 let ssts_iter = resp.synced_sstables.into_iter().map(|local_sst| {
46 let sst_info = local_sst.sst.expect("field not None");
47 sst_to_worker.insert(sst_info.object_id, resp.worker_id);
48 LocalSstableInfo::new(
49 sst_info.into(),
50 from_prost_table_stats_map(local_sst.table_stats_map),
51 local_sst.created_at,
52 )
53 });
54 synced_ssts.extend(ssts_iter);
55 table_watermarks.push(resp.table_watermarks);
56 old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|s| s.into()));
57 }
58
59 (
60 sst_to_worker,
61 synced_ssts,
62 merge_multiple_new_table_watermarks(
63 table_watermarks
64 .into_iter()
65 .map(|watermarks| {
66 watermarks
67 .into_iter()
68 .map(|(table_id, watermarks)| {
69 (TableId::new(table_id), TableWatermarks::from(&watermarks))
70 })
71 .collect()
72 })
73 .collect_vec(),
74 ),
75 old_value_ssts,
76 )
77}
78
79pub(super) fn collect_creating_job_commit_epoch_info(
80 commit_info: &mut CommitEpochInfo,
81 epoch: u64,
82 resps: Vec<BarrierCompleteResponse>,
83 tables_to_commit: impl Iterator<Item = TableId>,
84 is_first_time: bool,
85) {
86 let (sst_to_context, sstables, new_table_watermarks, old_value_sst) = collect_resp_info(resps);
87 assert!(old_value_sst.is_empty());
88 commit_info.sst_to_context.extend(sst_to_context);
89 commit_info.sstables.extend(sstables);
90 commit_info
91 .new_table_watermarks
92 .extend(new_table_watermarks);
93 let tables_to_commit: HashSet<_> = tables_to_commit.collect();
94 tables_to_commit.iter().for_each(|table_id| {
95 commit_info
96 .tables_to_commit
97 .try_insert(*table_id, epoch)
98 .expect("non duplicate");
99 });
100 if is_first_time {
101 commit_info
102 .new_table_fragment_infos
103 .push(NewTableFragmentInfo {
104 table_ids: tables_to_commit,
105 });
106 };
107}
108
109pub(super) type NodeToCollect = HashMap<WorkerId, bool>;
110pub(super) fn is_valid_after_worker_err(
111 node_to_collect: &mut NodeToCollect,
112 worker_id: WorkerId,
113) -> bool {
114 node_to_collect.remove(&worker_id).unwrap_or(true)
115}