risingwave_meta/barrier/mod.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use anyhow::anyhow;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::catalog::Database;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::BarrierInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, SubscriptionId};
use crate::{MetaError, MetaResult};

mod backfill_order_control;
pub mod cdc_progress;
mod checkpoint;
mod command;
mod complete_task;
mod context;
mod edge_builder;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
#[cfg(test)]
mod tests;
mod trace;
mod utils;
mod worker;

pub use backfill_order_control::{BackfillNode, BackfillOrderState};
use risingwave_common::id::JobId;

pub use self::command::{
    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
};
pub(crate) use self::info::{SharedActorInfos, SharedFragmentInfo};
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;
use crate::barrier::cdc_progress::CdcProgress;
use crate::controller::fragment::InflightFragmentInfo;
use crate::stream::cdc::CdcTableSnapshotSplits;

/// The reason why the cluster is recovering.
enum RecoveryReason {
    /// After bootstrap.
    Bootstrap,
    /// After failure.
    Failover(MetaError),
    /// Manually triggered.
    Adhoc,
}

/// Status of barrier manager.
enum BarrierManagerStatus {
    /// Barrier manager is starting.
    Starting,
    /// Barrier manager is under recovery.
    Recovering(RecoveryReason),
    /// Barrier manager is running.
    Running,
}

/// Scheduled command with its notifiers.
struct Scheduled {
    database_id: DatabaseId,
    command: Command,
    notifiers: Vec<Notifier>,
    span: tracing::Span,
}

impl From<&BarrierManagerStatus> for PbRecoveryStatus {
    fn from(status: &BarrierManagerStatus) -> Self {
        match status {
            BarrierManagerStatus::Starting => Self::StatusStarting,
            BarrierManagerStatus::Recovering(reason) => match reason {
                RecoveryReason::Bootstrap => Self::StatusStarting,
                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
            },
            BarrierManagerStatus::Running => Self::StatusRunning,
        }
    }
}

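/// Requests sent to the barrier manager worker from other parts of the meta node.
/// Each variant carries a oneshot [`Sender`] used to reply to the caller once the
/// request has been handled.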
pub(crate) enum BarrierManagerRequest {
    GetDdlProgress(Sender<HashMap<JobId, DdlProgress>>),
    GetCdcProgress(Sender<HashMap<JobId, CdcProgress>>),
    AdhocRecovery(Sender<()>),
    UpdateDatabaseBarrier {
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
        sender: Sender<()>,
    },
}

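/// A cluster-wide snapshot of runtime information used by the barrier worker, covering the
/// active streaming worker nodes, the in-flight fragment info of every database, committed and
/// logged state-table epochs, MV subscriptions, stream actors, source splits, background jobs,
/// Hummock version stats, and CDC snapshot splits. See [`Self::validate`] for the consistency
/// checks applied to it.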
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    database_job_infos:
        HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> (`Vec<non-checkpoint epoch>`, checkpoint epoch)
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<JobId, String>,
    hummock_version_stats: HummockVersionStats,
    database_infos: Vec<Database>,
    cdc_table_snapshot_splits: HashMap<JobId, CdcTableSnapshotSplits>,
}

impl BarrierWorkerRuntimeInfoSnapshot {
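    /// Check that the runtime info of a single database is internally consistent:
    /// every actor is placed on an active worker node and has a corresponding [`StreamActor`],
    /// every state table has a committed epoch registered in Hummock, and all state tables of
    /// the same job share the same committed epoch.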
    fn validate_database_info(
        database_id: DatabaseId,
        database_jobs: &HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
        stream_actors: &HashMap<ActorId, StreamActor>,
        state_table_committed_epochs: &HashMap<TableId, u64>,
    ) -> MetaResult<()> {
        {
            for fragment in database_jobs.values().flat_map(|job| job.values()) {
                for (actor_id, actor) in &fragment.actors {
                    if !active_streaming_nodes
                        .current()
                        .contains_key(&actor.worker_id)
                    {
                        return Err(anyhow!(
                            "worker_id {} of actor {} does not exist",
                            actor.worker_id,
                            actor_id
                        )
                        .into());
                    }
                    if !stream_actors.contains_key(actor_id) {
                        return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
                    }
                }
                for state_table_id in &fragment.state_table_ids {
                    if !state_table_committed_epochs.contains_key(state_table_id) {
                        return Err(anyhow!(
                            "state table {} is not registered to hummock",
                            state_table_id
                        )
                        .into());
                    }
                }
            }
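            // All state tables of a streaming job must have been committed at the same epoch;
            // otherwise the recovered state would be inconsistent across the job's tables.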
            for (job_id, fragments) in database_jobs {
                let mut committed_epochs =
                    InflightFragmentInfo::existing_table_ids(fragments.values()).map(|table_id| {
                        (
                            table_id,
                            *state_table_committed_epochs
                                .get(&table_id)
                                .expect("checked exist"),
                        )
                    });
                let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
                    anyhow!(
                        "job {} in database {} has no state table after recovery",
                        job_id,
                        database_id
                    )
                })?;
                for (table_id, epoch) in committed_epochs {
                    if epoch != first_epoch {
                        return Err(anyhow!(
                            "job {} in database {} has state tables with different committed epochs. {}:{}, {}:{}",
                            job_id,
                            database_id,
                            first_table,
                            first_epoch,
                            table_id,
                            epoch
                        )
                        .into());
                    }
                }
            }
        }
        Ok(())
    }

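    /// Validate the runtime info of every database in the snapshot.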
    fn validate(&self) -> MetaResult<()> {
        for (database_id, job_infos) in &self.database_job_infos {
            Self::validate_database_info(
                *database_id,
                job_infos,
                &self.active_streaming_nodes,
                &self.stream_actors,
                &self.state_table_committed_epochs,
            )?
        }
        Ok(())
    }
}

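/// The per-database counterpart of [`BarrierWorkerRuntimeInfoSnapshot`], carrying the runtime
/// information of a single database (e.g. when only that database is recovered).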
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// `table_id` -> (`Vec<non-checkpoint epoch>`, checkpoint epoch)
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<JobId, String>,
    cdc_table_snapshot_splits: HashMap<JobId, CdcTableSnapshotSplits>,
}

impl DatabaseRuntimeInfoSnapshot {
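    /// Validate this single-database snapshot against the given set of active streaming workers
    /// by delegating to [`BarrierWorkerRuntimeInfoSnapshot::validate_database_info`].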
    fn validate(
        &self,
        database_id: DatabaseId,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<()> {
        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
            database_id,
            &self.job_infos,
            active_streaming_nodes,
            &self.stream_actors,
            &self.state_table_committed_epochs,
        )
    }
251}