1use std::collections::HashMap;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_connector::source::SplitImpl;
20use risingwave_pb::ddl_service::DdlProgress;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::meta::PbRecoveryStatus;
23use tokio::sync::oneshot::Sender;
24
25use self::notifier::Notifier;
26use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
27use crate::manager::ActiveStreamingWorkerNodes;
28use crate::model::{ActorId, FragmentDownstreamRelation, StreamActor, StreamJobFragments};
29use crate::{MetaError, MetaResult};
30
// Submodules implementing the barrier subsystem.
mod checkpoint;
mod command;
mod complete_task;
mod context;
mod edge_builder;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
mod trace;
mod utils;
mod worker;

// Re-exports forming the public surface of the barrier module.
pub use self::command::{
    BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo,
};
pub use self::info::InflightSubscriptionInfo;
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;
54
/// Why the barrier manager entered (or is entering) recovery.
enum RecoveryReason {
    /// Recovery performed as part of meta-node bootstrap/startup.
    Bootstrap,
    /// Recovery triggered by a failure, carrying the causing error.
    Failover(MetaError),
    /// Recovery requested explicitly (see [`BarrierManagerRequest::AdhocRecovery`]).
    Adhoc,
}
64
/// Lifecycle status of the barrier manager; convertible to the client-facing
/// [`PbRecoveryStatus`] via the `From` impl in this module.
enum BarrierManagerStatus {
    /// The barrier manager has not finished starting up yet.
    Starting,
    /// The barrier manager is recovering, for the given reason.
    Recovering(RecoveryReason),
    /// The barrier manager is running normally.
    Running,
}
74
/// A command scheduled for execution by the barrier manager.
struct Scheduled {
    /// The database this command targets.
    database_id: DatabaseId,
    /// The command to execute.
    command: Command,
    /// Parties to notify about the progress/outcome of the command
    /// (see the `notifier` module for the exact stages).
    notifiers: Vec<Notifier>,
    /// Tracing span associated with this scheduled command.
    span: tracing::Span,
}
82
83impl From<&BarrierManagerStatus> for PbRecoveryStatus {
84 fn from(status: &BarrierManagerStatus) -> Self {
85 match status {
86 BarrierManagerStatus::Starting => Self::StatusStarting,
87 BarrierManagerStatus::Recovering(reason) => match reason {
88 RecoveryReason::Bootstrap => Self::StatusStarting,
89 RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
90 },
91 BarrierManagerStatus::Running => Self::StatusRunning,
92 }
93 }
94}
95
/// Requests handled by the barrier manager worker; each variant carries a
/// oneshot [`Sender`] through which the response is delivered.
pub(crate) enum BarrierManagerRequest {
    /// Collect DDL progress, keyed by a `u32` id (presumably the DDL job id —
    /// confirm against the `manager`/`progress` modules).
    GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
    /// Trigger an ad-hoc recovery; the sender is signaled when the request
    /// has been handled.
    AdhocRecovery(Sender<()>),
}
100
/// Snapshot of cluster-wide runtime state gathered during (global) recovery,
/// used to rebuild the barrier worker's in-memory state. Validated by
/// `validate` before use.
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
    // Currently active streaming compute nodes.
    active_streaming_nodes: ActiveStreamingWorkerNodes,
    // In-flight fragment info, per database.
    database_fragment_infos: HashMap<DatabaseId, InflightDatabaseInfo>,
    // Committed epoch of each state table, as registered in hummock.
    state_table_committed_epochs: HashMap<TableId, u64>,
    // Subscription info, per database.
    subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
    // Actor definitions keyed by actor id.
    stream_actors: HashMap<ActorId, StreamActor>,
    // Downstream relations between fragments.
    fragment_relations: FragmentDownstreamRelation,
    // Source splits assigned to each actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    // In-progress background jobs keyed by table id; the `String` is
    // presumably the job's definition/name — TODO confirm at the call site.
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
    // Hummock version statistics at recovery time.
    hummock_version_stats: HummockVersionStats,
}
113
114impl BarrierWorkerRuntimeInfoSnapshot {
115 fn validate_database_info(
116 database_id: DatabaseId,
117 database_info: &InflightDatabaseInfo,
118 active_streaming_nodes: &ActiveStreamingWorkerNodes,
119 stream_actors: &HashMap<ActorId, StreamActor>,
120 state_table_committed_epochs: &HashMap<TableId, u64>,
121 ) -> MetaResult<()> {
122 {
123 for fragment in database_info.fragment_infos() {
124 for (actor_id, actor) in &fragment.actors {
125 if !active_streaming_nodes
126 .current()
127 .contains_key(&actor.worker_id)
128 {
129 return Err(anyhow!(
130 "worker_id {} of actor {} do not exist",
131 actor.worker_id,
132 actor_id
133 )
134 .into());
135 }
136 if !stream_actors.contains_key(actor_id) {
137 return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
138 }
139 }
140 for state_table_id in &fragment.state_table_ids {
141 if !state_table_committed_epochs.contains_key(state_table_id) {
142 return Err(anyhow!(
143 "state table {} is not registered to hummock",
144 state_table_id
145 )
146 .into());
147 }
148 }
149 }
150 let mut committed_epochs = database_info.existing_table_ids().map(|table_id| {
151 (
152 table_id,
153 *state_table_committed_epochs
154 .get(&table_id)
155 .expect("checked exist"),
156 )
157 });
158 let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
159 anyhow!("database {} has no state table after recovery", database_id)
160 })?;
161 for (table_id, epoch) in committed_epochs {
162 if epoch != first_epoch {
163 return Err(anyhow!(
164 "database {} has tables with different table ids. {}:{}, {}:{}",
165 database_id,
166 first_table,
167 first_epoch,
168 table_id,
169 epoch
170 )
171 .into());
172 }
173 }
174 }
175 Ok(())
176 }
177
178 fn validate(&self) -> MetaResult<()> {
179 for (database_id, database_info) in &self.database_fragment_infos {
180 Self::validate_database_info(
181 *database_id,
182 database_info,
183 &self.active_streaming_nodes,
184 &self.stream_actors,
185 &self.state_table_committed_epochs,
186 )?
187 }
188 Ok(())
189 }
190}
191
/// Per-database counterpart of [`BarrierWorkerRuntimeInfoSnapshot`], gathered
/// when recovering a single database.
#[derive(Debug)]
struct DatabaseRuntimeInfoSnapshot {
    // In-flight fragment info of this database.
    database_fragment_info: InflightDatabaseInfo,
    // Committed epoch of each state table, as registered in hummock.
    state_table_committed_epochs: HashMap<TableId, u64>,
    // Subscription info of this database.
    subscription_info: InflightSubscriptionInfo,
    // Actor definitions keyed by actor id.
    stream_actors: HashMap<ActorId, StreamActor>,
    // Downstream relations between fragments.
    fragment_relations: FragmentDownstreamRelation,
    // Source splits assigned to each actor.
    source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    // In-progress background jobs keyed by table id; the `String` is
    // presumably the job's definition/name — TODO confirm at the call site.
    background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
}
202
203impl DatabaseRuntimeInfoSnapshot {
204 fn validate(
205 &self,
206 database_id: DatabaseId,
207 active_streaming_nodes: &ActiveStreamingWorkerNodes,
208 ) -> MetaResult<()> {
209 BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
210 database_id,
211 &self.database_fragment_info,
212 active_streaming_nodes,
213 &self.stream_actors,
214 &self.state_table_committed_epochs,
215 )
216 }
217}