risingwave_meta/barrier/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use anyhow::anyhow;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_pb::catalog::Database;
20use risingwave_pb::hummock::HummockVersionStats;
21use risingwave_pb::meta::PbRecoveryStatus;
22use tokio::sync::oneshot::Sender;
23
24use self::notifier::Notifier;
25use crate::barrier::info::BarrierInfo;
26use crate::manager::ActiveStreamingWorkerNodes;
27use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamActor, SubscriptionId};
28use crate::{MetaError, MetaResult};
29
30mod backfill_order_control;
31pub mod cdc_progress;
32mod checkpoint;
33mod command;
34pub use command::RescheduleContext;
35mod complete_task;
36pub(super) mod context;
37mod edge_builder;
38mod info;
39mod manager;
40mod notifier;
41mod partial_graph;
42mod progress;
43mod rpc;
44mod schedule;
45#[cfg(test)]
46mod tests;
47mod trace;
48mod utils;
49mod worker;
50
51pub use backfill_order_control::{BackfillNode, BackfillOrderState};
52use risingwave_common::id::JobId;
53use risingwave_pb::ddl_service::PbBackfillType;
54
55pub use self::command::{
56    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
57    ReplaceStreamJobPlan, Reschedule, ReschedulePlan, ResumeBackfillTarget, SnapshotBackfillInfo,
58};
59pub(crate) use self::info::{SharedActorInfos, SharedFragmentInfo};
60pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
61pub use self::schedule::BarrierScheduler;
62pub use self::trace::TracedEpoch;
63use crate::barrier::cdc_progress::CdcProgress;
64use crate::barrier::context::recovery::LoadedRecoveryContext;
65use crate::controller::fragment::InflightFragmentInfo;
66use crate::stream::cdc::CdcTableSnapshotSplits;
67
68/// The reason why the cluster is recovering.
/// The reason why the cluster is recovering.
enum RecoveryReason {
    /// The initial recovery performed right after the meta node bootstraps.
    Bootstrap,
    /// Recovery triggered by a failure, carrying the error that caused it.
    Failover(MetaError),
    /// Recovery manually triggered via an ad-hoc request
    /// (see `BarrierManagerRequest::AdhocRecovery`).
    Adhoc,
}
77
78/// Status of barrier manager.
/// Status of barrier manager.
enum BarrierManagerStatus {
    /// Barrier manager is starting and has not begun recovery yet.
    Starting,
    /// Barrier manager is under recovery, with the reason it entered recovery.
    Recovering(RecoveryReason),
    /// Barrier manager is running normally.
    Running,
}
87
88/// Scheduled command with its notifiers.
/// A command scheduled for execution, bundled with the notifiers to inform
/// once the command's barrier is injected/collected, and a tracing span for
/// observability.
struct Scheduled {
    /// The database this command targets.
    database_id: DatabaseId,
    /// The command to execute.
    command: Command,
    /// Callers waiting to be notified about the progress of this command.
    notifiers: Vec<Notifier>,
    /// Tracing span covering the lifetime of this scheduled command.
    span: tracing::Span,
}
95
96impl From<&BarrierManagerStatus> for PbRecoveryStatus {
97    fn from(status: &BarrierManagerStatus) -> Self {
98        match status {
99            BarrierManagerStatus::Starting => Self::StatusStarting,
100            BarrierManagerStatus::Recovering(reason) => match reason {
101                RecoveryReason::Bootstrap => Self::StatusStarting,
102                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
103            },
104            BarrierManagerStatus::Running => Self::StatusRunning,
105        }
106    }
107}
108
/// Backfill progress of a streaming job, for reporting purposes.
pub(crate) struct BackfillProgress {
    /// Human-readable progress description.
    pub(crate) progress: String,
    /// The kind of backfill the job is performing.
    pub(crate) backfill_type: PbBackfillType,
}
113
/// Per-fragment backfill progress of a streaming job.
#[derive(Debug, Clone, Copy)]
pub(crate) struct FragmentBackfillProgress {
    /// The streaming job this fragment belongs to.
    pub(crate) job_id: JobId,
    /// The backfilling fragment.
    pub(crate) fragment_id: FragmentId,
    /// Number of rows consumed from the upstream so far.
    pub(crate) consumed_rows: u64,
    /// Whether backfill of this fragment has finished.
    pub(crate) done: bool,
    /// The type of upstream the fragment backfills from.
    pub(crate) upstream_type: BackfillUpstreamType,
}
122
/// Request to update the barrier settings of a single database.
pub(crate) struct UpdateDatabaseBarrierRequest {
    /// The database whose barrier settings are updated.
    pub database_id: DatabaseId,
    /// New barrier interval in milliseconds; `None` keeps the current value.
    pub barrier_interval_ms: Option<u32>,
    /// New checkpoint frequency (barriers per checkpoint); `None` keeps the
    /// current value.
    pub checkpoint_frequency: Option<u64>,
    /// Signaled once the update has been applied.
    pub sender: Sender<()>,
}
129
/// Requests handled by the barrier manager worker. Each variant carries a
/// oneshot sender through which the response is delivered.
pub(crate) enum BarrierManagerRequest {
    /// Query backfill progress of all jobs, keyed by job id.
    GetBackfillProgress(Sender<MetaResult<HashMap<JobId, BackfillProgress>>>),
    /// Query per-fragment backfill progress of all jobs.
    GetFragmentBackfillProgress(Sender<MetaResult<Vec<FragmentBackfillProgress>>>),
    /// Query CDC progress of all jobs, keyed by job id.
    GetCdcProgress(Sender<MetaResult<HashMap<JobId, CdcProgress>>>),
    /// Manually trigger a recovery; the sender is signaled when it completes.
    AdhocRecovery(Sender<()>),
    /// Update barrier interval / checkpoint frequency of a database.
    UpdateDatabaseBarrier(UpdateDatabaseBarrierRequest),
    /// Ask whether there may be an ongoing snapshot-backfilling job.
    MayHaveSnapshotBackfillingJob(Sender<bool>),
}
138
/// Snapshot of cluster-wide runtime info loaded during (global) recovery,
/// used to rebuild the barrier worker's in-memory state.
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    /// Currently active streaming worker nodes.
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    /// Catalog/fragment info loaded for recovery.
    recovery_context: LoadedRecoveryContext,
    /// Committed hummock epoch of each state table.
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> (`Vec<non-checkpoint epoch>`, checkpoint epoch)
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    /// Subscriptions depending on each materialized view; presumably the
    /// inner value is the retention setting — TODO confirm.
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    /// Jobs that are still creating in the background (e.g. backfilling).
    background_jobs: HashSet<JobId>,
    /// Version stats of the current hummock version.
    hummock_version_stats: HummockVersionStats,
    /// Catalog info of all databases.
    database_infos: Vec<Database>,
    /// CDC table snapshot splits per job.
    cdc_table_snapshot_splits: HashMap<JobId, CdcTableSnapshotSplits>,
}
152
153impl BarrierWorkerRuntimeInfoSnapshot {
154    fn validate_database_info(
155        database_id: DatabaseId,
156        database_jobs: &HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
157        active_streaming_nodes: &ActiveStreamingWorkerNodes,
158        stream_actors: &HashMap<ActorId, StreamActor>,
159        state_table_committed_epochs: &HashMap<TableId, u64>,
160    ) -> MetaResult<()> {
161        {
162            for fragment in database_jobs.values().flat_map(|job| job.values()) {
163                for (actor_id, actor) in &fragment.actors {
164                    if !active_streaming_nodes
165                        .current()
166                        .contains_key(&actor.worker_id)
167                    {
168                        return Err(anyhow!(
169                            "worker_id {} of actor {} do not exist",
170                            actor.worker_id,
171                            actor_id
172                        )
173                        .into());
174                    }
175                    if !stream_actors.contains_key(actor_id) {
176                        return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
177                    }
178                }
179                for state_table_id in &fragment.state_table_ids {
180                    if !state_table_committed_epochs.contains_key(state_table_id) {
181                        return Err(anyhow!(
182                            "state table {} is not registered to hummock",
183                            state_table_id
184                        )
185                        .into());
186                    }
187                }
188            }
189            for (job_id, fragments) in database_jobs {
190                let mut committed_epochs =
191                    InflightFragmentInfo::existing_table_ids(fragments.values()).map(|table_id| {
192                        (
193                            table_id,
194                            *state_table_committed_epochs
195                                .get(&table_id)
196                                .expect("checked exist"),
197                        )
198                    });
199                let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
200                    anyhow!(
201                        "job {} in database {} has no state table after recovery",
202                        job_id,
203                        database_id
204                    )
205                })?;
206                for (table_id, epoch) in committed_epochs {
207                    if epoch != first_epoch {
208                        return Err(anyhow!(
209                            "job {} in database {} has tables with different table ids. {}:{}, {}:{}",
210                            job_id,
211                            database_id,
212                            first_table,
213                            first_epoch,
214                            table_id,
215                            epoch
216                        )
217                        .into());
218                    }
219                }
220            }
221        }
222        Ok(())
223    }
224}
225
/// Snapshot of runtime info loaded during recovery of a single database,
/// the per-database counterpart of [`BarrierWorkerRuntimeInfoSnapshot`].
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    /// Catalog/fragment info loaded for recovery.
    recovery_context: LoadedRecoveryContext,
    /// Committed hummock epoch of each state table.
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> (`Vec<non-checkpoint epoch>`, checkpoint epoch)
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    /// Subscriptions depending on each materialized view; presumably the
    /// inner value is the retention setting — TODO confirm.
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    /// Jobs that are still creating in the background (e.g. backfilling).
    background_jobs: HashSet<JobId>,
    /// CDC table snapshot splits per job.
    cdc_table_snapshot_splits: HashMap<JobId, CdcTableSnapshotSplits>,
}