1use std::collections::HashMap;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_connector::source::SplitImpl;
20use risingwave_pb::catalog::Database;
21use risingwave_pb::ddl_service::DdlProgress;
22use risingwave_pb::hummock::HummockVersionStats;
23use risingwave_pb::meta::PbRecoveryStatus;
24use tokio::sync::oneshot::Sender;
25
26use self::notifier::Notifier;
27use crate::barrier::info::BarrierInfo;
28use crate::manager::ActiveStreamingWorkerNodes;
29use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, SubscriptionId};
30use crate::{MetaError, MetaResult};
31
32mod backfill_order_control;
33pub mod cdc_progress;
34mod checkpoint;
35mod command;
36mod complete_task;
37mod context;
38mod edge_builder;
39mod info;
40mod manager;
41mod notifier;
42mod progress;
43mod rpc;
44mod schedule;
45#[cfg(test)]
46mod tests;
47mod trace;
48mod utils;
49mod worker;
50
51pub use backfill_order_control::{BackfillNode, BackfillOrderState};
52use risingwave_common::id::JobId;
53
54pub use self::command::{
55 BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
56 ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
57};
58pub(crate) use self::info::{SharedActorInfos, SharedFragmentInfo};
59pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
60pub use self::schedule::BarrierScheduler;
61pub use self::trace::TracedEpoch;
62use crate::barrier::cdc_progress::CdcProgress;
63use crate::controller::fragment::InflightFragmentInfo;
64use crate::stream::cdc::CdcTableSnapshotSplits;
65
/// Reason the barrier manager entered recovery.
enum RecoveryReason {
    /// Presumably the recovery performed when the barrier manager first starts up
    /// (TODO confirm). Reported externally as `StatusStarting`, not `StatusRecovering`.
    Bootstrap,
    /// Recovery caused by an error; carries the `MetaError` that triggered it.
    Failover(MetaError),
    /// Explicitly requested recovery — presumably via
    /// `BarrierManagerRequest::AdhocRecovery`; TODO confirm.
    Adhoc,
}
75
/// Lifecycle status of the barrier manager, exposed externally as `PbRecoveryStatus`.
enum BarrierManagerStatus {
    /// Starting up; reported as `StatusStarting`.
    Starting,
    /// Recovering for the given reason; reported as `StatusStarting` for
    /// `RecoveryReason::Bootstrap` and as `StatusRecovering` otherwise.
    Recovering(RecoveryReason),
    /// Normal operation; reported as `StatusRunning`.
    Running,
}
85
/// A `Command` scheduled against a database, bundled with its notifiers and tracing span.
struct Scheduled {
    /// Database the command is scheduled for.
    database_id: DatabaseId,
    /// The command itself.
    command: Command,
    /// Notifiers attached to this command.
    // NOTE(review): presumably signalled as the command's barrier progresses — confirm in `schedule`.
    notifiers: Vec<Notifier>,
    /// Tracing span associated with this scheduled command.
    span: tracing::Span,
}
93
94impl From<&BarrierManagerStatus> for PbRecoveryStatus {
95 fn from(status: &BarrierManagerStatus) -> Self {
96 match status {
97 BarrierManagerStatus::Starting => Self::StatusStarting,
98 BarrierManagerStatus::Recovering(reason) => match reason {
99 RecoveryReason::Bootstrap => Self::StatusStarting,
100 RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
101 },
102 BarrierManagerStatus::Running => Self::StatusRunning,
103 }
104 }
105}
106
/// Requests handled by the barrier manager. Each request carries a oneshot
/// `Sender` through which the reply (or acknowledgement) is delivered.
pub(crate) enum BarrierManagerRequest {
    /// Asks for the DDL progress of each job.
    GetDdlProgress(Sender<HashMap<JobId, DdlProgress>>),
    /// Asks for the CDC progress of each job.
    GetCdcProgress(Sender<HashMap<JobId, CdcProgress>>),
    /// Asks for an ad-hoc recovery.
    // NOTE(review): unclear from here whether the sender fires when recovery is
    // triggered or when it completes — confirm in the manager.
    AdhocRecovery(Sender<()>),
    /// Updates the barrier settings of a single database.
    // NOTE(review): a `None` field presumably leaves that setting unchanged — confirm.
    UpdateDatabaseBarrier {
        database_id: DatabaseId,
        // Barrier interval; milliseconds per the field name.
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
        // Acknowledges that the update has been handled (presumably; confirm).
        sender: Sender<()>,
    },
}
118
/// Snapshot of the runtime info for all databases, checked by `validate`.
// NOTE(review): presumably loaded during recovery to rebuild barrier workers — confirm.
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    /// Streaming worker nodes that are currently active; actors must be placed on one of these.
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    /// In-flight fragment info, keyed by database, then job, then fragment.
    database_job_infos:
        HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    /// Committed epoch of each state table; all tables of a job must agree.
    state_table_committed_epochs: HashMap<TableId, u64>,
    // NOTE(review): meaning of the `(Vec<u64>, u64)` entries is not visible here — confirm.
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    // Subscriptions depending on each table, with a `u64` per subscription
    // (presumably a retention value — confirm).
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    /// Stream actor definitions; every in-flight actor must have an entry.
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    /// Source splits assigned to each actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    // Background jobs with an associated string (presumably the job definition — confirm).
    background_jobs: HashMap<JobId, String>,
    hummock_version_stats: HummockVersionStats,
    database_infos: Vec<Database>,
    cdc_table_snapshot_splits: HashMap<JobId, CdcTableSnapshotSplits>,
}
136
137impl BarrierWorkerRuntimeInfoSnapshot {
138 fn validate_database_info(
139 database_id: DatabaseId,
140 database_jobs: &HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
141 active_streaming_nodes: &ActiveStreamingWorkerNodes,
142 stream_actors: &HashMap<ActorId, StreamActor>,
143 state_table_committed_epochs: &HashMap<TableId, u64>,
144 ) -> MetaResult<()> {
145 {
146 for fragment in database_jobs.values().flat_map(|job| job.values()) {
147 for (actor_id, actor) in &fragment.actors {
148 if !active_streaming_nodes
149 .current()
150 .contains_key(&actor.worker_id)
151 {
152 return Err(anyhow!(
153 "worker_id {} of actor {} do not exist",
154 actor.worker_id,
155 actor_id
156 )
157 .into());
158 }
159 if !stream_actors.contains_key(actor_id) {
160 return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
161 }
162 }
163 for state_table_id in &fragment.state_table_ids {
164 if !state_table_committed_epochs.contains_key(state_table_id) {
165 return Err(anyhow!(
166 "state table {} is not registered to hummock",
167 state_table_id
168 )
169 .into());
170 }
171 }
172 }
173 for (job_id, fragments) in database_jobs {
174 let mut committed_epochs =
175 InflightFragmentInfo::existing_table_ids(fragments.values()).map(|table_id| {
176 (
177 table_id,
178 *state_table_committed_epochs
179 .get(&table_id)
180 .expect("checked exist"),
181 )
182 });
183 let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
184 anyhow!(
185 "job {} in database {} has no state table after recovery",
186 job_id,
187 database_id
188 )
189 })?;
190 for (table_id, epoch) in committed_epochs {
191 if epoch != first_epoch {
192 return Err(anyhow!(
193 "job {} in database {} has tables with different table ids. {}:{}, {}:{}",
194 job_id,
195 database_id,
196 first_table,
197 first_epoch,
198 table_id,
199 epoch
200 )
201 .into());
202 }
203 }
204 }
205 }
206 Ok(())
207 }
208
209 fn validate(&self) -> MetaResult<()> {
210 for (database_id, job_infos) in &self.database_job_infos {
211 Self::validate_database_info(
212 *database_id,
213 job_infos,
214 &self.active_streaming_nodes,
215 &self.stream_actors,
216 &self.state_table_committed_epochs,
217 )?
218 }
219 Ok(())
220 }
221}
222
/// Runtime info snapshot for a single database; a per-database counterpart of
/// `BarrierWorkerRuntimeInfoSnapshot` that omits the cluster-wide fields.
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    /// In-flight fragment info of this database, keyed by job, then fragment.
    job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    /// Committed epoch of each state table; all tables of a job must agree.
    state_table_committed_epochs: HashMap<TableId, u64>,
    // NOTE(review): meaning of the `(Vec<u64>, u64)` entries is not visible here — confirm.
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    // Subscriptions depending on each table, with a `u64` per subscription
    // (presumably a retention value — confirm).
    mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
    /// Stream actor definitions; every in-flight actor must have an entry.
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    /// Source splits assigned to each actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    // Background jobs with an associated string (presumably the job definition — confirm).
    background_jobs: HashMap<JobId, String>,
    cdc_table_snapshot_splits: HashMap<JobId, CdcTableSnapshotSplits>,
}
236
237impl DatabaseRuntimeInfoSnapshot {
238 fn validate(
239 &self,
240 database_id: DatabaseId,
241 active_streaming_nodes: &ActiveStreamingWorkerNodes,
242 ) -> MetaResult<()> {
243 BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
244 database_id,
245 &self.job_infos,
246 active_streaming_nodes,
247 &self.stream_actors,
248 &self.state_table_committed_epochs,
249 )
250 }
251}