use std::collections::HashMap;

use anyhow::anyhow;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_pb::catalog::Database;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, StreamActor, StreamJobFragments};
use crate::{MetaError, MetaResult};

mod backfill_order_control;
pub mod cdc_progress;
mod checkpoint;
mod command;
mod complete_task;
mod context;
mod edge_builder;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
#[cfg(test)]
mod tests;
mod trace;
mod utils;
mod worker;

pub use backfill_order_control::{BackfillNode, BackfillOrderState};

pub use self::command::{
    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
};
pub(crate) use self::info::{InflightSubscriptionInfo, SharedActorInfos, SharedFragmentInfo};
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;

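/// The reason why the cluster is under recovery.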
enum RecoveryReason {
    /// After bootstrap.
    Bootstrap,
    /// After failure.
    Failover(MetaError),
    /// Manually triggered.
    Adhoc,
}

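/// Status of the barrier manager.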
enum BarrierManagerStatus {
    /// Barrier manager is starting.
    Starting,
    /// Barrier manager is under recovery.
    Recovering(RecoveryReason),
    /// Barrier manager is running.
    Running,
}

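/// Scheduled command with its notifiers.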
struct Scheduled {
    /// The database that the scheduled command belongs to.
    database_id: DatabaseId,
    command: Command,
    notifiers: Vec<Notifier>,
    span: tracing::Span,
}

impl From<&BarrierManagerStatus> for PbRecoveryStatus {
    fn from(status: &BarrierManagerStatus) -> Self {
        match status {
            BarrierManagerStatus::Starting => Self::StatusStarting,
            BarrierManagerStatus::Recovering(reason) => match reason {
                RecoveryReason::Bootstrap => Self::StatusStarting,
                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
            },
            BarrierManagerStatus::Running => Self::StatusRunning,
        }
    }
}

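/// Requests handled by the global barrier worker. Each variant carries a
/// [`oneshot`](tokio::sync::oneshot) sender through which the worker replies.
///
/// A minimal caller-side sketch (hypothetical code; `request_tx` stands in for
/// whatever channel delivers these requests to the worker):
///
/// ```ignore
/// let (tx, rx) = tokio::sync::oneshot::channel();
/// request_tx.send(BarrierManagerRequest::GetDdlProgress(tx))?;
/// let progress: HashMap<u32, DdlProgress> = rx.await?;
/// ```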
pub(crate) enum BarrierManagerRequest {
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    AdhocRecovery(Sender<()>),
    UpdateDatabaseBarrier {
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
        sender: Sender<()>,
    },
}

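/// Runtime info of the whole cluster, collected when the global barrier worker
/// (re)starts and validated before barrier management resumes.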
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    database_job_infos: HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    hummock_version_stats: HummockVersionStats,
    database_infos: Vec<Database>,
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}

impl BarrierWorkerRuntimeInfoSnapshot {
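    /// Validate the runtime info of one database against the current cluster
    /// state: every actor must be placed on an active worker and have a
    /// corresponding [`StreamActor`], every state table must be registered to
    /// hummock, and all state tables of one job must share the same committed
    /// epoch after recovery.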
    fn validate_database_info(
        database_id: DatabaseId,
        database_jobs: &HashMap<TableId, InflightStreamingJobInfo>,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
        stream_actors: &HashMap<ActorId, StreamActor>,
        state_table_committed_epochs: &HashMap<TableId, u64>,
    ) -> MetaResult<()> {
        for fragment in database_jobs.values().flat_map(|job| job.fragment_infos()) {
            for (actor_id, actor) in &fragment.actors {
                if !active_streaming_nodes
                    .current()
                    .contains_key(&actor.worker_id)
                {
                    return Err(anyhow!(
                        "worker_id {} of actor {} does not exist",
                        actor.worker_id,
                        actor_id
                    )
                    .into());
                }
                if !stream_actors.contains_key(actor_id) {
                    return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
                }
            }
            for state_table_id in &fragment.state_table_ids {
                if !state_table_committed_epochs.contains_key(state_table_id) {
                    return Err(anyhow!(
                        "state table {} is not registered to hummock",
                        state_table_id
                    )
                    .into());
                }
            }
        }
        // All state tables of a streaming job must have been committed at the same
        // epoch; otherwise the job cannot be resumed from a consistent snapshot.
        for (job_id, job) in database_jobs {
            let mut committed_epochs = job.existing_table_ids().map(|table_id| {
                (
                    table_id,
                    *state_table_committed_epochs
                        .get(&table_id)
                        .expect("checked to exist"),
                )
            });
            let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
                anyhow!(
                    "job {} in database {} has no state table after recovery",
                    job_id,
                    database_id
                )
            })?;
            for (table_id, epoch) in committed_epochs {
                if epoch != first_epoch {
                    return Err(anyhow!(
                        "job {} in database {} has state tables with different committed epochs. {}:{}, {}:{}",
                        job_id,
                        database_id,
                        first_table,
                        first_epoch,
                        table_id,
                        epoch
                    )
                    .into());
                }
            }
        }
        Ok(())
    }

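    /// Validate every database in this snapshot (see
    /// [`Self::validate_database_info`]).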
    fn validate(&self) -> MetaResult<()> {
        for (database_id, job_infos) in &self.database_job_infos {
            Self::validate_database_info(
                *database_id,
                job_infos,
                &self.active_streaming_nodes,
                &self.stream_actors,
                &self.state_table_committed_epochs,
            )?
        }
        Ok(())
    }
}

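/// Runtime info of a single database, collected when recovering that database
/// alone rather than the whole cluster.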
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    job_infos: HashMap<TableId, InflightStreamingJobInfo>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    subscription_info: InflightSubscriptionInfo,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
}

impl DatabaseRuntimeInfoSnapshot {
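    /// Validate this database's runtime info against the active workers,
    /// delegating to [`BarrierWorkerRuntimeInfoSnapshot::validate_database_info`].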
    fn validate(
        &self,
        database_id: DatabaseId,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<()> {
        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
            database_id,
            &self.job_infos,
            active_streaming_nodes,
            &self.stream_actors,
            &self.state_table_committed_epochs,
        )
    }
}