1use std::collections::HashMap;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_connector::source::SplitImpl;
20use risingwave_pb::catalog::Database;
21use risingwave_pb::ddl_service::DdlProgress;
22use risingwave_pb::hummock::HummockVersionStats;
23use risingwave_pb::meta::PbRecoveryStatus;
24use tokio::sync::oneshot::Sender;
25
26use self::notifier::Notifier;
27use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
28use crate::manager::ActiveStreamingWorkerNodes;
29use crate::model::{ActorId, FragmentDownstreamRelation, StreamActor, StreamJobFragments};
30use crate::{MetaError, MetaResult};
31
32mod backfill_order_control;
33mod checkpoint;
34mod command;
35mod complete_task;
36mod context;
37mod edge_builder;
38mod info;
39mod manager;
40mod notifier;
41mod progress;
42mod rpc;
43mod schedule;
44#[cfg(test)]
45mod tests;
46mod trace;
47mod utils;
48mod worker;
49
50pub use backfill_order_control::{BackfillNode, BackfillOrderState};
51use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignment;
52
53pub use self::command::{
54 BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
55 ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
56};
57pub(crate) use self::info::{InflightSubscriptionInfo, SharedActorInfos, SharedFragmentInfo};
58pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
59pub use self::schedule::BarrierScheduler;
60pub use self::trace::TracedEpoch;
61
/// Why the barrier manager is in a recovering state.
///
/// Carried by [`BarrierManagerStatus::Recovering`]. When converted to
/// `PbRecoveryStatus`, `Bootstrap` is reported as "starting" while the other
/// two reasons are reported as "recovering".
enum RecoveryReason {
    /// Recovery performed as part of bootstrap; externally reported as
    /// `StatusStarting` rather than `StatusRecovering`.
    Bootstrap,
    /// Recovery after a failure. Carries a `MetaError` — presumably the error
    /// that triggered the failover; confirm against the code constructing it.
    Failover(MetaError),
    /// Ad-hoc recovery — presumably the one requested through
    /// `BarrierManagerRequest::AdhocRecovery`; confirm against the request handler.
    Adhoc,
}
71
/// Lifecycle status of the barrier manager.
///
/// Convertible (by reference) into `PbRecoveryStatus` for external reporting.
enum BarrierManagerStatus {
    /// The barrier manager is starting; reported as `StatusStarting`.
    Starting,
    /// The barrier manager is recovering for the given reason. The reported
    /// status depends on the reason (see [`RecoveryReason`]).
    Recovering(RecoveryReason),
    /// The barrier manager is running; reported as `StatusRunning`.
    Running,
}
81
/// A command bundled with the database it targets, its notifiers and a
/// tracing span.
// NOTE(review): presumably produced by the barrier scheduler and consumed by
// the barrier worker — the producer/consumer are not visible in this file section.
struct Scheduled {
    /// Database the command applies to.
    database_id: DatabaseId,
    /// The command itself.
    command: Command,
    /// Notifiers associated with this command.
    notifiers: Vec<Notifier>,
    /// Tracing span associated with this scheduled command.
    span: tracing::Span,
}
89
90impl From<&BarrierManagerStatus> for PbRecoveryStatus {
91 fn from(status: &BarrierManagerStatus) -> Self {
92 match status {
93 BarrierManagerStatus::Starting => Self::StatusStarting,
94 BarrierManagerStatus::Recovering(reason) => match reason {
95 RecoveryReason::Bootstrap => Self::StatusStarting,
96 RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
97 },
98 BarrierManagerStatus::Running => Self::StatusRunning,
99 }
100 }
101}
102
/// Requests that can be sent to the barrier manager.
///
/// Every variant carries a oneshot `Sender` through which the reply (or a
/// bare `()` acknowledgement) is delivered back to the requester.
pub(crate) enum BarrierManagerRequest {
    /// Ask for the current DDL progress. The reply is a map of `DdlProgress`
    /// keyed by `u32` — presumably the creating job's id; confirm against the handler.
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    /// Ask for an ad-hoc recovery; `()` is sent back through the sender.
    // NOTE(review): whether the reply is sent before or after recovery
    // completes is not visible here — confirm against the handler.
    AdhocRecovery(Sender<()>),
    /// Update the barrier settings of a single database.
    UpdateDatabaseBarrier {
        /// Database whose barrier settings are being updated.
        database_id: DatabaseId,
        /// New barrier interval in milliseconds, if any.
        // NOTE(review): the meaning of `None` (keep current vs. reset to a
        // default) is not visible here — confirm against the handler.
        barrier_interval_ms: Option<u32>,
        /// New checkpoint frequency, if any (same caveat for `None` as above).
        checkpoint_frequency: Option<u64>,
        /// Receives `()` as the reply to this request.
        sender: Sender<()>,
    },
}
113
/// Snapshot of the runtime information covering all databases.
///
/// `validate` checks the per-database job infos against the worker set, the
/// actor map and the committed epochs held in this snapshot.
// NOTE(review): presumably loaded from the metadata store during global
// recovery — the loader is not visible in this file section.
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    /// Active streaming worker nodes; every actor's `worker_id` must be
    /// present in `current()` for validation to pass.
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    /// Per database: streaming jobs keyed by `TableId`.
    database_job_infos: HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    /// Committed epoch of each state table. Validation requires every state
    /// table referenced by a fragment to have an entry here, and all state
    /// tables of one job to share the same value.
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// Log epochs per state table.
    // NOTE(review): each entry looks like (non-checkpoint epochs, checkpoint
    // epoch) — TODO confirm the tuple semantics against the producer.
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    /// Per-database subscription info.
    subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
    /// `StreamActor` of each actor; validation requires an entry for every
    /// actor appearing in any fragment.
    stream_actors: HashMap<ActorId, StreamActor>,
    /// Downstream relations between fragments.
    fragment_relations: FragmentDownstreamRelation,
    /// Source splits per actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    /// Background jobs keyed by `TableId`.
    // NOTE(review): the `String` is presumably the job definition — confirm.
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    /// Hummock version statistics.
    hummock_version_stats: HummockVersionStats,
    /// Catalog entries of the databases.
    database_infos: Vec<Database>,
    /// CDC table snapshot split assignment.
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}
130
131impl BarrierWorkerRuntimeInfoSnapshot {
132 fn validate_database_info(
133 database_id: DatabaseId,
134 database_jobs: &HashMap<TableId, InflightStreamingJobInfo>,
135 active_streaming_nodes: &ActiveStreamingWorkerNodes,
136 stream_actors: &HashMap<ActorId, StreamActor>,
137 state_table_committed_epochs: &HashMap<TableId, u64>,
138 ) -> MetaResult<()> {
139 {
140 for fragment in database_jobs.values().flat_map(|job| job.fragment_infos()) {
141 for (actor_id, actor) in &fragment.actors {
142 if !active_streaming_nodes
143 .current()
144 .contains_key(&actor.worker_id)
145 {
146 return Err(anyhow!(
147 "worker_id {} of actor {} do not exist",
148 actor.worker_id,
149 actor_id
150 )
151 .into());
152 }
153 if !stream_actors.contains_key(actor_id) {
154 return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
155 }
156 }
157 for state_table_id in &fragment.state_table_ids {
158 if !state_table_committed_epochs.contains_key(state_table_id) {
159 return Err(anyhow!(
160 "state table {} is not registered to hummock",
161 state_table_id
162 )
163 .into());
164 }
165 }
166 }
167 for (job_id, job) in database_jobs {
168 let mut committed_epochs = job.existing_table_ids().map(|table_id| {
169 (
170 table_id,
171 *state_table_committed_epochs
172 .get(&table_id)
173 .expect("checked exist"),
174 )
175 });
176 let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
177 anyhow!(
178 "job {} in database {} has no state table after recovery",
179 job_id,
180 database_id
181 )
182 })?;
183 for (table_id, epoch) in committed_epochs {
184 if epoch != first_epoch {
185 return Err(anyhow!(
186 "job {} in database {} has tables with different table ids. {}:{}, {}:{}",
187 job_id,
188 database_id,
189 first_table,
190 first_epoch,
191 table_id,
192 epoch
193 )
194 .into());
195 }
196 }
197 }
198 }
199 Ok(())
200 }
201
202 fn validate(&self) -> MetaResult<()> {
203 for (database_id, job_infos) in &self.database_job_infos {
204 Self::validate_database_info(
205 *database_id,
206 job_infos,
207 &self.active_streaming_nodes,
208 &self.stream_actors,
209 &self.state_table_committed_epochs,
210 )?
211 }
212 Ok(())
213 }
214}
215
/// Snapshot of the runtime information of a single database.
///
/// Mirrors the per-database parts of `BarrierWorkerRuntimeInfoSnapshot`; the
/// worker set is not stored here and is passed to `validate` instead.
// NOTE(review): presumably loaded during per-database recovery — the loader
// is not visible in this file section.
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    /// Streaming jobs of the database keyed by `TableId`.
    job_infos: HashMap<TableId, InflightStreamingJobInfo>,
    /// Committed epoch of each state table; consulted by `validate`.
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// Log epochs per state table.
    // NOTE(review): each entry looks like (non-checkpoint epochs, checkpoint
    // epoch) — TODO confirm the tuple semantics against the producer.
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    /// Subscription info of the database.
    subscription_info: InflightSubscriptionInfo,
    /// `StreamActor` of each actor; consulted by `validate`.
    stream_actors: HashMap<ActorId, StreamActor>,
    /// Downstream relations between fragments.
    fragment_relations: FragmentDownstreamRelation,
    /// Source splits per actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    /// Background jobs keyed by `TableId`.
    // NOTE(review): the `String` is presumably the job definition — confirm.
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    /// CDC table snapshot split assignment.
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
}
229
230impl DatabaseRuntimeInfoSnapshot {
231 fn validate(
232 &self,
233 database_id: DatabaseId,
234 active_streaming_nodes: &ActiveStreamingWorkerNodes,
235 ) -> MetaResult<()> {
236 BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
237 database_id,
238 &self.job_infos,
239 active_streaming_nodes,
240 &self.stream_actors,
241 &self.state_table_committed_epochs,
242 )
243 }
244}