use std::collections::HashMap;

use anyhow::anyhow;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::catalog::Database;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::BarrierInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, SubscriptionId};
use crate::{MetaError, MetaResult};

mod backfill_order_control;
pub mod cdc_progress;
mod checkpoint;
mod command;
mod complete_task;
mod context;
mod edge_builder;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
#[cfg(test)]
mod tests;
mod trace;
mod utils;
mod worker;

pub use backfill_order_control::{BackfillNode, BackfillOrderState};
use risingwave_common::id::JobId;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;

pub use self::command::{
    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
};
pub(crate) use self::info::{SharedActorInfos, SharedFragmentInfo};
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;
use crate::controller::fragment::InflightFragmentInfo;

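/// The reason why the cluster entered recovery.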
enum RecoveryReason {
    /// Recovery on bootstrap of the meta node.
    Bootstrap,
    /// Recovery triggered by a failure, carrying the causing error.
    Failover(MetaError),
    /// Recovery triggered manually (ad-hoc).
    Adhoc,
}

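/// Status of the barrier manager.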
enum BarrierManagerStatus {
    /// The barrier manager is starting.
    Starting,
    /// The barrier manager is under recovery.
    Recovering(RecoveryReason),
    /// The barrier manager is running normally.
    Running,
}

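/// A command scheduled for a database, along with the notifiers to be notified of its
/// progress.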
struct Scheduled {
    database_id: DatabaseId,
    command: Command,
    notifiers: Vec<Notifier>,
    span: tracing::Span,
}

impl From<&BarrierManagerStatus> for PbRecoveryStatus {
    fn from(status: &BarrierManagerStatus) -> Self {
        match status {
            BarrierManagerStatus::Starting => Self::StatusStarting,
            BarrierManagerStatus::Recovering(reason) => match reason {
                RecoveryReason::Bootstrap => Self::StatusStarting,
                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
            },
            BarrierManagerStatus::Running => Self::StatusRunning,
        }
    }
}

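/// Runtime requests served by the barrier worker. Each variant carries a oneshot
/// [`Sender`] through which the response is delivered, e.g. (a minimal sketch, assuming
/// a `request_tx: tokio::sync::mpsc::UnboundedSender<BarrierManagerRequest>` handle to
/// the worker):
///
/// ```ignore
/// let (tx, rx) = tokio::sync::oneshot::channel();
/// request_tx.send(BarrierManagerRequest::GetDdlProgress(tx))?;
/// let ddl_progress: HashMap<JobId, DdlProgress> = rx.await?;
/// ```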
pub(crate) enum BarrierManagerRequest {
    GetDdlProgress(Sender<HashMap<JobId, DdlProgress>>),
    AdhocRecovery(Sender<()>),
    UpdateDatabaseBarrier {
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
        sender: Sender<()>,
    },
}

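/// Snapshot of the cluster-wide runtime info that the barrier worker resumes from after
/// recovery.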
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    database_job_infos:
        HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    /// Subscriptions that depend on each materialized view table.
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<JobId, String>,
    hummock_version_stats: HummockVersionStats,
    database_infos: Vec<Database>,
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}

impl BarrierWorkerRuntimeInfoSnapshot {
    /// Checks that every actor is assigned to a live worker and has a corresponding
    /// [`StreamActor`], that every state table is registered to hummock, and that all
    /// state tables of a job share the same committed epoch.
    fn validate_database_info(
        database_id: DatabaseId,
        database_jobs: &HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
        stream_actors: &HashMap<ActorId, StreamActor>,
        state_table_committed_epochs: &HashMap<TableId, u64>,
    ) -> MetaResult<()> {
        for fragment in database_jobs.values().flat_map(|job| job.values()) {
            for (actor_id, actor) in &fragment.actors {
                if !active_streaming_nodes
                    .current()
                    .contains_key(&actor.worker_id)
                {
                    return Err(anyhow!(
                        "worker_id {} of actor {} does not exist",
                        actor.worker_id,
                        actor_id
                    )
                    .into());
                }
                if !stream_actors.contains_key(actor_id) {
                    return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
                }
            }
            for state_table_id in &fragment.state_table_ids {
                if !state_table_committed_epochs.contains_key(state_table_id) {
                    return Err(anyhow!(
                        "state table {} is not registered to hummock",
                        state_table_id
                    )
                    .into());
                }
            }
        }
        for (job_id, fragments) in database_jobs {
            let mut committed_epochs =
                InflightFragmentInfo::existing_table_ids(fragments.values()).map(|table_id| {
                    (
                        table_id,
                        *state_table_committed_epochs
                            .get(&table_id)
                            .expect("checked exist"),
                    )
                });
            let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
                anyhow!(
                    "job {} in database {} has no state table after recovery",
                    job_id,
                    database_id
                )
            })?;
            for (table_id, epoch) in committed_epochs {
                if epoch != first_epoch {
                    return Err(anyhow!(
                        "job {} in database {} has state tables with different committed epochs. {}:{}, {}:{}",
                        job_id,
                        database_id,
                        first_table,
                        first_epoch,
                        table_id,
                        epoch
                    )
                    .into());
                }
            }
        }
        Ok(())
    }

    /// Validates the runtime info of every database in the snapshot.
    fn validate(&self) -> MetaResult<()> {
        for (database_id, job_infos) in &self.database_job_infos {
            Self::validate_database_info(
                *database_id,
                job_infos,
                &self.active_streaming_nodes,
                &self.stream_actors,
                &self.state_table_committed_epochs,
            )?
        }
        Ok(())
    }
}

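/// Runtime info snapshot scoped to a single database, the per-database counterpart of
/// [`BarrierWorkerRuntimeInfoSnapshot`].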
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    /// Subscriptions that depend on each materialized view table.
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<JobId, String>,
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}

impl DatabaseRuntimeInfoSnapshot {
    /// Delegates to [`BarrierWorkerRuntimeInfoSnapshot::validate_database_info`] for this
    /// single database.
    fn validate(
        &self,
        database_id: DatabaseId,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<()> {
        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
            database_id,
            &self.job_infos,
            active_streaming_nodes,
            &self.stream_actors,
            &self.state_table_committed_epochs,
        )
    }
}