1use std::collections::HashMap;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_connector::source::SplitImpl;
20use risingwave_pb::ddl_service::DdlProgress;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::meta::PbRecoveryStatus;
23use tokio::sync::oneshot::Sender;
24
25use self::notifier::Notifier;
26use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
27use crate::manager::ActiveStreamingWorkerNodes;
28use crate::model::{ActorId, FragmentDownstreamRelation, StreamActor, StreamJobFragments};
29use crate::{MetaError, MetaResult};
30
31mod backfill_order_control;
32mod checkpoint;
33mod command;
34mod complete_task;
35mod context;
36mod edge_builder;
37mod info;
38mod manager;
39mod notifier;
40mod progress;
41mod rpc;
42mod schedule;
43mod trace;
44mod utils;
45mod worker;
46
47pub use backfill_order_control::{BackfillNode, BackfillOrderState};
48
49pub use self::command::{
50 BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
51 ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
52};
53pub use self::info::InflightSubscriptionInfo;
54pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
55pub use self::schedule::BarrierScheduler;
56pub use self::trace::TracedEpoch;
57
/// The reason why the barrier manager entered recovery.
enum RecoveryReason {
    /// Recovery performed as part of the initial bootstrap of the meta node.
    Bootstrap,
    /// Recovery triggered by a failure, carrying the error that caused it.
    Failover(MetaError),
    /// Recovery triggered on demand (see [`BarrierManagerRequest::AdhocRecovery`]).
    Adhoc,
}
67
/// The status of the barrier manager, convertible into [`PbRecoveryStatus`] for
/// external reporting.
enum BarrierManagerStatus {
    /// The barrier manager is starting up.
    Starting,
    /// The barrier manager is recovering, for the given [`RecoveryReason`].
    Recovering(RecoveryReason),
    /// The barrier manager is running normally.
    Running,
}
77
/// A command scheduled for execution, together with the bookkeeping needed to
/// report its outcome and trace its lifetime.
struct Scheduled {
    /// The database the command applies to.
    database_id: DatabaseId,
    /// The command to be carried out.
    command: Command,
    /// Notifiers associated with this command; presumably used to signal
    /// progress/completion back to the issuers — see `notifier` module.
    notifiers: Vec<Notifier>,
    /// Tracing span covering this scheduled command.
    span: tracing::Span,
}
85
86impl From<&BarrierManagerStatus> for PbRecoveryStatus {
87 fn from(status: &BarrierManagerStatus) -> Self {
88 match status {
89 BarrierManagerStatus::Starting => Self::StatusStarting,
90 BarrierManagerStatus::Recovering(reason) => match reason {
91 RecoveryReason::Bootstrap => Self::StatusStarting,
92 RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
93 },
94 BarrierManagerStatus::Running => Self::StatusRunning,
95 }
96 }
97}
98
/// Requests that can be sent to the barrier manager worker; each variant
/// carries a oneshot sender through which the reply is delivered.
pub(crate) enum BarrierManagerRequest {
    /// Query the progress of in-flight DDLs; the reply maps a `u32` id
    /// (presumably the job/table id — TODO confirm against callers) to its
    /// [`DdlProgress`].
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    /// Trigger an ad-hoc recovery; the sender is signalled with `()` as the
    /// acknowledgement.
    AdhocRecovery(Sender<()>),
}
103
/// A snapshot of the runtime info the barrier worker needs, covering all
/// databases. Validated by [`Self::validate`] before use.
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    /// The currently active streaming worker nodes.
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    /// In-flight streaming job info, grouped by database and then by job id.
    database_job_infos: HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    /// Committed epoch of each state table.
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// Per-state-table log epochs — NOTE(review): the meaning of the
    /// `(Vec<u64>, u64)` pairs is not visible here; confirm with the producer.
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    /// Subscription info per database.
    subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
    /// Every stream actor, keyed by actor id.
    stream_actors: HashMap<ActorId, StreamActor>,
    /// Downstream relations between fragments.
    fragment_relations: FragmentDownstreamRelation,
    /// Source splits assigned to each actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    /// Background (e.g. still-creating) jobs, keyed by job id; the `String` is
    /// presumably a human-readable definition/name — TODO confirm.
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    /// Hummock version statistics at snapshot time.
    hummock_version_stats: HummockVersionStats,
}
118
119impl BarrierWorkerRuntimeInfoSnapshot {
120 fn validate_database_info(
121 database_id: DatabaseId,
122 database_jobs: &HashMap<TableId, InflightStreamingJobInfo>,
123 active_streaming_nodes: &ActiveStreamingWorkerNodes,
124 stream_actors: &HashMap<ActorId, StreamActor>,
125 state_table_committed_epochs: &HashMap<TableId, u64>,
126 ) -> MetaResult<()> {
127 {
128 for fragment in database_jobs.values().flat_map(|job| job.fragment_infos()) {
129 for (actor_id, actor) in &fragment.actors {
130 if !active_streaming_nodes
131 .current()
132 .contains_key(&actor.worker_id)
133 {
134 return Err(anyhow!(
135 "worker_id {} of actor {} do not exist",
136 actor.worker_id,
137 actor_id
138 )
139 .into());
140 }
141 if !stream_actors.contains_key(actor_id) {
142 return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
143 }
144 }
145 for state_table_id in &fragment.state_table_ids {
146 if !state_table_committed_epochs.contains_key(state_table_id) {
147 return Err(anyhow!(
148 "state table {} is not registered to hummock",
149 state_table_id
150 )
151 .into());
152 }
153 }
154 }
155 for (job_id, job) in database_jobs {
156 let mut committed_epochs = job.existing_table_ids().map(|table_id| {
157 (
158 table_id,
159 *state_table_committed_epochs
160 .get(&table_id)
161 .expect("checked exist"),
162 )
163 });
164 let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
165 anyhow!(
166 "job {} in database {} has no state table after recovery",
167 job_id,
168 database_id
169 )
170 })?;
171 for (table_id, epoch) in committed_epochs {
172 if epoch != first_epoch {
173 return Err(anyhow!(
174 "job {} in database {} has tables with different table ids. {}:{}, {}:{}",
175 job_id,
176 database_id,
177 first_table,
178 first_epoch,
179 table_id,
180 epoch
181 )
182 .into());
183 }
184 }
185 }
186 }
187 Ok(())
188 }
189
190 fn validate(&self) -> MetaResult<()> {
191 for (database_id, job_infos) in &self.database_job_infos {
192 Self::validate_database_info(
193 *database_id,
194 job_infos,
195 &self.active_streaming_nodes,
196 &self.stream_actors,
197 &self.state_table_committed_epochs,
198 )?
199 }
200 Ok(())
201 }
202}
203
/// A snapshot of the runtime info for a single database — the per-database
/// counterpart of [`BarrierWorkerRuntimeInfoSnapshot`].
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    /// In-flight streaming job info, keyed by job id.
    job_infos: HashMap<TableId, InflightStreamingJobInfo>,
    /// Committed epoch of each state table.
    state_table_committed_epochs: HashMap<TableId, u64>,
    /// Per-state-table log epochs — NOTE(review): the meaning of the
    /// `(Vec<u64>, u64)` pairs is not visible here; confirm with the producer.
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    /// Subscription info of this database.
    subscription_info: InflightSubscriptionInfo,
    /// Every stream actor of this database, keyed by actor id.
    stream_actors: HashMap<ActorId, StreamActor>,
    /// Downstream relations between fragments.
    fragment_relations: FragmentDownstreamRelation,
    /// Source splits assigned to each actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    /// Background jobs, keyed by job id; the `String` is presumably a
    /// human-readable definition/name — TODO confirm.
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
}
216
217impl DatabaseRuntimeInfoSnapshot {
218 fn validate(
219 &self,
220 database_id: DatabaseId,
221 active_streaming_nodes: &ActiveStreamingWorkerNodes,
222 ) -> MetaResult<()> {
223 BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
224 database_id,
225 &self.job_infos,
226 active_streaming_nodes,
227 &self.stream_actors,
228 &self.state_table_committed_epochs,
229 )
230 }
231}