risingwave_meta/barrier/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

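//! Global barrier management for the meta service: scheduling commands and
//! the barriers that carry them, tracking barrier completion, and driving
//! recovery of the streaming graph.
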
use std::collections::HashMap;

use anyhow::anyhow;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, StreamActor, StreamJobFragments};
use crate::{MetaError, MetaResult};

mod checkpoint;
mod command;
mod complete_task;
mod context;
mod edge_builder;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
mod trace;
mod utils;
mod worker;

pub use self::command::{
    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
};
pub use self::info::InflightSubscriptionInfo;
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;

/// The reason why the cluster is recovering.
enum RecoveryReason {
    /// After bootstrap.
    Bootstrap,
    /// After failure.
    Failover(MetaError),
    /// Manually triggered.
    Adhoc,
}

/// Status of the barrier manager.
enum BarrierManagerStatus {
    /// Barrier manager is starting.
    Starting,
    /// Barrier manager is under recovery.
    Recovering(RecoveryReason),
    /// Barrier manager is running.
    Running,
}

/// Scheduled command with its notifiers.
struct Scheduled {
    database_id: DatabaseId,
    command: Command,
    notifiers: Vec<Notifier>,
    span: tracing::Span,
}

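// Expose the internal barrier manager status as the protobuf recovery status
// reported to clients. Note that a recovery triggered by bootstrap is reported
// as `Starting` rather than `Recovering`.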
impl From<&BarrierManagerStatus> for PbRecoveryStatus {
    fn from(status: &BarrierManagerStatus) -> Self {
        match status {
            BarrierManagerStatus::Starting => Self::StatusStarting,
            BarrierManagerStatus::Recovering(reason) => match reason {
                RecoveryReason::Bootstrap => Self::StatusStarting,
                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
            },
            BarrierManagerStatus::Running => Self::StatusRunning,
        }
    }
}

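/// Requests that other parts of the meta service can send to the barrier
/// manager. Each variant carries a oneshot sender through which the response
/// is delivered.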
pub(crate) enum BarrierManagerRequest {
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    AdhocRecovery(Sender<()>),
}

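/// A snapshot of the streaming runtime state collected when the barrier
/// worker bootstraps or recovers, validated for internal consistency before
/// barriers are injected again.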
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    database_fragment_infos: HashMap<DatabaseId, InflightDatabaseInfo>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    hummock_version_stats: HummockVersionStats,
}

impl BarrierWorkerRuntimeInfoSnapshot {
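    /// Check that the runtime info of a single database is consistent:
    /// every actor is placed on a live worker node and has a corresponding
    /// `StreamActor`, every state table has a committed epoch registered in
    /// hummock, and all state tables of the database share the same committed
    /// epoch.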
    fn validate_database_info(
        database_id: DatabaseId,
        database_info: &InflightDatabaseInfo,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
        stream_actors: &HashMap<ActorId, StreamActor>,
        state_table_committed_epochs: &HashMap<TableId, u64>,
    ) -> MetaResult<()> {
        for fragment in database_info.fragment_infos() {
            for (actor_id, actor) in &fragment.actors {
                if !active_streaming_nodes
                    .current()
                    .contains_key(&actor.worker_id)
                {
                    return Err(anyhow!(
                        "worker_id {} of actor {} does not exist",
                        actor.worker_id,
                        actor_id
                    )
                    .into());
                }
                if !stream_actors.contains_key(actor_id) {
                    return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
                }
            }
            for state_table_id in &fragment.state_table_ids {
                if !state_table_committed_epochs.contains_key(state_table_id) {
                    return Err(anyhow!(
                        "state table {} is not registered in hummock",
                        state_table_id
                    )
                    .into());
                }
            }
        }
        // All state tables of a database must have been committed in the same
        // epoch, so any mismatch indicates an inconsistent snapshot.
        let mut committed_epochs = database_info.existing_table_ids().map(|table_id| {
            (
                table_id,
                *state_table_committed_epochs
                    .get(&table_id)
                    .expect("checked above"),
            )
        });
        let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
            anyhow!("database {} has no state table after recovery", database_id)
        })?;
        for (table_id, epoch) in committed_epochs {
            if epoch != first_epoch {
                return Err(anyhow!(
                    "database {} has state tables with different committed epochs: {}:{}, {}:{}",
                    database_id,
                    first_table,
                    first_epoch,
                    table_id,
                    epoch
                )
                .into());
            }
        }
        Ok(())
    }

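    /// Validate the runtime info of every database in the snapshot.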
    fn validate(&self) -> MetaResult<()> {
        for (database_id, database_info) in &self.database_fragment_infos {
            Self::validate_database_info(
                *database_id,
                database_info,
                &self.active_streaming_nodes,
                &self.stream_actors,
                &self.state_table_committed_epochs,
            )?
        }
        Ok(())
    }
}

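/// Runtime info of a single database, the per-database counterpart of
/// [`BarrierWorkerRuntimeInfoSnapshot`]. Since it covers only one database,
/// the active worker nodes are passed in externally for validation.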
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    database_fragment_info: InflightDatabaseInfo,
    state_table_committed_epochs: HashMap<TableId, u64>,
    subscription_info: InflightSubscriptionInfo,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
}

impl DatabaseRuntimeInfoSnapshot {
    fn validate(
        &self,
        database_id: DatabaseId,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<()> {
        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
            database_id,
            &self.database_fragment_info,
            active_streaming_nodes,
            &self.stream_actors,
            &self.state_table_committed_epochs,
        )
    }
}