// risingwave_meta/barrier/utils.rs

use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::from_prost_table_stats_map;
use risingwave_hummock_sdk::table_watermark::{
    TableWatermarks, merge_multiple_new_table_watermarks,
};
use risingwave_hummock_sdk::vector_index::{VectorIndexAdd, VectorIndexDelta};
use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo};
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::PbTable;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_service::BarrierCompleteResponse;

use crate::barrier::CreateStreamingJobCommandInfo;
use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
38#[expect(clippy::type_complexity)]
39pub(super) fn collect_resp_info(
40 resps: Vec<BarrierCompleteResponse>,
41) -> (
42 HashMap<HummockSstableObjectId, WorkerId>,
43 Vec<LocalSstableInfo>,
44 HashMap<TableId, TableWatermarks>,
45 Vec<SstableInfo>,
46 HashMap<TableId, Vec<VectorIndexAdd>>,
47 HashSet<TableId>,
48) {
49 let mut sst_to_worker: HashMap<HummockSstableObjectId, _> = HashMap::new();
50 let mut synced_ssts: Vec<LocalSstableInfo> = vec![];
51 let mut table_watermarks = Vec::with_capacity(resps.len());
52 let mut old_value_ssts = Vec::with_capacity(resps.len());
53 let mut vector_index_adds = HashMap::new();
54 let mut truncate_tables: HashSet<TableId> = HashSet::new();
55
56 for resp in resps {
57 let ssts_iter = resp.synced_sstables.into_iter().map(|local_sst| {
58 let sst_info = local_sst.sst.expect("field not None");
59 sst_to_worker.insert(sst_info.object_id.into(), resp.worker_id);
60 LocalSstableInfo::new(
61 sst_info.into(),
62 from_prost_table_stats_map(local_sst.table_stats_map),
63 local_sst.created_at,
64 )
65 });
66 synced_ssts.extend(ssts_iter);
67 table_watermarks.push(resp.table_watermarks);
68 old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|s| s.into()));
69 for (table_id, vector_index_add) in resp.vector_index_adds {
70 vector_index_adds
71 .try_insert(
72 table_id,
73 vector_index_add
74 .adds
75 .into_iter()
76 .map(VectorIndexAdd::from)
77 .collect(),
78 )
79 .expect("non-duplicate");
80 }
81 truncate_tables.extend(resp.truncate_tables);
82 }
83
84 (
85 sst_to_worker,
86 synced_ssts,
87 merge_multiple_new_table_watermarks(
88 table_watermarks
89 .into_iter()
90 .map(|watermarks| {
91 watermarks
92 .into_iter()
93 .map(|(table_id, watermarks)| {
94 (table_id, TableWatermarks::from(&watermarks))
95 })
96 .collect()
97 })
98 .collect_vec(),
99 ),
100 old_value_ssts,
101 vector_index_adds,
102 truncate_tables,
103 )
104}
105
106pub(super) fn collect_new_vector_index_info(
107 info: &CreateStreamingJobCommandInfo,
108) -> Option<&PbTable> {
109 let mut vector_index_table = None;
110 {
111 for fragment in info.stream_job_fragments.fragments.values() {
112 visit_stream_node_cont(&fragment.nodes, |node| {
113 match node.node_body.as_ref().unwrap() {
114 NodeBody::VectorIndexWrite(vector_index_write) => {
115 let index_table = vector_index_write.table.as_ref().unwrap();
116 assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
117 vector_index_table = Some(index_table);
118 false
119 }
120 _ => true,
121 }
122 })
123 }
124 vector_index_table
125 }
126}
127
/// Folds the barrier-complete responses of a creating (snapshot-backfill) streaming job
/// into `commit_info`, registering every table in `tables_to_commit` at `epoch`.
///
/// `create_info` carries the job's creation info; when `Some`, the job's tables are also
/// recorded as a new table fragment group and, if the job writes a vector index, the
/// index-init delta is emitted.
pub(super) fn collect_creating_job_commit_epoch_info(
    commit_info: &mut CommitEpochInfo,
    epoch: u64,
    resps: Vec<BarrierCompleteResponse>,
    tables_to_commit: impl Iterator<Item = TableId>,
    create_info: Option<&CreateSnapshotBackfillJobCommandInfo>,
) {
    let (
        sst_to_context,
        sstables,
        new_table_watermarks,
        old_value_sst,
        vector_index_adds,
        truncate_tables,
    ) = collect_resp_info(resps);
    // Invariant: a creating job produces no old-value SSTs.
    assert!(old_value_sst.is_empty());
    commit_info.sst_to_context.extend(sst_to_context);
    commit_info.sstables.extend(sstables);
    commit_info
        .new_table_watermarks
        .extend(new_table_watermarks);
    // Record vector index additions; each table may appear at most once per commit.
    for (table_id, vector_index_adds) in vector_index_adds {
        commit_info
            .vector_index_delta
            .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
            .expect("non-duplicate");
    }
    commit_info.truncate_tables.extend(truncate_tables);
    // All tables of the creating job are committed at the same `epoch`, each exactly once.
    let tables_to_commit: HashSet<_> = tables_to_commit.collect();
    tables_to_commit.iter().for_each(|table_id| {
        commit_info
            .tables_to_commit
            .try_insert(*table_id, epoch)
            .expect("non duplicate");
    });
    if let Some(info) = create_info {
        // Register the job's tables as a new table fragment group.
        commit_info
            .new_table_fragment_infos
            .push(NewTableFragmentInfo {
                table_ids: tables_to_commit,
            });
        // If the job writes a vector index, emit the index's init delta as well.
        if let Some(index_table) = collect_new_vector_index_info(&info.info) {
            commit_info
                .vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        // NOTE(review): `vector_index_info.unwrap()` moves out of a shared
                        // `&PbTable` — verify the field is Copy or that a `.clone()` was
                        // lost in transit; left untouched here.
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
    };
}
182
183pub(super) type NodeToCollect = HashSet<WorkerId>;
184pub(super) fn is_valid_after_worker_err(
185 node_to_collect: &NodeToCollect,
186 worker_id: WorkerId,
187) -> bool {
188 !node_to_collect.contains(&worker_id)
189}