1use std::collections::HashMap;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_connector::source::SplitImpl;
20use risingwave_pb::ddl_service::DdlProgress;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::meta::PbRecoveryStatus;
23use tokio::sync::oneshot::Sender;
24
25use self::notifier::Notifier;
26use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
27use crate::manager::ActiveStreamingWorkerNodes;
28use crate::model::{ActorId, FragmentDownstreamRelation, StreamActor, StreamJobFragments};
29use crate::{MetaError, MetaResult};
30
31mod checkpoint;
32mod command;
33mod complete_task;
34mod context;
35mod edge_builder;
36mod info;
37mod manager;
38mod notifier;
39mod progress;
40mod rpc;
41mod schedule;
42mod trace;
43mod utils;
44mod worker;
45
46pub use self::command::{
47 BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
48 ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
49};
50pub use self::info::InflightSubscriptionInfo;
51pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
52pub use self::schedule::BarrierScheduler;
53pub use self::trace::TracedEpoch;
54
/// Why the barrier manager entered (or is about to enter) recovery.
enum RecoveryReason {
    /// Recovery performed as part of the initial bootstrap of the barrier manager.
    Bootstrap,
    /// Recovery triggered by a failure, carrying the error that caused it.
    Failover(MetaError),
    /// Recovery requested ad-hoc (see `BarrierManagerRequest::AdhocRecovery`).
    Adhoc,
}
64
/// Coarse lifecycle status of the barrier manager. Convertible into the
/// client-facing `PbRecoveryStatus` via the `From` impl in this module.
enum BarrierManagerStatus {
    /// The barrier manager has not finished starting up yet.
    Starting,
    /// The barrier manager is recovering, with the reason recorded.
    Recovering(RecoveryReason),
    /// The barrier manager is running normally.
    Running,
}
74
/// A command that has been scheduled for execution, bundled with the
/// notifiers to signal and the tracing span covering it.
struct Scheduled {
    // Database the command targets.
    database_id: DatabaseId,
    // The command to run.
    command: Command,
    // Notifiers to be signalled for this command (see the `notifier` module).
    notifiers: Vec<Notifier>,
    // Tracing span associated with this scheduled command.
    span: tracing::Span,
}
82
83impl From<&BarrierManagerStatus> for PbRecoveryStatus {
84 fn from(status: &BarrierManagerStatus) -> Self {
85 match status {
86 BarrierManagerStatus::Starting => Self::StatusStarting,
87 BarrierManagerStatus::Recovering(reason) => match reason {
88 RecoveryReason::Bootstrap => Self::StatusStarting,
89 RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
90 },
91 BarrierManagerStatus::Running => Self::StatusRunning,
92 }
93 }
94}
95
/// Requests that can be sent to the barrier manager worker. Each variant
/// carries a oneshot sender through which the response is delivered.
pub(crate) enum BarrierManagerRequest {
    /// Query the progress of in-flight DDLs, keyed by a `u32` id
    /// (presumably the streaming job id — confirm at the call site).
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    /// Trigger an ad-hoc recovery; the sender is signalled once done.
    AdhocRecovery(Sender<()>),
}
100
/// A snapshot of the runtime state the barrier worker needs after recovery,
/// covering all databases. Validated by `Self::validate` before use.
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    // Currently active streaming worker nodes; actors must live on one of them.
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    // In-flight streaming job info, grouped by database and then job (table) id.
    database_job_infos: HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    // Latest committed epoch of each state table (from hummock).
    state_table_committed_epochs: HashMap<TableId, u64>,
    // Per-state-table epoch lists; exact semantics defined elsewhere —
    // NOTE(review): presumably log-store epochs, confirm in checkpoint code.
    state_table_log_epochs: HashMap<TableId, Vec<Vec<u64>>>,
    // In-flight subscription info per database.
    subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
    // Actor definitions, keyed by actor id.
    stream_actors: HashMap<ActorId, StreamActor>,
    // Downstream relations between fragments.
    fragment_relations: FragmentDownstreamRelation,
    // Source split assignment per actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    // Background (e.g. still-creating) jobs: job id -> (definition, fragments).
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    // Hummock version statistics at the time of the snapshot.
    hummock_version_stats: HummockVersionStats,
}
114
115impl BarrierWorkerRuntimeInfoSnapshot {
116 fn validate_database_info(
117 database_id: DatabaseId,
118 database_jobs: &HashMap<TableId, InflightStreamingJobInfo>,
119 active_streaming_nodes: &ActiveStreamingWorkerNodes,
120 stream_actors: &HashMap<ActorId, StreamActor>,
121 state_table_committed_epochs: &HashMap<TableId, u64>,
122 ) -> MetaResult<()> {
123 {
124 for fragment in database_jobs.values().flat_map(|job| job.fragment_infos()) {
125 for (actor_id, actor) in &fragment.actors {
126 if !active_streaming_nodes
127 .current()
128 .contains_key(&actor.worker_id)
129 {
130 return Err(anyhow!(
131 "worker_id {} of actor {} do not exist",
132 actor.worker_id,
133 actor_id
134 )
135 .into());
136 }
137 if !stream_actors.contains_key(actor_id) {
138 return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
139 }
140 }
141 for state_table_id in &fragment.state_table_ids {
142 if !state_table_committed_epochs.contains_key(state_table_id) {
143 return Err(anyhow!(
144 "state table {} is not registered to hummock",
145 state_table_id
146 )
147 .into());
148 }
149 }
150 }
151 for (job_id, job) in database_jobs {
152 let mut committed_epochs = job.existing_table_ids().map(|table_id| {
153 (
154 table_id,
155 *state_table_committed_epochs
156 .get(&table_id)
157 .expect("checked exist"),
158 )
159 });
160 let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
161 anyhow!(
162 "job {} in database {} has no state table after recovery",
163 job_id,
164 database_id
165 )
166 })?;
167 for (table_id, epoch) in committed_epochs {
168 if epoch != first_epoch {
169 return Err(anyhow!(
170 "job {} in database {} has tables with different table ids. {}:{}, {}:{}",
171 job_id,
172 database_id,
173 first_table,
174 first_epoch,
175 table_id,
176 epoch
177 )
178 .into());
179 }
180 }
181 }
182 }
183 Ok(())
184 }
185
186 fn validate(&self) -> MetaResult<()> {
187 for (database_id, job_infos) in &self.database_job_infos {
188 Self::validate_database_info(
189 *database_id,
190 job_infos,
191 &self.active_streaming_nodes,
192 &self.stream_actors,
193 &self.state_table_committed_epochs,
194 )?
195 }
196 Ok(())
197 }
198}
199
/// A snapshot of the runtime state of a single database, used when a single
/// database is recovered (in contrast to the cluster-wide
/// `BarrierWorkerRuntimeInfoSnapshot`).
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    // In-flight streaming job info of this database, keyed by job (table) id.
    job_infos: HashMap<TableId, InflightStreamingJobInfo>,
    // Latest committed epoch of each state table (from hummock).
    state_table_committed_epochs: HashMap<TableId, u64>,
    // Per-state-table epoch lists; exact semantics defined elsewhere —
    // NOTE(review): presumably log-store epochs, confirm in checkpoint code.
    state_table_log_epochs: HashMap<TableId, Vec<Vec<u64>>>,
    // In-flight subscription info of this database.
    subscription_info: InflightSubscriptionInfo,
    // Actor definitions, keyed by actor id.
    stream_actors: HashMap<ActorId, StreamActor>,
    // Downstream relations between fragments.
    fragment_relations: FragmentDownstreamRelation,
    // Source split assignment per actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    // Background (e.g. still-creating) jobs: job id -> (definition, fragments).
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
}
211
212impl DatabaseRuntimeInfoSnapshot {
213 fn validate(
214 &self,
215 database_id: DatabaseId,
216 active_streaming_nodes: &ActiveStreamingWorkerNodes,
217 ) -> MetaResult<()> {
218 BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
219 database_id,
220 &self.job_infos,
221 active_streaming_nodes,
222 &self.stream_actors,
223 &self.state_table_committed_epochs,
224 )
225 }
226}