risingwave_meta/barrier/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_connector::source::SplitImpl;
20use risingwave_pb::ddl_service::DdlProgress;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::meta::PbRecoveryStatus;
23use tokio::sync::oneshot::Sender;
24
25use self::notifier::Notifier;
26use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
27use crate::manager::ActiveStreamingWorkerNodes;
28use crate::model::{ActorId, FragmentDownstreamRelation, StreamActor, StreamJobFragments};
29use crate::{MetaError, MetaResult};
30
31mod backfill_order_control;
32mod checkpoint;
33mod command;
34mod complete_task;
35mod context;
36mod edge_builder;
37mod info;
38mod manager;
39mod notifier;
40mod progress;
41mod rpc;
42mod schedule;
43mod trace;
44mod utils;
45mod worker;
46
47pub use backfill_order_control::{BackfillNode, BackfillOrderState};
48
49pub use self::command::{
50    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
51    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
52};
53pub use self::info::InflightSubscriptionInfo;
54pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
55pub use self::schedule::BarrierScheduler;
56pub use self::trace::TracedEpoch;
57
58/// The reason why the cluster is recovering.
/// The reason why the cluster is recovering.
enum RecoveryReason {
    /// Initial recovery performed right after meta node bootstrap.
    Bootstrap,
    /// Recovery triggered by a failure, carrying the error that caused it.
    Failover(MetaError),
    /// Recovery manually triggered (e.g. via an ad-hoc recovery request).
    Adhoc,
}
67
68/// Status of barrier manager.
/// Status of barrier manager.
enum BarrierManagerStatus {
    /// Barrier manager is starting and has not entered recovery yet.
    Starting,
    /// Barrier manager is under recovery, with the reason it entered recovery.
    Recovering(RecoveryReason),
    /// Barrier manager is running normally.
    Running,
}
77
78/// Scheduled command with its notifiers.
/// A scheduled command together with its notifiers.
struct Scheduled {
    /// The database this command targets.
    database_id: DatabaseId,
    /// The barrier command to be executed.
    command: Command,
    /// Notifiers associated with this command (presumably signaled as the
    /// barrier progresses — see `notifier` module for the exact stages).
    notifiers: Vec<Notifier>,
    /// Tracing span associated with this scheduled command.
    span: tracing::Span,
}
85
86impl From<&BarrierManagerStatus> for PbRecoveryStatus {
87    fn from(status: &BarrierManagerStatus) -> Self {
88        match status {
89            BarrierManagerStatus::Starting => Self::StatusStarting,
90            BarrierManagerStatus::Recovering(reason) => match reason {
91                RecoveryReason::Bootstrap => Self::StatusStarting,
92                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
93            },
94            BarrierManagerStatus::Running => Self::StatusRunning,
95        }
96    }
97}
98
/// Requests that can be sent to the barrier manager.
pub(crate) enum BarrierManagerRequest {
    /// Collect DDL progress; the result map is sent back through the oneshot
    /// sender (presumably keyed by job/table id — confirm at the call site).
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    /// Trigger a manual (ad-hoc) recovery; the sender is used to acknowledge
    /// the request.
    AdhocRecovery(Sender<()>),
}
103
/// A snapshot of the cluster-wide runtime info loaded during recovery, used to
/// re-initialize the barrier worker. Validated by [`Self::validate`] before use.
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    /// Currently active streaming worker nodes.
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    /// Per-database map of in-flight streaming jobs, keyed by job (table) id.
    database_job_infos: HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    /// Latest committed epoch of each state table.
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> (`Vec<non-checkpoint epoch>`, checkpoint epoch)
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    /// Per-database subscription info.
    subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
    /// All stream actors, keyed by actor id.
    stream_actors: HashMap<ActorId, StreamActor>,
    /// Downstream relations between fragments.
    fragment_relations: FragmentDownstreamRelation,
    /// Source split assignment per actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    /// Background (e.g. creating) jobs: job id -> (definition, fragments).
    /// NOTE(review): the `String` looks like the job definition — confirm at
    /// the site that builds this snapshot.
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    /// Hummock version statistics at the time of the snapshot.
    hummock_version_stats: HummockVersionStats,
}
118
119impl BarrierWorkerRuntimeInfoSnapshot {
120    fn validate_database_info(
121        database_id: DatabaseId,
122        database_jobs: &HashMap<TableId, InflightStreamingJobInfo>,
123        active_streaming_nodes: &ActiveStreamingWorkerNodes,
124        stream_actors: &HashMap<ActorId, StreamActor>,
125        state_table_committed_epochs: &HashMap<TableId, u64>,
126    ) -> MetaResult<()> {
127        {
128            for fragment in database_jobs.values().flat_map(|job| job.fragment_infos()) {
129                for (actor_id, actor) in &fragment.actors {
130                    if !active_streaming_nodes
131                        .current()
132                        .contains_key(&actor.worker_id)
133                    {
134                        return Err(anyhow!(
135                            "worker_id {} of actor {} do not exist",
136                            actor.worker_id,
137                            actor_id
138                        )
139                        .into());
140                    }
141                    if !stream_actors.contains_key(actor_id) {
142                        return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
143                    }
144                }
145                for state_table_id in &fragment.state_table_ids {
146                    if !state_table_committed_epochs.contains_key(state_table_id) {
147                        return Err(anyhow!(
148                            "state table {} is not registered to hummock",
149                            state_table_id
150                        )
151                        .into());
152                    }
153                }
154            }
155            for (job_id, job) in database_jobs {
156                let mut committed_epochs = job.existing_table_ids().map(|table_id| {
157                    (
158                        table_id,
159                        *state_table_committed_epochs
160                            .get(&table_id)
161                            .expect("checked exist"),
162                    )
163                });
164                let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
165                    anyhow!(
166                        "job {} in database {} has no state table after recovery",
167                        job_id,
168                        database_id
169                    )
170                })?;
171                for (table_id, epoch) in committed_epochs {
172                    if epoch != first_epoch {
173                        return Err(anyhow!(
174                            "job {} in database {} has tables with different table ids. {}:{}, {}:{}",
175                            job_id,
176                            database_id,
177                            first_table,
178                            first_epoch,
179                            table_id,
180                            epoch
181                        )
182                        .into());
183                    }
184                }
185            }
186        }
187        Ok(())
188    }
189
190    fn validate(&self) -> MetaResult<()> {
191        for (database_id, job_infos) in &self.database_job_infos {
192            Self::validate_database_info(
193                *database_id,
194                job_infos,
195                &self.active_streaming_nodes,
196                &self.stream_actors,
197                &self.state_table_committed_epochs,
198            )?
199        }
200        Ok(())
201    }
202}
203
/// A snapshot of the runtime info of a single database, used when recovering
/// one database in isolation. Per-database counterpart of
/// `BarrierWorkerRuntimeInfoSnapshot`.
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    /// In-flight streaming jobs of this database, keyed by job (table) id.
    job_infos: HashMap<TableId, InflightStreamingJobInfo>,
    /// Latest committed epoch of each state table.
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> (`Vec<non-checkpoint epoch>`, checkpoint epoch)
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    /// Subscription info of this database.
    subscription_info: InflightSubscriptionInfo,
    /// All stream actors of this database, keyed by actor id.
    stream_actors: HashMap<ActorId, StreamActor>,
    /// Downstream relations between fragments.
    fragment_relations: FragmentDownstreamRelation,
    /// Source split assignment per actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    /// Background (e.g. creating) jobs: job id -> (definition, fragments).
    /// NOTE(review): the `String` looks like the job definition — confirm at
    /// the site that builds this snapshot.
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
}
216
impl DatabaseRuntimeInfoSnapshot {
    /// Validate this database's runtime info against the given set of active
    /// worker nodes by delegating to
    /// `BarrierWorkerRuntimeInfoSnapshot::validate_database_info`.
    fn validate(
        &self,
        database_id: DatabaseId,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<()> {
        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
            database_id,
            &self.job_infos,
            active_streaming_nodes,
            &self.stream_actors,
            &self.state_table_committed_epochs,
        )
    }
}
231}