risingwave_meta/barrier/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use anyhow::anyhow;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::catalog::Database;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, StreamActor, StreamJobFragments};
use crate::{MetaError, MetaResult};

mod backfill_order_control;
mod checkpoint;
mod command;
mod complete_task;
mod context;
mod edge_builder;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
#[cfg(test)]
mod tests;
mod trace;
mod utils;
mod worker;

pub use backfill_order_control::{BackfillNode, BackfillOrderState};
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignment;

pub use self::command::{
    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
};
pub(crate) use self::info::{InflightSubscriptionInfo, SharedActorInfos, SharedFragmentInfo};
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;

/// The reason why the cluster is recovering.
enum RecoveryReason {
    /// After bootstrap.
    Bootstrap,
    /// After failure.
    Failover(MetaError),
    /// Manually triggered.
    Adhoc,
}

/// Status of the barrier manager.
enum BarrierManagerStatus {
    /// Barrier manager is starting.
    Starting,
    /// Barrier manager is under recovery.
    Recovering(RecoveryReason),
    /// Barrier manager is running.
    Running,
}

/// Scheduled command with its notifiers.
struct Scheduled {
    database_id: DatabaseId,
    command: Command,
    notifiers: Vec<Notifier>,
    span: tracing::Span,
}

impl From<&BarrierManagerStatus> for PbRecoveryStatus {
    fn from(status: &BarrierManagerStatus) -> Self {
        match status {
            BarrierManagerStatus::Starting => Self::StatusStarting,
            BarrierManagerStatus::Recovering(reason) => match reason {
                RecoveryReason::Bootstrap => Self::StatusStarting,
                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
            },
            BarrierManagerStatus::Running => Self::StatusRunning,
        }
    }
}
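// Note the asymmetry in the mapping above: recovery performed as part of
// bootstrap is surfaced as `StatusStarting`, so only failover and ad-hoc
// recovery are externally reported as `StatusRecovering`. Illustrative check:
//
//     let status = BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap);
//     assert_eq!(PbRecoveryStatus::from(&status), PbRecoveryStatus::StatusStarting);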

/// Requests handled by the barrier worker, each carrying a oneshot `Sender`
/// through which the result is delivered back to the caller.
pub(crate) enum BarrierManagerRequest {
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    AdhocRecovery(Sender<()>),
    UpdateDatabaseBarrier {
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
        sender: Sender<()>,
    },
}
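// A minimal sketch of the intended request/response flow, assuming some
// `request_tx` handle into the barrier worker (the handle name is hypothetical;
// the real plumbing lives in the `manager` and `worker` modules):
//
//     let (tx, rx) = tokio::sync::oneshot::channel();
//     request_tx.send(BarrierManagerRequest::AdhocRecovery(tx))?;
//     rx.await?; // resolves once the ad-hoc recovery has been triggered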

/// A snapshot of cluster-wide runtime information resolved during recovery,
/// from which the barrier worker rebuilds its in-memory state.
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    database_job_infos: HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> list of (`Vec<non-checkpoint epoch>`, checkpoint epoch) pairs
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    hummock_version_stats: HummockVersionStats,
    database_infos: Vec<Database>,
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}
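// An illustrative reading of `state_table_log_epochs` (epoch values made up):
// table 42 logged two checkpoint intervals, first the non-checkpoint epochs
// [100, 101] closed by checkpoint epoch 102, then [103] closed by 104:
//
//     {42: vec![(vec![100, 101], 102), (vec![103], 104)]}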

impl BarrierWorkerRuntimeInfoSnapshot {
    fn validate_database_info(
        database_id: DatabaseId,
        database_jobs: &HashMap<TableId, InflightStreamingJobInfo>,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
        stream_actors: &HashMap<ActorId, StreamActor>,
        state_table_committed_epochs: &HashMap<TableId, u64>,
    ) -> MetaResult<()> {
        for fragment in database_jobs.values().flat_map(|job| job.fragment_infos()) {
            for (actor_id, actor) in &fragment.actors {
                if !active_streaming_nodes
                    .current()
                    .contains_key(&actor.worker_id)
                {
                    return Err(anyhow!(
                        "worker_id {} of actor {} does not exist",
                        actor.worker_id,
                        actor_id
                    )
                    .into());
                }
                if !stream_actors.contains_key(actor_id) {
                    return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
                }
            }
            for state_table_id in &fragment.state_table_ids {
                if !state_table_committed_epochs.contains_key(state_table_id) {
                    return Err(anyhow!(
                        "state table {} is not registered to hummock",
                        state_table_id
                    )
                    .into());
                }
            }
        }
        // All state tables of a streaming job must have been committed in the
        // same epoch; otherwise the job cannot be resumed consistently.
        for (job_id, job) in database_jobs {
            let mut committed_epochs = job.existing_table_ids().map(|table_id| {
                (
                    table_id,
                    *state_table_committed_epochs
                        .get(&table_id)
                        .expect("checked exist"),
                )
            });
            let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
                anyhow!(
                    "job {} in database {} has no state table after recovery",
                    job_id,
                    database_id
                )
            })?;
            for (table_id, epoch) in committed_epochs {
                if epoch != first_epoch {
                    return Err(anyhow!(
                        "job {} in database {} has state tables with different committed epochs: {}:{}, {}:{}",
                        job_id,
                        database_id,
                        first_table,
                        first_epoch,
                        table_id,
                        epoch
                    )
                    .into());
                }
            }
        }
        Ok(())
    }

    fn validate(&self) -> MetaResult<()> {
        for (database_id, job_infos) in &self.database_job_infos {
            Self::validate_database_info(
                *database_id,
                job_infos,
                &self.active_streaming_nodes,
                &self.stream_actors,
                &self.state_table_committed_epochs,
            )?
        }
        Ok(())
    }
}
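// A rough sketch of the expected call pattern during recovery (control flow
// only; `resolve_runtime_info` is a hypothetical stand-in for however the
// snapshot is actually assembled in the `worker` module):
//
//     let snapshot: BarrierWorkerRuntimeInfoSnapshot = resolve_runtime_info().await?;
//     snapshot.validate()?; // reject inconsistent runtime info before injecting barriers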

/// Per-database counterpart of [`BarrierWorkerRuntimeInfoSnapshot`].
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    job_infos: HashMap<TableId, InflightStreamingJobInfo>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> list of (`Vec<non-checkpoint epoch>`, checkpoint epoch) pairs
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    subscription_info: InflightSubscriptionInfo,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}

impl DatabaseRuntimeInfoSnapshot {
    fn validate(
        &self,
        database_id: DatabaseId,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<()> {
        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
            database_id,
            &self.job_infos,
            active_streaming_nodes,
            &self.stream_actors,
            &self.state_table_committed_epochs,
        )
    }
}