risingwave_meta/barrier/context/context_impl.rs

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_meta_model::ActorId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SourceId;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::MetaResult;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::manager::LocalNotification;
use crate::model::FragmentDownstreamRelation;
use crate::stream::{SourceChange, SplitState};
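
/// Wires the barrier worker to the concrete meta services: epochs are
/// committed through the hummock manager, scheduling goes through the
/// scheduled-barrier queue, and catalog/source bookkeeping goes through the
/// metadata and source managers.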
impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
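    /// Commits the collected epoch to hummock and returns the refreshed
    /// version stats.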
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }
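
    /// Aborts scheduled barriers and blocks further scheduling, either for a
    /// single database or, when `database_id` is `None`, for the whole
    /// cluster, which also marks the manager as recovering.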
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovery");
    }
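
    /// Unblocks barrier scheduling; a global `mark_ready` also flips the
    /// manager status back to `Running`.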
    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }
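
    /// Marks a tracked streaming job as finished in the catalog, then
    /// notifies local subscribers that its backfill has completed.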
    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        let job_id = job.job_id();
        job.finish(&self.metadata_manager, &self.source_manager)
            .await?;
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));
        Ok(())
    }

    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
        CdcTableBackfillTracker::mark_complete_job(&self.env.meta_store().conn, job).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }
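
    /// Groups `ListFinished` reports by `(table_id, source_id)` and forwards
    /// them to the refresh manager; once a pair is allowed to yield, a
    /// `ListFinish` command is scheduled on the table's database.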
    async fn handle_list_finished_source_ids(
        &self,
        list_finished: Vec<PbListFinishedSource>,
    ) -> MetaResult<()> {
        let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for list_finished in list_finished {
            let table_id = list_finished.table_id;
            let associated_source_id = list_finished.associated_source_id;
            list_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(list_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in list_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_list_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            let list_finish_command = Command::ListFinish {
                table_id,
                associated_source_id,
            };

            self.barrier_scheduler
                .run_command_no_wait(database_id, list_finish_command)
                .context("Failed to schedule ListFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "ListFinish command scheduled successfully"
            );
        }
        Ok(())
    }
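
    /// Same flow as `handle_list_finished_source_ids`, but for the load stage:
    /// when the refresh manager allows yielding, a `LoadFinish` command is
    /// scheduled on the owning database.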
    async fn handle_load_finished_source_ids(
        &self,
        load_finished: Vec<PbLoadFinishedSource>,
    ) -> MetaResult<()> {
        let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for load_finished in load_finished {
            let table_id = load_finished.table_id;
            let associated_source_id = load_finished.associated_source_id;
            load_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(load_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in load_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_load_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            let load_finish_command = Command::LoadFinish {
                table_id,
                associated_source_id,
            };

            self.barrier_scheduler
                .run_command_no_wait(database_id, load_finish_command)
                .context("Failed to schedule LoadFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "LoadFinish command scheduled successfully"
            );
        }

        Ok(())
    }
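
    /// Marks each refresh-finished table job as complete in the refresh
    /// manager.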
    async fn handle_refresh_finished_table_ids(
        &self,
        refresh_finished_table_job_ids: Vec<JobId>,
    ) -> MetaResult<()> {
        for job_id in refresh_finished_table_job_ids {
            let table_id = job_id.as_mv_table_id();

            self.refresh_manager.mark_refresh_complete(table_id).await?;
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
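    /// Performs the side effects of a command once its barrier has been
    /// collected from all compute nodes, e.g. persisting catalog changes or
    /// updating split assignments. A barrier that carries no command is a
    /// no-op here.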
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
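            // These commands need no extra handling after collection.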
            Command::Flush => {}

            Command::Throttle { .. } => {}

            Command::Pause => {}

            Command::Resume => {}
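
            // Persist the new source split assignment into fragment metadata.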
            Command::SourceChangeSplit(SplitState {
                split_assignment: assignment,
                ..
            }) => {
                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(assignment)
                    .await?;
            }
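
            // Drop refresh progress trackers for the removed jobs, then
            // unregister their state tables from hummock and finalize the
            // catalog drop.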
            Command::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
                ..
            } => {
                for job_id in streaming_job_ids {
                    barrier_manager_context
                        .refresh_manager
                        .remove_progress_tracker(job_id.as_mv_table_id(), "drop_streaming_jobs");
                }

                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(unregistered_state_table_ids.iter().copied())
                    .await;
            }
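            // Push the updated connector properties down to the source manager.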
            Command::ConnectorPropsChange(obj_id_map_props) => {
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props
                            .iter()
                            .map(|(object_id, props)| (object_id.as_source_id(), props.clone()))
                            .collect(),
                    })
                    .await;
            }
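            // For a newly created job: record snapshot-backfill epochs for the
            // relevant fragments, persist fragment relations and the initial
            // split assignment, then register new source and source-backfill
            // fragments with the source manager.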
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    ..
                } = info;
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id(),
                        upstream_fragment_downstreams,
                        new_sink_downstream,
                        Some(&info.init_split_assignment),
                    )
                    .await?;

                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
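            // Persist the post-reschedule split assignment of each fragment.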
            Command::RescheduleFragment { reschedules, .. } => {
                let fragment_splits = reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (*fragment_id, reschedule.actor_splits.clone())
                    })
                    .collect();

                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&fragment_splits)
                    .await?;
            }
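
            // On job replacement: persist the new job fragments (including any
            // temporary auto-refresh-schema sinks), hand the replaced source
            // fragments over to the source manager, and unregister state
            // tables dropped by the replacement.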
            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    init_split_assignment,
                    ..
                },
            ) => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id,
                        upstream_fragment_downstreams,
                        None,
                        Some(init_split_assignment),
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id.as_job_id(),
                                &Default::default(),
                                None,
                                None,
                            )
                            .await?;
                    }
                }

                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }
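
            // Flip the subscription catalog entry to created once its barrier
            // has been collected.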
            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::ListFinish { .. } | Command::LoadFinish { .. } | Command::Refresh { .. } => {}
            Command::ResetSource { .. } => {}
        }

        Ok(())
    }
}