risingwave_meta/barrier/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use anyhow::anyhow;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::catalog::Database;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::BarrierInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, SubscriptionId};
use crate::{MetaError, MetaResult};

mod backfill_order_control;
pub mod cdc_progress;
mod checkpoint;
mod command;
mod complete_task;
mod context;
mod edge_builder;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
#[cfg(test)]
mod tests;
mod trace;
mod utils;
mod worker;

pub use backfill_order_control::{BackfillNode, BackfillOrderState};
use risingwave_common::id::JobId;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;

pub use self::command::{
    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
};
pub(crate) use self::info::{SharedActorInfos, SharedFragmentInfo};
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;
use crate::controller::fragment::InflightFragmentInfo;

/// The reason why the cluster is recovering.
enum RecoveryReason {
    /// After bootstrap.
    Bootstrap,
    /// After failure.
    Failover(MetaError),
    /// Manually triggered.
    Adhoc,
}

/// Status of barrier manager.
enum BarrierManagerStatus {
    /// Barrier manager is starting.
    Starting,
    /// Barrier manager is under recovery.
    Recovering(RecoveryReason),
    /// Barrier manager is running.
    Running,
}

/// Scheduled command with its notifiers.
struct Scheduled {
    database_id: DatabaseId,
    command: Command,
    notifiers: Vec<Notifier>,
    span: tracing::Span,
}

impl From<&BarrierManagerStatus> for PbRecoveryStatus {
    fn from(status: &BarrierManagerStatus) -> Self {
        match status {
            BarrierManagerStatus::Starting => Self::StatusStarting,
            BarrierManagerStatus::Recovering(reason) => match reason {
                RecoveryReason::Bootstrap => Self::StatusStarting,
                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
            },
            BarrierManagerStatus::Running => Self::StatusRunning,
        }
    }
}

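/// Requests sent to the barrier manager. Each variant carries a oneshot `Sender`
/// through which the reply is delivered once the request has been handled.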
pub(crate) enum BarrierManagerRequest {
    GetDdlProgress(Sender<HashMap<JobId, DdlProgress>>),
    AdhocRecovery(Sender<()>),
    UpdateDatabaseBarrier {
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
        sender: Sender<()>,
    },
}

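/// Cluster-wide runtime info needed by the barrier worker, e.g. when it (re)starts or
/// recovers: active worker nodes, in-flight fragment info per database job, committed and
/// logged state-table epochs, subscriptions, source splits, and background job progress.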
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    database_job_infos:
        HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> `Vec` of (`Vec<non-checkpoint epoch>`, checkpoint epoch)
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<JobId, String>,
    hummock_version_stats: HummockVersionStats,
    database_infos: Vec<Database>,
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}

impl BarrierWorkerRuntimeInfoSnapshot {
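    /// Checks a single database's job info for consistency: every actor must be placed on an
    /// active worker node and have a corresponding `StreamActor`, every state table must have a
    /// committed epoch registered in hummock, and all state tables of one job must share the
    /// same committed epoch.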
    fn validate_database_info(
        database_id: DatabaseId,
        database_jobs: &HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
        stream_actors: &HashMap<ActorId, StreamActor>,
        state_table_committed_epochs: &HashMap<TableId, u64>,
    ) -> MetaResult<()> {
        {
            for fragment in database_jobs.values().flat_map(|job| job.values()) {
                for (actor_id, actor) in &fragment.actors {
                    if !active_streaming_nodes
                        .current()
                        .contains_key(&actor.worker_id)
                    {
                        return Err(anyhow!(
                            "worker_id {} of actor {} does not exist",
                            actor.worker_id,
                            actor_id
                        )
                        .into());
                    }
                    if !stream_actors.contains_key(actor_id) {
                        return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
                    }
                }
                for state_table_id in &fragment.state_table_ids {
                    if !state_table_committed_epochs.contains_key(state_table_id) {
                        return Err(anyhow!(
                            "state table {} is not registered to hummock",
                            state_table_id
                        )
                        .into());
                    }
                }
            }
            for (job_id, fragments) in database_jobs {
                let mut committed_epochs =
                    InflightFragmentInfo::existing_table_ids(fragments.values()).map(|table_id| {
                        (
                            table_id,
                            *state_table_committed_epochs
                                .get(&table_id)
                                .expect("checked exist"),
                        )
                    });
                let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
                    anyhow!(
                        "job {} in database {} has no state table after recovery",
                        job_id,
                        database_id
                    )
                })?;
                for (table_id, epoch) in committed_epochs {
                    if epoch != first_epoch {
                        return Err(anyhow!(
                            "job {} in database {} has state tables committed at different epochs: {}:{}, {}:{}",
                            job_id,
                            database_id,
                            first_table,
                            first_epoch,
                            table_id,
                            epoch
                        )
                        .into());
                    }
                }
            }
        }
        Ok(())
    }

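    /// Runs [`Self::validate_database_info`] for every database in the snapshot.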
    fn validate(&self) -> MetaResult<()> {
        for (database_id, job_infos) in &self.database_job_infos {
            Self::validate_database_info(
                *database_id,
                job_infos,
                &self.active_streaming_nodes,
                &self.stream_actors,
                &self.state_table_committed_epochs,
            )?
        }
        Ok(())
    }
}

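/// Per-database counterpart of [`BarrierWorkerRuntimeInfoSnapshot`], holding the runtime info
/// of a single database.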
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> `Vec` of (`Vec<non-checkpoint epoch>`, checkpoint epoch)
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<JobId, String>,
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}

impl DatabaseRuntimeInfoSnapshot {
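    /// Validates this database's snapshot against the given set of active streaming worker
    /// nodes by delegating to [`BarrierWorkerRuntimeInfoSnapshot::validate_database_info`].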
    fn validate(
        &self,
        database_id: DatabaseId,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<()> {
        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
            database_id,
            &self.job_infos,
            active_streaming_nodes,
            &self.stream_actors,
            &self.state_table_committed_epochs,
        )
    }
}