1use std::collections::HashMap;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_connector::source::SplitImpl;
20use risingwave_pb::catalog::Database;
21use risingwave_pb::ddl_service::DdlProgress;
22use risingwave_pb::hummock::HummockVersionStats;
23use risingwave_pb::meta::PbRecoveryStatus;
24use tokio::sync::oneshot::Sender;
25
26use self::notifier::Notifier;
27use crate::barrier::info::BarrierInfo;
28use crate::manager::ActiveStreamingWorkerNodes;
29use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, SubscriptionId};
30use crate::{MetaError, MetaResult};
31
32mod backfill_order_control;
33pub mod cdc_progress;
34mod checkpoint;
35mod command;
36mod complete_task;
37mod context;
38mod edge_builder;
39mod info;
40mod manager;
41mod notifier;
42mod progress;
43mod rpc;
44mod schedule;
45#[cfg(test)]
46mod tests;
47mod trace;
48mod utils;
49mod worker;
50
51pub use backfill_order_control::{BackfillNode, BackfillOrderState};
52use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
53
54pub use self::command::{
55    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
56    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
57};
58pub(crate) use self::info::{SharedActorInfos, SharedFragmentInfo};
59pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
60pub use self::schedule::BarrierScheduler;
61pub use self::trace::TracedEpoch;
62use crate::controller::fragment::InflightFragmentInfo;
63
/// The reason why the barrier manager is entering recovery.
enum RecoveryReason {
    /// Initial recovery when the barrier manager first starts up.
    Bootstrap,
    /// Recovery triggered by a failure, carrying the error that caused it.
    Failover(MetaError),
    /// Recovery triggered manually, e.g. via a `BarrierManagerRequest::AdhocRecovery`.
    Adhoc,
}
73
/// Current status of the barrier manager, convertible into [`PbRecoveryStatus`]
/// for external reporting (see the `From` impl below).
enum BarrierManagerStatus {
    /// The barrier manager is starting up and has not begun recovery yet.
    Starting,
    /// The barrier manager is recovering for the given reason.
    Recovering(RecoveryReason),
    /// The barrier manager is running normally.
    Running,
}
83
/// A command scheduled for execution against a specific database
/// (produced/consumed by the barrier scheduling machinery in `schedule`).
struct Scheduled {
    // Database the command targets.
    database_id: DatabaseId,
    command: Command,
    // Notifiers to be invoked as the scheduled command progresses.
    notifiers: Vec<Notifier>,
    // Tracing span associated with this scheduled command.
    span: tracing::Span,
}
91
92impl From<&BarrierManagerStatus> for PbRecoveryStatus {
93    fn from(status: &BarrierManagerStatus) -> Self {
94        match status {
95            BarrierManagerStatus::Starting => Self::StatusStarting,
96            BarrierManagerStatus::Recovering(reason) => match reason {
97                RecoveryReason::Bootstrap => Self::StatusStarting,
98                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
99            },
100            BarrierManagerStatus::Running => Self::StatusRunning,
101        }
102    }
103}
104
/// Requests handled by the barrier manager worker; each variant carries a
/// oneshot `Sender` through which the result (or completion) is reported.
pub(crate) enum BarrierManagerRequest {
    /// Collect DDL progress, keyed by a u32 id (presumably the job id — confirm
    /// at the handler site).
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    /// Trigger an ad-hoc recovery; the sender is signaled on completion.
    AdhocRecovery(Sender<()>),
    /// Update per-database barrier configuration.
    /// NOTE(review): `None` presumably means "leave unchanged" — confirm in the
    /// handler.
    UpdateDatabaseBarrier {
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
        sender: Sender<()>,
    },
}
115
/// Snapshot of the runtime information the barrier worker needs for all
/// databases (e.g. when (re)initializing after recovery). Internal consistency
/// of the snapshot is checked by `validate` in the impl below.
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    // database id -> job (table) id -> fragment id -> fragment info.
    database_job_infos:
        HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>,
    // Committed epoch of each state table registered to hummock.
    state_table_committed_epochs: HashMap<TableId, u64>,
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    // mv table id -> subscription id -> u64 value (semantics not visible in
    // this file — presumably retention; confirm against subscription handling).
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    // Background jobs keyed by job (table) id; the String payload's meaning is
    // not visible here — confirm at the producer.
    background_jobs: HashMap<TableId, String>,
    hummock_version_stats: HummockVersionStats,
    database_infos: Vec<Database>,
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}
133
134impl BarrierWorkerRuntimeInfoSnapshot {
135    fn validate_database_info(
136        database_id: DatabaseId,
137        database_jobs: &HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>,
138        active_streaming_nodes: &ActiveStreamingWorkerNodes,
139        stream_actors: &HashMap<ActorId, StreamActor>,
140        state_table_committed_epochs: &HashMap<TableId, u64>,
141    ) -> MetaResult<()> {
142        {
143            for fragment in database_jobs.values().flat_map(|job| job.values()) {
144                for (actor_id, actor) in &fragment.actors {
145                    if !active_streaming_nodes
146                        .current()
147                        .contains_key(&actor.worker_id)
148                    {
149                        return Err(anyhow!(
150                            "worker_id {} of actor {} do not exist",
151                            actor.worker_id,
152                            actor_id
153                        )
154                        .into());
155                    }
156                    if !stream_actors.contains_key(actor_id) {
157                        return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
158                    }
159                }
160                for state_table_id in &fragment.state_table_ids {
161                    if !state_table_committed_epochs.contains_key(state_table_id) {
162                        return Err(anyhow!(
163                            "state table {} is not registered to hummock",
164                            state_table_id
165                        )
166                        .into());
167                    }
168                }
169            }
170            for (job_id, fragments) in database_jobs {
171                let mut committed_epochs =
172                    InflightFragmentInfo::existing_table_ids(fragments.values()).map(|table_id| {
173                        (
174                            table_id,
175                            *state_table_committed_epochs
176                                .get(&table_id)
177                                .expect("checked exist"),
178                        )
179                    });
180                let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
181                    anyhow!(
182                        "job {} in database {} has no state table after recovery",
183                        job_id,
184                        database_id
185                    )
186                })?;
187                for (table_id, epoch) in committed_epochs {
188                    if epoch != first_epoch {
189                        return Err(anyhow!(
190                            "job {} in database {} has tables with different table ids. {}:{}, {}:{}",
191                            job_id,
192                            database_id,
193                            first_table,
194                            first_epoch,
195                            table_id,
196                            epoch
197                        )
198                        .into());
199                    }
200                }
201            }
202        }
203        Ok(())
204    }
205
206    fn validate(&self) -> MetaResult<()> {
207        for (database_id, job_infos) in &self.database_job_infos {
208            Self::validate_database_info(
209                *database_id,
210                job_infos,
211                &self.active_streaming_nodes,
212                &self.stream_actors,
213                &self.state_table_committed_epochs,
214            )?
215        }
216        Ok(())
217    }
218}
219
/// Per-database counterpart of `BarrierWorkerRuntimeInfoSnapshot`: the runtime
/// info needed to drive barriers for a single database. Field meanings mirror
/// the same-named fields of the worker-wide snapshot.
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    // job (table) id -> fragment id -> fragment info for this database.
    job_infos: HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>,
    // Committed epoch of each state table registered to hummock.
    state_table_committed_epochs: HashMap<TableId, u64>,
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    // mv table id -> subscription id -> u64 value (semantics not visible in
    // this file — presumably retention; confirm against subscription handling).
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    // Background jobs keyed by job (table) id; the String payload's meaning is
    // not visible here — confirm at the producer.
    background_jobs: HashMap<TableId, String>,
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}
233
234impl DatabaseRuntimeInfoSnapshot {
235    fn validate(
236        &self,
237        database_id: DatabaseId,
238        active_streaming_nodes: &ActiveStreamingWorkerNodes,
239    ) -> MetaResult<()> {
240        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
241            database_id,
242            &self.job_infos,
243            active_streaming_nodes,
244            &self.stream_actors,
245            &self.state_table_committed_epochs,
246        )
247    }
248}