// risingwave_meta/barrier/utils.rs

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::from_prost_table_stats_map;
use risingwave_hummock_sdk::table_watermark::{
    TableWatermarks, merge_multiple_new_table_watermarks,
};
use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo};
use risingwave_meta_model::WorkerId;
use risingwave_pb::stream_service::BarrierCompleteResponse;

use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
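/// Aggregates the per-worker `BarrierCompleteResponse`s into the pieces needed for a Hummock
/// commit: which worker reported each SST object, the synced SSTs themselves, the merged
/// per-table watermarks, and the old-value SSTs.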
#[expect(clippy::type_complexity)]
pub(super) fn collect_resp_info(
    resps: Vec<BarrierCompleteResponse>,
) -> (
    HashMap<HummockSstableObjectId, u32>,
    Vec<LocalSstableInfo>,
    HashMap<TableId, TableWatermarks>,
    Vec<SstableInfo>,
) {
    let mut sst_to_worker: HashMap<HummockSstableObjectId, u32> = HashMap::new();
    let mut synced_ssts: Vec<LocalSstableInfo> = vec![];
    let mut table_watermarks = Vec::with_capacity(resps.len());
    let mut old_value_ssts = Vec::with_capacity(resps.len());

    for resp in resps {
        // Record which worker reported each SST object while converting the synced
        // SSTs into `LocalSstableInfo`.
        let ssts_iter = resp.synced_sstables.into_iter().map(|local_sst| {
            let sst_info = local_sst.sst.expect("field not None");
            sst_to_worker.insert(sst_info.object_id.into(), resp.worker_id);
            LocalSstableInfo::new(
                sst_info.into(),
                from_prost_table_stats_map(local_sst.table_stats_map),
                local_sst.created_at,
            )
        });
        synced_ssts.extend(ssts_iter);
        table_watermarks.push(resp.table_watermarks);
        old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|s| s.into()));
    }

    (
        sst_to_worker,
        synced_ssts,
        merge_multiple_new_table_watermarks(
            table_watermarks
                .into_iter()
                .map(|watermarks| {
                    watermarks
                        .into_iter()
                        .map(|(table_id, watermarks)| {
                            (TableId::new(table_id), TableWatermarks::from(&watermarks))
                        })
                        .collect()
                })
                .collect_vec(),
        ),
        old_value_ssts,
    )
}
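/// Folds the responses collected for a creating streaming job into `commit_info`: extends the
/// `sst_to_context` mapping, the synced SSTs, and the table watermarks, registers `epoch` for
/// every table in `tables_to_commit`, and, on the first commit, records a `NewTableFragmentInfo`
/// for those tables.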
pub(super) fn collect_creating_job_commit_epoch_info(
    commit_info: &mut CommitEpochInfo,
    epoch: u64,
    resps: Vec<BarrierCompleteResponse>,
    tables_to_commit: impl Iterator<Item = TableId>,
    is_first_time: bool,
) {
    let (sst_to_context, sstables, new_table_watermarks, old_value_sst) = collect_resp_info(resps);
    assert!(old_value_sst.is_empty());
    commit_info.sst_to_context.extend(sst_to_context);
    commit_info.sstables.extend(sstables);
    commit_info
        .new_table_watermarks
        .extend(new_table_watermarks);
    let tables_to_commit: HashSet<_> = tables_to_commit.collect();
    tables_to_commit.iter().for_each(|table_id| {
        commit_info
            .tables_to_commit
            .try_insert(*table_id, epoch)
            .expect("non duplicate");
    });
    if is_first_time {
        // The first commit of the creating job also registers its tables as a new
        // table fragment.
        commit_info
            .new_table_fragment_infos
            .push(NewTableFragmentInfo {
                table_ids: tables_to_commit,
            });
    }
}
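/// Workers whose barrier-complete responses are still pending, each mapped to a flag that is
/// consulted by `is_valid_after_worker_err` when that worker fails.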
pub(super) type NodeToCollect = HashMap<WorkerId, bool>;

/// Handles an error on `worker_id` and reports whether the barrier can still be collected.
/// Untracked workers are ignored; a tracked worker is dropped from `node_to_collect` when its
/// flag is `true`, while a `false` flag means the barrier is no longer collectible.
pub(super) fn is_valid_after_worker_err(
    node_to_collect: &mut NodeToCollect,
    worker_id: WorkerId,
) -> bool {
    match node_to_collect.entry(worker_id) {
        Entry::Occupied(entry) => {
            if *entry.get() {
                entry.remove();
                true
            } else {
                false
            }
        }
        Entry::Vacant(_) => true,
    }
}
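// A minimal illustrative sketch of how `is_valid_after_worker_err` behaves, assuming
// `WorkerId` is a plain integer type alias; the test module and its names are hypothetical
// and only demonstrate the bookkeeping above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn worker_err_handling() {
        let mut node_to_collect: NodeToCollect = HashMap::new();
        node_to_collect.insert(1 as WorkerId, true);
        node_to_collect.insert(2 as WorkerId, false);

        // An error on a worker that is not tracked leaves the barrier collectible.
        assert!(is_valid_after_worker_err(&mut node_to_collect, 3 as WorkerId));

        // A tracked worker whose flag is `true` is dropped and collection stays valid.
        assert!(is_valid_after_worker_err(&mut node_to_collect, 1 as WorkerId));
        assert!(!node_to_collect.contains_key(&(1 as WorkerId)));

        // A tracked worker whose flag is `false` makes the barrier uncollectible and
        // keeps the entry in place.
        assert!(!is_valid_after_worker_err(&mut node_to_collect, 2 as WorkerId));
        assert!(node_to_collect.contains_key(&(2 as WorkerId)));
    }
}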