risingwave_meta/barrier/utils.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::from_prost_table_stats_map;
use risingwave_hummock_sdk::table_watermark::{
    TableWatermarks, merge_multiple_new_table_watermarks,
};
use risingwave_hummock_sdk::vector_index::{VectorIndexAdd, VectorIndexDelta};
use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo};
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::PbTable;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_service::BarrierCompleteResponse;

use crate::barrier::CreateStreamingJobCommandInfo;
use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
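
/// Aggregates a batch of `BarrierCompleteResponse`s into the inputs of an epoch
/// commit: a map from synced SST object id to the worker that produced it, the
/// synced SSTs themselves, the merged table watermarks, the old-value SSTs, the
/// vector index additions keyed by table id, and the set of tables to truncate.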
#[expect(clippy::type_complexity)]
pub(super) fn collect_resp_info(
    resps: Vec<BarrierCompleteResponse>,
) -> (
    HashMap<HummockSstableObjectId, WorkerId>,
    Vec<LocalSstableInfo>,
    HashMap<TableId, TableWatermarks>,
    Vec<SstableInfo>,
    HashMap<TableId, Vec<VectorIndexAdd>>,
    HashSet<TableId>,
) {
    let mut sst_to_worker: HashMap<HummockSstableObjectId, WorkerId> = HashMap::new();
    let mut synced_ssts: Vec<LocalSstableInfo> = vec![];
    let mut table_watermarks = Vec::with_capacity(resps.len());
    let mut old_value_ssts = Vec::with_capacity(resps.len());
    let mut vector_index_adds = HashMap::new();
    let mut truncate_tables: HashSet<TableId> = HashSet::new();

    for resp in resps {
        let ssts_iter = resp.synced_sstables.into_iter().map(|local_sst| {
            let sst_info = local_sst.sst.expect("field not None");
            // Record which worker produced each synced SST object.
            sst_to_worker.insert(sst_info.object_id.into(), resp.worker_id as WorkerId);
            LocalSstableInfo::new(
                sst_info.into(),
                from_prost_table_stats_map(local_sst.table_stats_map),
                local_sst.created_at,
            )
        });
        synced_ssts.extend(ssts_iter);
        table_watermarks.push(resp.table_watermarks);
        old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|s| s.into()));
        for (table_id, vector_index_add) in resp.vector_index_adds {
            // Each table's vector index additions must come from a single response.
            vector_index_adds
                .try_insert(
                    TableId::new(table_id),
                    vector_index_add
                        .adds
                        .into_iter()
                        .map(VectorIndexAdd::from)
                        .collect(),
                )
                .expect("non-duplicate");
        }
        truncate_tables.extend(resp.truncate_tables.into_iter().map(TableId::new));
    }

    (
        sst_to_worker,
        synced_ssts,
        merge_multiple_new_table_watermarks(
            table_watermarks
                .into_iter()
                .map(|watermarks| {
                    watermarks
                        .into_iter()
                        .map(|(table_id, watermarks)| {
                            (TableId::new(table_id), TableWatermarks::from(&watermarks))
                        })
                        .collect()
                })
                .collect_vec(),
        ),
        old_value_ssts,
        vector_index_adds,
        truncate_tables,
    )
}
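
/// Searches the fragments of a newly created streaming job for a
/// `VectorIndexWrite` node and returns its index table, if any.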
pub(super) fn collect_new_vector_index_info(
    info: &CreateStreamingJobCommandInfo,
) -> Option<&PbTable> {
    let mut vector_index_table = None;
    for fragment in info.stream_job_fragments.fragments.values() {
        visit_stream_node_cont(&fragment.nodes, |node| {
            match node.node_body.as_ref().unwrap() {
                NodeBody::VectorIndexWrite(vector_index_write) => {
                    let index_table = vector_index_write.table.as_ref().unwrap();
                    assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
                    vector_index_table = Some(index_table);
                    // Found the index table; stop visiting.
                    false
                }
                // Not a vector index write node; keep visiting.
                _ => true,
            }
        })
    }
    vector_index_table
}
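
/// Folds the barrier-complete responses of a creating job into `commit_info`,
/// committing all of the job's tables at `epoch`. For a snapshot backfill job
/// (`create_info` is `Some`), this also registers the new table fragments and,
/// if the job writes a vector index, the index's init delta.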
pub(super) fn collect_creating_job_commit_epoch_info(
    commit_info: &mut CommitEpochInfo,
    epoch: u64,
    resps: Vec<BarrierCompleteResponse>,
    tables_to_commit: impl Iterator<Item = TableId>,
    create_info: Option<&CreateSnapshotBackfillJobCommandInfo>,
) {
    let (
        sst_to_context,
        sstables,
        new_table_watermarks,
        old_value_sst,
        vector_index_adds,
        truncate_tables,
    ) = collect_resp_info(resps);
    // A creating job should never produce old-value SSTs.
    assert!(old_value_sst.is_empty());
    commit_info.sst_to_context.extend(sst_to_context);
    commit_info.sstables.extend(sstables);
    commit_info
        .new_table_watermarks
        .extend(new_table_watermarks);
    for (table_id, vector_index_adds) in vector_index_adds {
        commit_info
            .vector_index_delta
            .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
            .expect("non-duplicate");
    }
    commit_info.truncate_tables.extend(truncate_tables);
    // Commit every table of the creating job at the given epoch.
    let tables_to_commit: HashSet<_> = tables_to_commit.collect();
    tables_to_commit.iter().for_each(|table_id| {
        commit_info
            .tables_to_commit
            .try_insert(*table_id, epoch)
            .expect("non-duplicate");
    });
    if let Some(info) = create_info {
        commit_info
            .new_table_fragment_infos
            .push(NewTableFragmentInfo {
                table_ids: tables_to_commit,
            });
        if let Some(index_table) = collect_new_vector_index_info(&info.info) {
            commit_info
                .vector_index_delta
                .try_insert(
                    TableId::new(index_table.id),
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(
                            index_table
                                .vector_index_info
                                .clone()
                                .expect("vector index info must be set"),
                        ),
                    }),
                )
                .expect("non-duplicate");
        }
    }
}
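
/// Worker nodes from which barrier-complete responses are still to be collected.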
pub(super) type NodeToCollect = HashSet<WorkerId>;
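
/// After an error on `worker_id`, a barrier is still valid only if it is not
/// waiting on a response from that worker.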
pub(super) fn is_valid_after_worker_err(
    node_to_collect: &NodeToCollect,
    worker_id: WorkerId,
) -> bool {
    !node_to_collect.contains(&worker_id)
}