risingwave_meta/barrier/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use anyhow::anyhow;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::catalog::Database;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, StreamActor, StreamJobFragments};
use crate::{MetaError, MetaResult};

mod backfill_order_control;
pub mod cdc_progress;
mod checkpoint;
mod command;
mod complete_task;
mod context;
mod edge_builder;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
#[cfg(test)]
mod tests;
mod trace;
mod utils;
mod worker;

pub use backfill_order_control::{BackfillNode, BackfillOrderState};
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;

pub use self::command::{
    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
};
pub(crate) use self::info::{InflightSubscriptionInfo, SharedActorInfos, SharedFragmentInfo};
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;

/// The reason why the cluster is recovering.
enum RecoveryReason {
    /// After bootstrap.
    Bootstrap,
    /// After failure.
    Failover(MetaError),
    /// Manually triggered.
    Adhoc,
}

/// Status of barrier manager.
enum BarrierManagerStatus {
    /// Barrier manager is starting.
    Starting,
    /// Barrier manager is under recovery.
    Recovering(RecoveryReason),
    /// Barrier manager is running.
    Running,
}

/// Scheduled command with its notifiers.
struct Scheduled {
    database_id: DatabaseId,
    command: Command,
    notifiers: Vec<Notifier>,
    span: tracing::Span,
}

impl From<&BarrierManagerStatus> for PbRecoveryStatus {
    fn from(status: &BarrierManagerStatus) -> Self {
        match status {
            BarrierManagerStatus::Starting => Self::StatusStarting,
            BarrierManagerStatus::Recovering(reason) => match reason {
                RecoveryReason::Bootstrap => Self::StatusStarting,
                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
            },
            BarrierManagerStatus::Running => Self::StatusRunning,
        }
    }
}
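
// A minimal sketch of how the conversion above is meant to be read: a recovery
// triggered by bootstrap is deliberately reported as `StatusStarting`, so
// external observers cannot distinguish a bootstrap recovery from a fresh
// start. The assertion below is illustrative only, not part of this module:
//
//     let status = BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap);
//     assert_eq!(PbRecoveryStatus::from(&status), PbRecoveryStatus::StatusStarting);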

pub(crate) enum BarrierManagerRequest {
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    AdhocRecovery(Sender<()>),
    UpdateDatabaseBarrier {
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
        sender: Sender<()>,
    },
}
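
// Each request variant carries a `tokio::sync::oneshot::Sender` for the reply.
// A hedged usage sketch, assuming a hypothetical `request_tx` of type
// `tokio::sync::mpsc::UnboundedSender<BarrierManagerRequest>` held by the
// caller (the actual channel lives elsewhere in this crate):
//
//     let (tx, rx) = tokio::sync::oneshot::channel();
//     request_tx.send(BarrierManagerRequest::AdhocRecovery(tx))?;
//     rx.await?; // resolves once the ad-hoc recovery has been processed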

#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    database_job_infos: HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> list of `(non-checkpoint epochs, checkpoint epoch)` entries
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    hummock_version_stats: HummockVersionStats,
    database_infos: Vec<Database>,
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}

impl BarrierWorkerRuntimeInfoSnapshot {
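    /// Check that a database's recovered runtime info is internally consistent:
    /// every actor is placed on a live worker node and has a `StreamActor`,
    /// every state table is registered in hummock, and all state tables of a
    /// job share the same committed epoch.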
    fn validate_database_info(
        database_id: DatabaseId,
        database_jobs: &HashMap<TableId, InflightStreamingJobInfo>,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
        stream_actors: &HashMap<ActorId, StreamActor>,
        state_table_committed_epochs: &HashMap<TableId, u64>,
    ) -> MetaResult<()> {
        for fragment in database_jobs.values().flat_map(|job| job.fragment_infos()) {
            for (actor_id, actor) in &fragment.actors {
                if !active_streaming_nodes
                    .current()
                    .contains_key(&actor.worker_id)
                {
                    return Err(anyhow!(
                        "worker_id {} of actor {} does not exist",
                        actor.worker_id,
                        actor_id
                    )
                    .into());
                }
                if !stream_actors.contains_key(actor_id) {
                    return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
                }
            }
            for state_table_id in &fragment.state_table_ids {
                if !state_table_committed_epochs.contains_key(state_table_id) {
                    return Err(anyhow!(
                        "state table {} is not registered in hummock",
                        state_table_id
                    )
                    .into());
                }
            }
        }
        for (job_id, job) in database_jobs {
            let mut committed_epochs = job.existing_table_ids().map(|table_id| {
                (
                    table_id,
                    *state_table_committed_epochs
                        .get(&table_id)
                        .expect("existence checked above"),
                )
            });
            let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
                anyhow!(
                    "job {} in database {} has no state table after recovery",
                    job_id,
                    database_id
                )
            })?;
            for (table_id, epoch) in committed_epochs {
                if epoch != first_epoch {
                    return Err(anyhow!(
                        "job {} in database {} has state tables with inconsistent committed epochs: {}:{}, {}:{}",
                        job_id,
                        database_id,
                        first_table,
                        first_epoch,
                        table_id,
                        epoch
                    )
                    .into());
                }
            }
        }
        Ok(())
    }

    fn validate(&self) -> MetaResult<()> {
        for (database_id, job_infos) in &self.database_job_infos {
            Self::validate_database_info(
                *database_id,
                job_infos,
                &self.active_streaming_nodes,
                &self.stream_actors,
                &self.state_table_committed_epochs,
            )?
        }
        Ok(())
    }
}
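
// A hedged sketch of how a snapshot is meant to be consumed during recovery
// (the `load_runtime_info` helper below is hypothetical; the real call sites
// live in the recovery path of the barrier worker):
//
//     let snapshot: BarrierWorkerRuntimeInfoSnapshot = load_runtime_info().await?;
//     snapshot.validate()?; // reject inconsistent metadata before rebuilding state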

#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    job_infos: HashMap<TableId, InflightStreamingJobInfo>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> list of `(non-checkpoint epochs, checkpoint epoch)` entries
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    subscription_info: InflightSubscriptionInfo,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}

impl DatabaseRuntimeInfoSnapshot {
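    /// Validate a single database's snapshot by delegating to
    /// [`BarrierWorkerRuntimeInfoSnapshot::validate_database_info`].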
    fn validate(
        &self,
        database_id: DatabaseId,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<()> {
        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
            database_id,
            &self.job_infos,
            active_streaming_nodes,
            &self.stream_actors,
            &self.state_table_committed_epochs,
        )
    }
}