risingwave_meta/barrier/mod.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};

use anyhow::anyhow;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_pb::catalog::Database;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::BarrierInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamActor, SubscriptionId};
use crate::{MetaError, MetaResult};

mod backfill_order_control;
pub mod cdc_progress;
mod checkpoint;
mod command;
pub use command::RescheduleContext;
mod complete_task;
pub(super) mod context;
mod edge_builder;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
#[cfg(test)]
mod tests;
mod trace;
mod utils;
mod worker;

pub use backfill_order_control::{BackfillNode, BackfillOrderState};
use risingwave_common::id::JobId;
use risingwave_pb::ddl_service::PbBackfillType;

pub use self::command::{
    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, Reschedule, ReschedulePlan, ResumeBackfillTarget, SnapshotBackfillInfo,
};
pub(crate) use self::info::{SharedActorInfos, SharedFragmentInfo};
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;
use crate::barrier::cdc_progress::CdcProgress;
use crate::barrier::context::recovery::LoadedRecoveryContext;
use crate::controller::fragment::InflightFragmentInfo;
use crate::stream::cdc::CdcTableSnapshotSplits;

/// The reason why the cluster is recovering.
enum RecoveryReason {
    /// After bootstrap.
    Bootstrap,
    /// After failure.
    Failover(MetaError),
    /// Manually triggered.
    Adhoc,
}

/// Status of barrier manager.
enum BarrierManagerStatus {
    /// Barrier manager is starting.
    Starting,
    /// Barrier manager is under recovery.
    Recovering(RecoveryReason),
    /// Barrier manager is running.
    Running,
}

/// Scheduled command with its notifiers.
struct Scheduled {
    database_id: DatabaseId,
    command: Command,
    notifiers: Vec<Notifier>,
    span: tracing::Span,
}

impl From<&BarrierManagerStatus> for PbRecoveryStatus {
    fn from(status: &BarrierManagerStatus) -> Self {
        match status {
            BarrierManagerStatus::Starting => Self::StatusStarting,
            BarrierManagerStatus::Recovering(reason) => match reason {
                RecoveryReason::Bootstrap => Self::StatusStarting,
                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
            },
            BarrierManagerStatus::Running => Self::StatusRunning,
        }
    }
}

pub(crate) struct BackfillProgress {
    pub(crate) progress: String,
    pub(crate) backfill_type: PbBackfillType,
}

#[derive(Debug, Clone, Copy)]
pub(crate) struct FragmentBackfillProgress {
    pub(crate) job_id: JobId,
    pub(crate) fragment_id: FragmentId,
    pub(crate) consumed_rows: u64,
    pub(crate) done: bool,
    pub(crate) upstream_type: BackfillUpstreamType,
}
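
// A minimal sketch (not part of the original module) of how the per-fragment
// progress rows above could be folded into a per-job summary: `consumed_rows`
// summed across fragments, `done` only once every fragment has finished. The
// helper name and the aggregation policy are assumptions for illustration.
#[allow(dead_code)]
fn summarize_backfill_progress(
    rows: &[FragmentBackfillProgress],
) -> HashMap<JobId, (u64, bool)> {
    let mut summary: HashMap<JobId, (u64, bool)> = HashMap::new();
    for row in rows {
        // Accumulate rows and AND together per-fragment completion flags.
        let entry = summary.entry(row.job_id).or_insert((0, true));
        entry.0 += row.consumed_rows;
        entry.1 &= row.done;
    }
    summary
}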

pub(crate) struct UpdateDatabaseBarrierRequest {
    pub database_id: DatabaseId,
    pub barrier_interval_ms: Option<u32>,
    pub checkpoint_frequency: Option<u64>,
    pub sender: Sender<()>,
}
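
// A minimal sketch (not part of the original module) of building such a
// request: the caller keeps the receiving half of a oneshot channel and is
// presumably notified with `()` once the new settings take effect. The
// concrete interval and frequency values are placeholders.
#[allow(dead_code)]
fn example_update_database_barrier(
    database_id: DatabaseId,
) -> (
    UpdateDatabaseBarrierRequest,
    tokio::sync::oneshot::Receiver<()>,
) {
    let (sender, receiver) = tokio::sync::oneshot::channel();
    let request = UpdateDatabaseBarrierRequest {
        database_id,
        barrier_interval_ms: Some(1000), // placeholder: 1s barrier interval
        checkpoint_frequency: Some(10),  // placeholder: checkpoint every 10 barriers
        sender,
    };
    (request, receiver)
}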

pub(crate) enum BarrierManagerRequest {
    GetBackfillProgress(Sender<MetaResult<HashMap<JobId, BackfillProgress>>>),
    GetFragmentBackfillProgress(Sender<MetaResult<Vec<FragmentBackfillProgress>>>),
    GetCdcProgress(Sender<MetaResult<HashMap<JobId, CdcProgress>>>),
    AdhocRecovery(Sender<()>),
    UpdateDatabaseBarrier(UpdateDatabaseBarrierRequest),
    MayHaveSnapshotBackfillingJob(Sender<bool>),
}
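
// A minimal sketch (not part of the original module) of the request/response
// pattern these variants imply: the caller packs the sending half of a oneshot
// channel into the request, hands the request to the barrier worker over some
// channel (`request_tx` below is hypothetical), and awaits the reply.
//
//     let (tx, rx) = tokio::sync::oneshot::channel();
//     request_tx.send(BarrierManagerRequest::GetBackfillProgress(tx))?;
//     let progress: HashMap<JobId, BackfillProgress> = rx.await??;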

#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    recovery_context: LoadedRecoveryContext,
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> `Vec<(non-checkpoint epochs, checkpoint epoch)>`
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    background_jobs: HashSet<JobId>,
    hummock_version_stats: HummockVersionStats,
    database_infos: Vec<Database>,
    cdc_table_snapshot_splits: HashMap<JobId, CdcTableSnapshotSplits>,
}
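
// A minimal sketch (not part of the original module) of the shape of
// `state_table_log_epochs`: each entry in the vector pairs the non-checkpoint
// epochs of one checkpoint interval with the checkpoint epoch that seals it.
// The table id and epoch values are placeholders.
#[allow(dead_code)]
fn example_state_table_log_epochs() -> HashMap<TableId, Vec<(Vec<u64>, u64)>> {
    let mut log_epochs = HashMap::new();
    // Two intervals for one table: non-checkpoint epochs 100 and 101 are
    // sealed by checkpoint epoch 102, and 103 is sealed by 104.
    log_epochs.insert(
        TableId::new(1),
        vec![(vec![100, 101], 102), (vec![103], 104)],
    );
    log_epochs
}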

impl BarrierWorkerRuntimeInfoSnapshot {
    fn validate_database_info(
        database_id: DatabaseId,
        database_jobs: &HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
        stream_actors: &HashMap<ActorId, StreamActor>,
        state_table_committed_epochs: &HashMap<TableId, u64>,
    ) -> MetaResult<()> {
        for fragment in database_jobs.values().flat_map(|job| job.values()) {
            for (actor_id, actor) in &fragment.actors {
                if !active_streaming_nodes
                    .current()
                    .contains_key(&actor.worker_id)
                {
                    return Err(anyhow!(
                        "worker_id {} of actor {} does not exist",
                        actor.worker_id,
                        actor_id
                    )
                    .into());
                }
                if !stream_actors.contains_key(actor_id) {
                    return Err(anyhow!("cannot find StreamActor for actor {}", actor_id).into());
                }
            }
            for state_table_id in &fragment.state_table_ids {
                if !state_table_committed_epochs.contains_key(state_table_id) {
                    return Err(anyhow!(
                        "state table {} is not registered to hummock",
                        state_table_id
                    )
                    .into());
                }
            }
        }
        for (job_id, fragments) in database_jobs {
            let mut committed_epochs =
                InflightFragmentInfo::existing_table_ids(fragments.values()).map(|table_id| {
                    (
                        table_id,
                        *state_table_committed_epochs
                            .get(&table_id)
                            .expect("checked to exist above"),
                    )
                });
            let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
                anyhow!(
                    "job {} in database {} has no state table after recovery",
                    job_id,
                    database_id
                )
            })?;
            for (table_id, epoch) in committed_epochs {
                if epoch != first_epoch {
                    return Err(anyhow!(
                        "job {} in database {} has state tables with different committed epochs: {}:{}, {}:{}",
                        job_id,
                        database_id,
                        first_table,
                        first_epoch,
                        table_id,
                        epoch
                    )
                    .into());
                }
            }
        }
        Ok(())
    }
}
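
// For intuition (not part of the original module): the epoch check above
// enforces that all state tables of one job were committed at the same epoch.
// E.g. a job whose tables report committed epochs {t1: 100, t2: 100} passes,
// while {t1: 100, t2: 99} is rejected, presumably because recovery must resume
// every fragment of a job from one consistent snapshot.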

#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    recovery_context: LoadedRecoveryContext,
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> `Vec<(non-checkpoint epochs, checkpoint epoch)>`
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    background_jobs: HashSet<JobId>,
    cdc_table_snapshot_splits: HashMap<JobId, CdcTableSnapshotSplits>,
}