use std::collections::HashMap;

use anyhow::anyhow;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::catalog::Database;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, StreamActor, StreamJobFragments};
use crate::{MetaError, MetaResult};
mod backfill_order_control;
mod checkpoint;
mod command;
mod complete_task;
mod context;
mod edge_builder;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
mod trace;
mod utils;
mod worker;

pub use backfill_order_control::{BackfillNode, BackfillOrderState};

pub use self::command::{
    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
};
pub use self::info::InflightSubscriptionInfo;
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;

/// The reason why the cluster is recovering.
enum RecoveryReason {
    /// The initial recovery when the meta node bootstraps.
    Bootstrap,
    /// Recovery after a failure in the cluster.
    Failover(MetaError),
    /// Manually triggered recovery (e.g. via the ad-hoc recovery request).
    Adhoc,
}

/// Status of the barrier manager.
enum BarrierManagerStatus {
    /// The barrier manager is starting.
    Starting,
    /// The barrier manager is under recovery for the given reason.
    Recovering(RecoveryReason),
    /// The barrier manager is running normally.
    Running,
}

/// A command scheduled on a database, together with the notifiers to call on its
/// progress and the tracing span it was scheduled under.
struct Scheduled {
    database_id: DatabaseId,
    command: Command,
    notifiers: Vec<Notifier>,
    span: tracing::Span,
}

impl From<&BarrierManagerStatus> for PbRecoveryStatus {
    fn from(status: &BarrierManagerStatus) -> Self {
        match status {
            BarrierManagerStatus::Starting => Self::StatusStarting,
            BarrierManagerStatus::Recovering(reason) => match reason {
                RecoveryReason::Bootstrap => Self::StatusStarting,
                RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
            },
            BarrierManagerStatus::Running => Self::StatusRunning,
        }
    }
}
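
// Note on the mapping above: bootstrap-time recovery is still surfaced as
// `StatusStarting`, so clients only ever observe `StatusRecovering` for a
// failover or an ad-hoc recovery. A minimal sketch of the conversion
// (hypothetical call site; the real one lives in the meta service layer):
//
//     let status = BarrierManagerStatus::Recovering(RecoveryReason::Adhoc);
//     assert_eq!(PbRecoveryStatus::from(&status), PbRecoveryStatus::StatusRecovering);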

pub(crate) enum BarrierManagerRequest {
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    AdhocRecovery(Sender<()>),
    UpdateDatabaseBarrier {
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
        sender: Sender<()>,
    },
}
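
// Each request variant carries a oneshot `Sender` for its reply, giving a
// request/response pattern over a channel. A minimal usage sketch, assuming a
// `request_tx` channel into the barrier worker (hypothetical name; the real
// channel is owned by the barrier manager):
//
//     let (tx, rx) = tokio::sync::oneshot::channel();
//     request_tx.send(BarrierManagerRequest::AdhocRecovery(tx))?;
//     rx.await?; // resolves once the ad-hoc recovery has been triggered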

/// A snapshot of the runtime info of the whole cluster, rebuilt when the
/// barrier worker recovers.
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    database_job_infos: HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    hummock_version_stats: HummockVersionStats,
    database_infos: Vec<Database>,
}

impl BarrierWorkerRuntimeInfoSnapshot {
    /// Check that the recovered info of a database is consistent: every actor is
    /// assigned to an active worker node and has a corresponding `StreamActor`,
    /// every state table is registered to hummock, and all state tables of a job
    /// share the same committed epoch.
    fn validate_database_info(
        database_id: DatabaseId,
        database_jobs: &HashMap<TableId, InflightStreamingJobInfo>,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
        stream_actors: &HashMap<ActorId, StreamActor>,
        state_table_committed_epochs: &HashMap<TableId, u64>,
    ) -> MetaResult<()> {
        for fragment in database_jobs.values().flat_map(|job| job.fragment_infos()) {
            for (actor_id, actor) in &fragment.actors {
                if !active_streaming_nodes
                    .current()
                    .contains_key(&actor.worker_id)
                {
                    return Err(anyhow!(
                        "worker_id {} of actor {} does not exist",
                        actor.worker_id,
                        actor_id
                    )
                    .into());
                }
                if !stream_actors.contains_key(actor_id) {
                    return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
                }
            }
            for state_table_id in &fragment.state_table_ids {
                if !state_table_committed_epochs.contains_key(state_table_id) {
                    return Err(anyhow!(
                        "state table {} is not registered to hummock",
                        state_table_id
                    )
                    .into());
                }
            }
        }
        for (job_id, job) in database_jobs {
            let mut committed_epochs = job.existing_table_ids().map(|table_id| {
                (
                    table_id,
                    *state_table_committed_epochs
                        .get(&table_id)
                        .expect("checked exist"),
                )
            });
            let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
                anyhow!(
                    "job {} in database {} has no state table after recovery",
                    job_id,
                    database_id
                )
            })?;
            for (table_id, epoch) in committed_epochs {
                if epoch != first_epoch {
                    return Err(anyhow!(
                        "job {} in database {} has state tables with different committed epochs: {}:{}, {}:{}",
                        job_id,
                        database_id,
                        first_table,
                        first_epoch,
                        table_id,
                        epoch
                    )
                    .into());
                }
            }
        }
        Ok(())
    }

    /// Validate every database in the snapshot.
    fn validate(&self) -> MetaResult<()> {
        for (database_id, job_infos) in &self.database_job_infos {
            Self::validate_database_info(
                *database_id,
                job_infos,
                &self.active_streaming_nodes,
                &self.stream_actors,
                &self.state_table_committed_epochs,
            )?
        }
        Ok(())
    }
}
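
// A minimal sketch of how the snapshot is consumed during recovery, assuming a
// hypothetical `rebuild_runtime_info` helper (the actual rebuild happens
// elsewhere in the barrier worker):
//
//     let snapshot: BarrierWorkerRuntimeInfoSnapshot = rebuild_runtime_info().await?;
//     snapshot.validate()?; // fail fast on inconsistent metadata before resuming barriers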

/// Per-database counterpart of `BarrierWorkerRuntimeInfoSnapshot`, rebuilt when
/// a single database recovers.
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    job_infos: HashMap<TableId, InflightStreamingJobInfo>,
    state_table_committed_epochs: HashMap<TableId, u64>,
    state_table_log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    subscription_info: InflightSubscriptionInfo,
    stream_actors: HashMap<ActorId, StreamActor>,
    fragment_relations: FragmentDownstreamRelation,
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
}

impl DatabaseRuntimeInfoSnapshot {
    /// Validate the database's recovered info against the currently active
    /// streaming worker nodes. Delegates to
    /// `BarrierWorkerRuntimeInfoSnapshot::validate_database_info`.
    fn validate(
        &self,
        database_id: DatabaseId,
        active_streaming_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<()> {
        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
            database_id,
            &self.job_infos,
            active_streaming_nodes,
            &self.stream_actors,
            &self.state_table_committed_epochs,
        )
    }
}