risingwave_meta/barrier/context/context_impl.rs

use std::sync::Arc;

use risingwave_common::catalog::DatabaseId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::PbFragmentTypeFlag;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::MetaResult;
use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::stream::SourceChange;
impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
    /// Commit the collected epoch to Hummock and return the updated version stats.
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        // A `None` database id means a global recovery, which is reflected in the
        // barrier manager status; per-database recovery leaves the status untouched.
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovery");
    }

    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        // Only a global `mark_ready` flips the manager status back to running.
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }
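
    // How these two hooks are presumably paired by the recovery loop that drives
    // this context (the exact driver lives outside this file); the call shapes
    // below are illustrative, not verbatim:
    //
    //     ctx.abort_and_mark_blocked(None, recovery_reason);
    //     let info = ctx.reload_runtime_info().await?;
    //     // ... reset workers and re-inject barriers ...
    //     ctx.mark_ready(MarkReadyOptions::Global { /* ... */ });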

    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        job.finish(&self.metadata_manager).await
    }

    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }
}

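// Note: `status` is presumably an `ArcSwap`-like atomic cell; `store` publishes
// the new status to concurrent readers without locking.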
impl GlobalBarrierWorkerContextImpl {
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
    /// Apply the side effects of a command after its barrier has been collected
    /// from all workers: update catalog and source metadata, unregister dropped
    /// state tables, and finalize reschedules.
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            // A barrier without a command carries no post-collect side effects.
            return Ok(());
        };
        match command {
            // Flush, Throttle, Pause, and Resume require no post-collect work.
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

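            // Persist the new split assignment via the metadata manager first,
            // then let the source manager pick up the change.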
            Command::SourceChangeSplit(split_assignment) => {
                barrier_manager_context
                    .metadata_manager
                    .update_actor_splits_by_split_assignment(split_assignment)
                    .await?;
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::SplitChange(split_assignment.clone()))
                    .await;
            }

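            // Unregister the dropped jobs' state tables from Hummock so that
            // their storage can be reclaimed.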
            Command::DropStreamingJobs {
                unregistered_state_table_ids,
                ..
            } => {
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
            }
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(
                        replace_plan @ ReplaceStreamJobPlan {
                            old_fragments,
                            new_fragments,
                            upstream_fragment_downstreams,
                            init_split_assignment,
                            ..
                        },
                    ) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                new_fragments.stream_job_id.table_id as _,
                                new_fragments.actor_ids(),
                                upstream_fragment_downstreams,
                                init_split_assignment,
                            )
                            .await?;
                        barrier_manager_context
                            .source_manager
                            .handle_replace_job(
                                old_fragments,
                                new_fragments.stream_source_fragments(),
                                init_split_assignment.clone(),
                                replace_plan,
                            )
                            .await;
                    }
                    CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                // Select only the fragments whose type mask has
                                // the cross-db snapshot backfill flag set.
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if (fragment.fragment_type_mask
                                            & PbFragmentTypeFlag::CrossDbSnapshotBackfillStreamScan
                                                as u32)
                                            != 0
                                        {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                // Here both the plain and the cross-db snapshot
                                // backfill flags qualify (see the sketch at the
                                // end of this file).
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if (fragment.fragment_type_mask
                                            & (PbFragmentTypeFlag::SnapshotBackfillStreamScan as u32
                                                | PbFragmentTypeFlag::CrossDbSnapshotBackfillStreamScan
                                                    as u32))
                                            != 0
                                        {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

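                // Persist the fragment and actor info collected for the new job,
                // then register its source and backfill fragments with the
                // source manager.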
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    ..
                } = info;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments_inner(
                        stream_job_fragments.stream_job_id().table_id as _,
                        stream_job_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        streaming_job.is_materialized_view(),
                    )
                    .await?;

                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
                    split_assignment: init_split_assignment.clone(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            Command::RescheduleFragment {
                reschedules,
                post_updates,
                ..
            } => {
                barrier_manager_context
                    .scale_controller
                    .post_apply_reschedule(reschedules, post_updates)
                    .await?;
            }

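            // Replacing a stream job mirrors the sink-into-table path above, with
            // the extra step of unregistering the replaced job's state tables
            // from Hummock.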
            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    to_drop_state_table_ids,
                    ..
                },
            ) => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id.table_id as _,
                        new_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                    )
                    .await?;

                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        init_split_assignment.clone(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            // The remaining commands have no post-collect side effects.
            Command::DropSubscription { .. } => {}
            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
            Command::ConnectorPropsChange(_) => {}
            Command::StartFragmentBackfill { .. } => {}
        }

        Ok(())
    }
}
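
// A minimal sketch of the fragment-type test used in the `CreateStreamingJob`
// arm above: `fragment_type_mask` is a bitset of `PbFragmentTypeFlag` values,
// so "is this fragment a (cross-db) snapshot backfill scan?" reduces to a
// bitwise AND. The helper below is hypothetical, not part of this module:
//
//     fn has_any_flag(mask: u32, flags: &[PbFragmentTypeFlag]) -> bool {
//         flags.iter().any(|flag| mask & (*flag as u32) != 0)
//     }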