risingwave_meta/barrier/
utils.rs1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use risingwave_hummock_sdk::table_stats::from_prost_table_stats_map;
22use risingwave_hummock_sdk::table_watermark::{
23 TableWatermarks, merge_multiple_new_table_watermarks,
24};
25use risingwave_hummock_sdk::vector_index::{VectorIndexAdd, VectorIndexDelta};
26use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo};
27use risingwave_meta_model::WorkerId;
28use risingwave_pb::stream_service::BarrierCompleteResponse;
29
30use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
31
32#[expect(clippy::type_complexity)]
33pub(super) fn collect_resp_info(
34 resps: Vec<BarrierCompleteResponse>,
35) -> (
36 HashMap<HummockSstableObjectId, u32>,
37 Vec<LocalSstableInfo>,
38 HashMap<TableId, TableWatermarks>,
39 Vec<SstableInfo>,
40 HashMap<TableId, Vec<VectorIndexAdd>>,
41) {
42 let mut sst_to_worker: HashMap<HummockSstableObjectId, u32> = HashMap::new();
43 let mut synced_ssts: Vec<LocalSstableInfo> = vec![];
44 let mut table_watermarks = Vec::with_capacity(resps.len());
45 let mut old_value_ssts = Vec::with_capacity(resps.len());
46 let mut vector_index_adds = HashMap::new();
47
48 for resp in resps {
49 let ssts_iter = resp.synced_sstables.into_iter().map(|local_sst| {
50 let sst_info = local_sst.sst.expect("field not None");
51 sst_to_worker.insert(sst_info.object_id.into(), resp.worker_id);
52 LocalSstableInfo::new(
53 sst_info.into(),
54 from_prost_table_stats_map(local_sst.table_stats_map),
55 local_sst.created_at,
56 )
57 });
58 synced_ssts.extend(ssts_iter);
59 table_watermarks.push(resp.table_watermarks);
60 old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|s| s.into()));
61 for (table_id, vector_index_add) in resp.vector_index_adds {
62 let table_id = TableId::new(table_id);
63 vector_index_adds
64 .try_insert(
65 table_id,
66 vector_index_add
67 .adds
68 .into_iter()
69 .map(VectorIndexAdd::from)
70 .collect(),
71 )
72 .expect("non-duplicate");
73 }
74 }
75
76 (
77 sst_to_worker,
78 synced_ssts,
79 merge_multiple_new_table_watermarks(
80 table_watermarks
81 .into_iter()
82 .map(|watermarks| {
83 watermarks
84 .into_iter()
85 .map(|(table_id, watermarks)| {
86 (TableId::new(table_id), TableWatermarks::from(&watermarks))
87 })
88 .collect()
89 })
90 .collect_vec(),
91 ),
92 old_value_ssts,
93 vector_index_adds,
94 )
95}
96
97pub(super) fn collect_creating_job_commit_epoch_info(
98 commit_info: &mut CommitEpochInfo,
99 epoch: u64,
100 resps: Vec<BarrierCompleteResponse>,
101 tables_to_commit: impl Iterator<Item = TableId>,
102 is_first_time: bool,
103) {
104 let (sst_to_context, sstables, new_table_watermarks, old_value_sst, vector_index_adds) =
105 collect_resp_info(resps);
106 assert!(old_value_sst.is_empty());
107 commit_info.sst_to_context.extend(sst_to_context);
108 commit_info.sstables.extend(sstables);
109 commit_info
110 .new_table_watermarks
111 .extend(new_table_watermarks);
112 for (table_id, vector_index_adds) in vector_index_adds {
113 commit_info
114 .vector_index_delta
115 .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
116 .expect("non-duplicate");
117 }
118 let tables_to_commit: HashSet<_> = tables_to_commit.collect();
119 tables_to_commit.iter().for_each(|table_id| {
120 commit_info
121 .tables_to_commit
122 .try_insert(*table_id, epoch)
123 .expect("non duplicate");
124 });
125 if is_first_time {
126 commit_info
127 .new_table_fragment_infos
128 .push(NewTableFragmentInfo {
129 table_ids: tables_to_commit,
130 });
131 };
132}
133
134pub(super) type NodeToCollect = HashMap<WorkerId, bool>;
135pub(super) fn is_valid_after_worker_err(
136 node_to_collect: &mut NodeToCollect,
137 worker_id: WorkerId,
138) -> bool {
139 match node_to_collect.entry(worker_id) {
140 Entry::Occupied(entry) => {
141 if *entry.get() {
142 entry.remove();
143 true
144 } else {
145 false
146 }
147 }
148 Entry::Vacant(_) => true,
149 }
150}