risingwave_meta/barrier/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use anyhow::anyhow;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::catalog::Database;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, StreamActor, StreamJobFragments};
use crate::{MetaError, MetaResult};

mod backfill_order_control;
mod checkpoint;
mod command;
mod complete_task;
mod context;
mod edge_builder;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
mod trace;
mod utils;
mod worker;

pub use backfill_order_control::{BackfillNode, BackfillOrderState};

pub use self::command::{
    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
};
pub use self::info::InflightSubscriptionInfo;
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;

/// The reason why the cluster is recovering.
enum RecoveryReason {
    /// After bootstrap.
    Bootstrap,
    /// After failure.
    Failover(MetaError),
    /// Manually triggered.
    Adhoc,
}

/// Status of the barrier manager.
enum BarrierManagerStatus {
    /// Barrier manager is starting.
    Starting,
    /// Barrier manager is under recovery.
    Recovering(RecoveryReason),
    /// Barrier manager is running.
    Running,
}

/// Scheduled command with its notifiers.
struct Scheduled {
    database_id: DatabaseId,
    command: Command,
    notifiers: Vec<Notifier>,
    span: tracing::Span,
}

impl From<&BarrierManagerStatus> for PbRecoveryStatus {
    fn from(status: &BarrierManagerStatus) -> Self {
        match status {
            BarrierManagerStatus::Starting => Self::StatusStarting,
            BarrierManagerStatus::Recovering(reason) => match reason {
                // Recovery during bootstrap is surfaced as `StatusStarting`.
                RecoveryReason::Bootstrap => Self::StatusStarting,
                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
            },
            BarrierManagerStatus::Running => Self::StatusRunning,
        }
    }
}
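
// Sketch: minimal, illustrative sanity checks for the `From` impl above (not exhaustive).
// Bootstrap recovery is reported as `StatusStarting`, while failover and ad-hoc recovery
// are reported as `StatusRecovering`.
#[cfg(test)]
mod recovery_status_pb_tests {
    use super::*;

    #[test]
    fn bootstrap_recovery_maps_to_starting() {
        let status = BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap);
        assert_eq!(PbRecoveryStatus::from(&status), PbRecoveryStatus::StatusStarting);
    }

    #[test]
    fn adhoc_recovery_maps_to_recovering() {
        let status = BarrierManagerStatus::Recovering(RecoveryReason::Adhoc);
        assert_eq!(PbRecoveryStatus::from(&status), PbRecoveryStatus::StatusRecovering);
    }
}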

/// Requests handled by the barrier manager; each variant carries a oneshot `Sender`
/// through which the reply is delivered.
pub(crate) enum BarrierManagerRequest {
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    AdhocRecovery(Sender<()>),
    UpdateDatabaseBarrier {
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
        sender: Sender<()>,
    },
}
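
// Sketch of how a request is typically issued (the `request_tx` handle here is
// hypothetical and not part of this module): pair the request with a `tokio` oneshot
// channel and await the reply on the receiving half.
//
//     let (tx, rx) = tokio::sync::oneshot::channel();
//     request_tx.send(BarrierManagerRequest::AdhocRecovery(tx))?;
//     rx.await?;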

/// Cluster-wide runtime information for the barrier worker, checked for consistency by
/// [`Self::validate`].
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    database_job_infos: HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> `Vec<(non-checkpoint epochs, checkpoint epoch)>`
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    hummock_version_stats: HummockVersionStats,
    database_infos: Vec<Database>,
}
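
// Illustrative layout of `state_table_log_epochs` (epoch values are made up): two
// checkpoint intervals for one table, where 100 and 101 are non-checkpoint epochs
// followed by checkpoint epoch 102, and 103 and 104 are followed by checkpoint epoch 105.
//
//     table_id -> vec![(vec![100, 101], 102), (vec![103, 104], 105)]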

impl BarrierWorkerRuntimeInfoSnapshot {
    /// Check that the given database's job info is consistent with the rest of the snapshot:
    /// every actor is placed on an active worker and has a corresponding [`StreamActor`],
    /// every state table has a committed epoch registered in hummock, and all state tables
    /// of the same job share one committed epoch.
    fn validate_database_info(
        database_id: DatabaseId,
        database_jobs: &HashMap<TableId, InflightStreamingJobInfo>,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
        stream_actors: &HashMap<ActorId, StreamActor>,
        state_table_committed_epochs: &HashMap<TableId, u64>,
    ) -> MetaResult<()> {
        for fragment in database_jobs.values().flat_map(|job| job.fragment_infos()) {
            for (actor_id, actor) in &fragment.actors {
                if !active_streaming_nodes
                    .current()
                    .contains_key(&actor.worker_id)
                {
                    return Err(anyhow!(
                        "worker_id {} of actor {} does not exist",
                        actor.worker_id,
                        actor_id
                    )
                    .into());
                }
                if !stream_actors.contains_key(actor_id) {
                    return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
                }
            }
            for state_table_id in &fragment.state_table_ids {
                if !state_table_committed_epochs.contains_key(state_table_id) {
                    return Err(anyhow!(
                        "state table {} is not registered to hummock",
                        state_table_id
                    )
                    .into());
                }
            }
        }
        // All state tables of the same job are expected to share one committed epoch.
        for (job_id, job) in database_jobs {
            let mut committed_epochs = job.existing_table_ids().map(|table_id| {
                (
                    table_id,
                    *state_table_committed_epochs
                        .get(&table_id)
                        .expect("checked exist"),
                )
            });
            let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
                anyhow!(
                    "job {} in database {} has no state table after recovery",
                    job_id,
                    database_id
                )
            })?;
            for (table_id, epoch) in committed_epochs {
                if epoch != first_epoch {
                    return Err(anyhow!(
                        "job {} in database {} has state tables with different committed epochs: {}:{}, {}:{}",
                        job_id,
                        database_id,
                        first_table,
                        first_epoch,
                        table_id,
                        epoch
                    )
                    .into());
                }
            }
        }
        Ok(())
    }

    /// Run [`Self::validate_database_info`] for every database in the snapshot.
    fn validate(&self) -> MetaResult<()> {
        for (database_id, job_infos) in &self.database_job_infos {
            Self::validate_database_info(
                *database_id,
                job_infos,
                &self.active_streaming_nodes,
                &self.stream_actors,
                &self.state_table_committed_epochs,
            )?
        }
        Ok(())
    }
}

/// Per-database counterpart of [`BarrierWorkerRuntimeInfoSnapshot`].
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    job_infos: HashMap<TableId, InflightStreamingJobInfo>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> `Vec<(non-checkpoint epochs, checkpoint epoch)>`
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    subscription_info: InflightSubscriptionInfo,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
}

impl DatabaseRuntimeInfoSnapshot {
    fn validate(
        &self,
        database_id: DatabaseId,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<()> {
        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
            database_id,
            &self.job_infos,
            active_streaming_nodes,
            &self.stream_actors,
            &self.state_table_committed_epochs,
        )
    }
}