risingwave_meta/barrier/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use risingwave_hummock_sdk::table_stats::from_prost_table_stats_map;
22use risingwave_hummock_sdk::table_watermark::{
23    TableWatermarks, merge_multiple_new_table_watermarks,
24};
25use risingwave_hummock_sdk::vector_index::{VectorIndexAdd, VectorIndexDelta};
26use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo};
27use risingwave_meta_model::WorkerId;
28use risingwave_pb::stream_service::BarrierCompleteResponse;
29
30use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
31
32#[expect(clippy::type_complexity)]
33pub(super) fn collect_resp_info(
34    resps: Vec<BarrierCompleteResponse>,
35) -> (
36    HashMap<HummockSstableObjectId, u32>,
37    Vec<LocalSstableInfo>,
38    HashMap<TableId, TableWatermarks>,
39    Vec<SstableInfo>,
40    HashMap<TableId, Vec<VectorIndexAdd>>,
41    HashSet<TableId>,
42) {
43    let mut sst_to_worker: HashMap<HummockSstableObjectId, u32> = HashMap::new();
44    let mut synced_ssts: Vec<LocalSstableInfo> = vec![];
45    let mut table_watermarks = Vec::with_capacity(resps.len());
46    let mut old_value_ssts = Vec::with_capacity(resps.len());
47    let mut vector_index_adds = HashMap::new();
48    let mut truncate_tables: HashSet<TableId> = HashSet::new();
49
50    for resp in resps {
51        let ssts_iter = resp.synced_sstables.into_iter().map(|local_sst| {
52            let sst_info = local_sst.sst.expect("field not None");
53            sst_to_worker.insert(sst_info.object_id.into(), resp.worker_id);
54            LocalSstableInfo::new(
55                sst_info.into(),
56                from_prost_table_stats_map(local_sst.table_stats_map),
57                local_sst.created_at,
58            )
59        });
60        synced_ssts.extend(ssts_iter);
61        table_watermarks.push(resp.table_watermarks);
62        old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|s| s.into()));
63        for (table_id, vector_index_add) in resp.vector_index_adds {
64            let table_id = TableId::new(table_id);
65            vector_index_adds
66                .try_insert(
67                    table_id,
68                    vector_index_add
69                        .adds
70                        .into_iter()
71                        .map(VectorIndexAdd::from)
72                        .collect(),
73                )
74                .expect("non-duplicate");
75        }
76        truncate_tables.extend(resp.truncate_tables.into_iter().map(TableId::new));
77    }
78
79    (
80        sst_to_worker,
81        synced_ssts,
82        merge_multiple_new_table_watermarks(
83            table_watermarks
84                .into_iter()
85                .map(|watermarks| {
86                    watermarks
87                        .into_iter()
88                        .map(|(table_id, watermarks)| {
89                            (TableId::new(table_id), TableWatermarks::from(&watermarks))
90                        })
91                        .collect()
92                })
93                .collect_vec(),
94        ),
95        old_value_ssts,
96        vector_index_adds,
97        truncate_tables,
98    )
99}
100
101pub(super) fn collect_creating_job_commit_epoch_info(
102    commit_info: &mut CommitEpochInfo,
103    epoch: u64,
104    resps: Vec<BarrierCompleteResponse>,
105    tables_to_commit: impl Iterator<Item = TableId>,
106    is_first_time: bool,
107) {
108    let (
109        sst_to_context,
110        sstables,
111        new_table_watermarks,
112        old_value_sst,
113        vector_index_adds,
114        truncate_tables,
115    ) = collect_resp_info(resps);
116    assert!(old_value_sst.is_empty());
117    commit_info.sst_to_context.extend(sst_to_context);
118    commit_info.sstables.extend(sstables);
119    commit_info
120        .new_table_watermarks
121        .extend(new_table_watermarks);
122    for (table_id, vector_index_adds) in vector_index_adds {
123        commit_info
124            .vector_index_delta
125            .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
126            .expect("non-duplicate");
127    }
128    commit_info.truncate_tables.extend(truncate_tables);
129    let tables_to_commit: HashSet<_> = tables_to_commit.collect();
130    tables_to_commit.iter().for_each(|table_id| {
131        commit_info
132            .tables_to_commit
133            .try_insert(*table_id, epoch)
134            .expect("non duplicate");
135    });
136    if is_first_time {
137        commit_info
138            .new_table_fragment_infos
139            .push(NewTableFragmentInfo {
140                table_ids: tables_to_commit,
141            });
142    };
143}
144
145pub(super) type NodeToCollect = HashMap<WorkerId, bool>;
146pub(super) fn is_valid_after_worker_err(
147    node_to_collect: &mut NodeToCollect,
148    worker_id: WorkerId,
149) -> bool {
150    match node_to_collect.entry(worker_id) {
151        Entry::Occupied(entry) => {
152            if *entry.get() {
153                entry.remove();
154                true
155            } else {
156                false
157            }
158        }
159        Entry::Vacant(_) => true,
160    }
161}