risingwave_meta/barrier/
utils.rs1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use risingwave_hummock_sdk::table_stats::from_prost_table_stats_map;
22use risingwave_hummock_sdk::table_watermark::{
23 TableWatermarks, merge_multiple_new_table_watermarks,
24};
25use risingwave_hummock_sdk::vector_index::{VectorIndexAdd, VectorIndexDelta};
26use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo};
27use risingwave_meta_model::WorkerId;
28use risingwave_pb::stream_service::BarrierCompleteResponse;
29
30use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
31
32#[expect(clippy::type_complexity)]
33pub(super) fn collect_resp_info(
34 resps: Vec<BarrierCompleteResponse>,
35) -> (
36 HashMap<HummockSstableObjectId, WorkerId>,
37 Vec<LocalSstableInfo>,
38 HashMap<TableId, TableWatermarks>,
39 Vec<SstableInfo>,
40 HashMap<TableId, Vec<VectorIndexAdd>>,
41 HashSet<TableId>,
42) {
43 let mut sst_to_worker: HashMap<HummockSstableObjectId, _> = HashMap::new();
44 let mut synced_ssts: Vec<LocalSstableInfo> = vec![];
45 let mut table_watermarks = Vec::with_capacity(resps.len());
46 let mut old_value_ssts = Vec::with_capacity(resps.len());
47 let mut vector_index_adds = HashMap::new();
48 let mut truncate_tables: HashSet<TableId> = HashSet::new();
49
50 for resp in resps {
51 let ssts_iter = resp.synced_sstables.into_iter().map(|local_sst| {
52 let sst_info = local_sst.sst.expect("field not None");
53 sst_to_worker.insert(sst_info.object_id.into(), resp.worker_id);
54 LocalSstableInfo::new(
55 sst_info.into(),
56 from_prost_table_stats_map(local_sst.table_stats_map),
57 local_sst.created_at,
58 )
59 });
60 synced_ssts.extend(ssts_iter);
61 table_watermarks.push(resp.table_watermarks);
62 old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|s| s.into()));
63 for (table_id, vector_index_add) in resp.vector_index_adds {
64 vector_index_adds
65 .try_insert(
66 table_id,
67 vector_index_add
68 .adds
69 .into_iter()
70 .map(VectorIndexAdd::from)
71 .collect(),
72 )
73 .expect("non-duplicate");
74 }
75 truncate_tables.extend(resp.truncate_tables);
76 }
77
78 (
79 sst_to_worker,
80 synced_ssts,
81 merge_multiple_new_table_watermarks(
82 table_watermarks
83 .into_iter()
84 .map(|watermarks| {
85 watermarks
86 .into_iter()
87 .map(|(table_id, watermarks)| {
88 (table_id, TableWatermarks::from(&watermarks))
89 })
90 .collect()
91 })
92 .collect_vec(),
93 ),
94 old_value_ssts,
95 vector_index_adds,
96 truncate_tables,
97 )
98}
99
100pub(super) fn collect_creating_job_commit_epoch_info(
101 commit_info: &mut CommitEpochInfo,
102 epoch: u64,
103 resps: Vec<BarrierCompleteResponse>,
104 tables_to_commit: impl Iterator<Item = TableId>,
105 is_first_time: bool,
106) {
107 let (
108 sst_to_context,
109 sstables,
110 new_table_watermarks,
111 old_value_sst,
112 vector_index_adds,
113 truncate_tables,
114 ) = collect_resp_info(resps);
115 assert!(old_value_sst.is_empty());
116 commit_info.sst_to_context.extend(sst_to_context);
117 commit_info.sstables.extend(sstables);
118 commit_info
119 .new_table_watermarks
120 .extend(new_table_watermarks);
121 for (table_id, vector_index_adds) in vector_index_adds {
122 commit_info
123 .vector_index_delta
124 .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
125 .expect("non-duplicate");
126 }
127 commit_info.truncate_tables.extend(truncate_tables);
128 let tables_to_commit: HashSet<_> = tables_to_commit.collect();
129 tables_to_commit.iter().for_each(|table_id| {
130 commit_info
131 .tables_to_commit
132 .try_insert(*table_id, epoch)
133 .expect("non duplicate");
134 });
135 if is_first_time {
136 commit_info
137 .new_table_fragment_infos
138 .push(NewTableFragmentInfo {
139 table_ids: tables_to_commit,
140 });
141 };
142}
143
144pub(super) type NodeToCollect = HashMap<WorkerId, bool>;
145pub(super) fn is_valid_after_worker_err(
146 node_to_collect: &mut NodeToCollect,
147 worker_id: WorkerId,
148) -> bool {
149 match node_to_collect.entry(worker_id) {
150 Entry::Occupied(entry) => {
151 if *entry.get() {
152 entry.remove();
153 true
154 } else {
155 false
156 }
157 }
158 Entry::Vacant(_) => true,
159 }
160}