risingwave_meta/barrier/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_connector::source::SplitImpl;
20use risingwave_pb::ddl_service::DdlProgress;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::meta::PbRecoveryStatus;
23use tokio::sync::oneshot::Sender;
24
25use self::notifier::Notifier;
26use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
27use crate::manager::ActiveStreamingWorkerNodes;
28use crate::model::{ActorId, FragmentDownstreamRelation, StreamActor, StreamJobFragments};
29use crate::{MetaError, MetaResult};
30
// Submodules of the barrier manager implementation.
mod checkpoint;
mod command;
mod complete_task;
mod context;
mod edge_builder;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
mod trace;
mod utils;
mod worker;

// Re-exported public surface of the barrier module for the rest of the crate.
pub use self::command::{
    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
};
pub use self::info::InflightSubscriptionInfo;
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;
54
/// The reason why the cluster is recovering.
enum RecoveryReason {
    /// After bootstrap.
    Bootstrap,
    /// After failure, carrying the error that triggered the failover.
    Failover(MetaError),
    /// Manually triggered (see [`BarrierManagerRequest::AdhocRecovery`]).
    Adhoc,
}
64
/// Status of barrier manager.
///
/// Reported to clients as a [`PbRecoveryStatus`] via the `From` impl below.
enum BarrierManagerStatus {
    /// Barrier manager is starting.
    Starting,
    /// Barrier manager is under recovery, with the reason it entered recovery.
    Recovering(RecoveryReason),
    /// Barrier manager is running.
    Running,
}
74
/// Scheduled command with its notifiers.
struct Scheduled {
    // Database the command targets; barriers are scheduled per database.
    database_id: DatabaseId,
    command: Command,
    // Notifiers to be signaled as the command progresses — presumably on
    // barrier injection/collection; see the `notifier` module for details.
    notifiers: Vec<Notifier>,
    // Tracing span associated with this scheduled command.
    span: tracing::Span,
}
82
83impl From<&BarrierManagerStatus> for PbRecoveryStatus {
84    fn from(status: &BarrierManagerStatus) -> Self {
85        match status {
86            BarrierManagerStatus::Starting => Self::StatusStarting,
87            BarrierManagerStatus::Recovering(reason) => match reason {
88                RecoveryReason::Bootstrap => Self::StatusStarting,
89                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
90            },
91            BarrierManagerStatus::Running => Self::StatusRunning,
92        }
93    }
94}
95
/// Requests that other meta components can send to the barrier worker; each
/// variant carries a oneshot sender for the reply.
pub(crate) enum BarrierManagerRequest {
    /// Collect DDL progress, keyed by a `u32` id — presumably the job id; verify against caller.
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    /// Manually trigger a recovery; the sender acknowledges the request.
    AdhocRecovery(Sender<()>),
}
100
/// Snapshot of the cluster-wide runtime state loaded during recovery, used to
/// rebuild the barrier worker. Validated by [`Self::validate`] before use.
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    // Currently-active streaming worker nodes; actors must reside on these.
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    // In-flight streaming job info, grouped by database then job id.
    database_job_infos: HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    // Latest committed epoch of each state table (from hummock).
    state_table_committed_epochs: HashMap<TableId, u64>,
    // Per-table epoch history — NOTE(review): exact nesting semantics not
    // visible here; confirm in the checkpoint/recovery code.
    state_table_log_epochs: HashMap<TableId, Vec<Vec<u64>>>,
    subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    // Source split assignment per actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    // Background (e.g. still-creating) jobs: job id -> (definition, fragments).
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    hummock_version_stats: HummockVersionStats,
}
114
115impl BarrierWorkerRuntimeInfoSnapshot {
116    fn validate_database_info(
117        database_id: DatabaseId,
118        database_jobs: &HashMap<TableId, InflightStreamingJobInfo>,
119        active_streaming_nodes: &ActiveStreamingWorkerNodes,
120        stream_actors: &HashMap<ActorId, StreamActor>,
121        state_table_committed_epochs: &HashMap<TableId, u64>,
122    ) -> MetaResult<()> {
123        {
124            for fragment in database_jobs.values().flat_map(|job| job.fragment_infos()) {
125                for (actor_id, actor) in &fragment.actors {
126                    if !active_streaming_nodes
127                        .current()
128                        .contains_key(&actor.worker_id)
129                    {
130                        return Err(anyhow!(
131                            "worker_id {} of actor {} do not exist",
132                            actor.worker_id,
133                            actor_id
134                        )
135                        .into());
136                    }
137                    if !stream_actors.contains_key(actor_id) {
138                        return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
139                    }
140                }
141                for state_table_id in &fragment.state_table_ids {
142                    if !state_table_committed_epochs.contains_key(state_table_id) {
143                        return Err(anyhow!(
144                            "state table {} is not registered to hummock",
145                            state_table_id
146                        )
147                        .into());
148                    }
149                }
150            }
151            for (job_id, job) in database_jobs {
152                let mut committed_epochs = job.existing_table_ids().map(|table_id| {
153                    (
154                        table_id,
155                        *state_table_committed_epochs
156                            .get(&table_id)
157                            .expect("checked exist"),
158                    )
159                });
160                let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
161                    anyhow!(
162                        "job {} in database {} has no state table after recovery",
163                        job_id,
164                        database_id
165                    )
166                })?;
167                for (table_id, epoch) in committed_epochs {
168                    if epoch != first_epoch {
169                        return Err(anyhow!(
170                            "job {} in database {} has tables with different table ids. {}:{}, {}:{}",
171                            job_id,
172                            database_id,
173                            first_table,
174                            first_epoch,
175                            table_id,
176                            epoch
177                        )
178                        .into());
179                    }
180                }
181            }
182        }
183        Ok(())
184    }
185
186    fn validate(&self) -> MetaResult<()> {
187        for (database_id, job_infos) in &self.database_job_infos {
188            Self::validate_database_info(
189                *database_id,
190                job_infos,
191                &self.active_streaming_nodes,
192                &self.stream_actors,
193                &self.state_table_committed_epochs,
194            )?
195        }
196        Ok(())
197    }
198}
199
/// Per-database counterpart of `BarrierWorkerRuntimeInfoSnapshot`, used when a
/// single database (rather than the whole cluster) is recovered.
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    // In-flight streaming job info of this database, keyed by job id.
    job_infos: HashMap<TableId, InflightStreamingJobInfo>,
    // Latest committed epoch of each state table (from hummock).
    state_table_committed_epochs: HashMap<TableId, u64>,
    // Per-table epoch history — NOTE(review): exact nesting semantics not
    // visible here; confirm in the checkpoint/recovery code.
    state_table_log_epochs: HashMap<TableId, Vec<Vec<u64>>>,
    subscription_info: InflightSubscriptionInfo,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    // Source split assignment per actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    // Background (e.g. still-creating) jobs: job id -> (definition, fragments).
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
}
211
212impl DatabaseRuntimeInfoSnapshot {
213    fn validate(
214        &self,
215        database_id: DatabaseId,
216        active_streaming_nodes: &ActiveStreamingWorkerNodes,
217    ) -> MetaResult<()> {
218        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
219            database_id,
220            &self.job_infos,
221            active_streaming_nodes,
222            &self.stream_actors,
223            &self.state_table_committed_epochs,
224        )
225    }
226}