1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::Context;
19use futures::StreamExt;
20use futures::stream::FuturesUnordered;
21use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
22use risingwave_common::id::{JobId, SinkId};
23use risingwave_meta_model::ActorId;
24use risingwave_meta_model::streaming_job::BackfillOrders;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::hummock::HummockVersionStats;
27use risingwave_pb::id::SourceId;
28use risingwave_pb::stream_service::barrier_complete_response::{
29 PbIcebergV3SinkMetadata, PbListFinishedSource, PbLoadFinishedSource,
30};
31use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
32use risingwave_rpc_client::StreamingControlHandle;
33use thiserror_ext::AsReport;
34
35use crate::barrier::cdc_progress::CdcTableBackfillTracker;
36use crate::barrier::checkpoint::independent_job::BatchRefreshJobTriggerContext;
37use crate::barrier::command::{PostCollectCommand, ResumeBackfillTarget};
38use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
39use crate::barrier::progress::TrackingJob;
40use crate::barrier::schedule::MarkReadyOptions;
41use crate::barrier::{
42 BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, BatchRefreshInfo, Command,
43 CreateStreamingJobCommandInfo, CreateStreamingJobType, DatabaseRuntimeInfoSnapshot,
44 RecoveryReason, ReplaceStreamJobPlan, Scheduled,
45};
46use crate::hummock::CommitEpochInfo;
47use crate::manager::LocalNotification;
48use crate::model::FragmentDownstreamRelation;
49use crate::stream::{SourceChange, cleanup_dropped_streaming_jobs};
50use crate::{MetaError, MetaResult};
51
52impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
53 #[await_tree::instrument]
54 async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
55 self.hummock_manager.commit_epoch(commit_info).await?;
56 Ok(self.hummock_manager.get_version_stats().await)
57 }
58
59 #[await_tree::instrument("next_scheduled_barrier")]
60 async fn next_scheduled(&self) -> Scheduled {
61 self.scheduled_barriers.next_scheduled().await
62 }
63
64 fn abort_and_mark_blocked(
65 &self,
66 database_id: Option<DatabaseId>,
67 recovery_reason: RecoveryReason,
68 ) {
69 if database_id.is_none() {
70 self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
71 }
72
73 self.scheduled_barriers
75 .abort_and_mark_blocked(database_id, "cluster is under recovering");
76 }
77
78 fn mark_ready(&self, options: MarkReadyOptions) {
79 let is_global = matches!(&options, MarkReadyOptions::Global { .. });
80 self.scheduled_barriers.mark_ready(options);
81 if is_global {
82 self.set_status(BarrierManagerStatus::Running);
83 }
84 }
85
86 #[await_tree::instrument("post_collect_command({command})")]
87 async fn post_collect_command(&self, command: PostCollectCommand) -> MetaResult<()> {
88 command.post_collect(self).await
89 }
90
91 async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
92 self.metadata_manager
93 .notify_finish_failed(database_id, err)
94 .await
95 }
96
97 #[await_tree::instrument("finish_creating_job({job})")]
98 async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
99 let job_id = job.job_id();
100 job.finish(&self.metadata_manager, &self.source_manager)
101 .await?;
102 self.env
103 .notification_manager()
104 .notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));
105 Ok(())
106 }
107
108 #[await_tree::instrument("finish_cdc_table_backfill({job})")]
109 async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
110 CdcTableBackfillTracker::mark_complete_job(&self.env.meta_store().conn, job).await
111 }
112
113 #[await_tree::instrument("new_control_stream({})", node.id)]
114 async fn new_control_stream(
115 &self,
116 node: &WorkerNode,
117 init_request: &PbInitRequest,
118 ) -> MetaResult<StreamingControlHandle> {
119 self.new_control_stream_impl(node, init_request).await
120 }
121
122 async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
123 self.reload_runtime_info_impl().await
124 }
125
126 async fn reload_database_runtime_info(
127 &self,
128 database_id: DatabaseId,
129 ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
130 self.reload_database_runtime_info_impl(database_id).await
131 }
132
133 async fn handle_list_finished_source_ids(
134 &self,
135 list_finished: Vec<PbListFinishedSource>,
136 ) -> MetaResult<()> {
137 let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();
138
139 for list_finished in list_finished {
140 let table_id = list_finished.table_id;
141 let associated_source_id = list_finished.associated_source_id;
142 list_finished_info
143 .entry((table_id, associated_source_id))
144 .or_default()
145 .insert(list_finished.reporter_actor_id);
146 }
147
148 for ((table_id, associated_source_id), actors) in list_finished_info {
149 let allow_yield = self
150 .refresh_manager
151 .mark_list_stage_finished(table_id, &actors)?;
152
153 if !allow_yield {
154 continue;
155 }
156
157 let Some(database_id) = self
158 .get_source_database_id_for_refresh_stage(table_id, associated_source_id, "list")
159 .await?
160 else {
161 continue;
162 };
163
164 let list_finish_command = Command::ListFinish {
166 table_id,
167 associated_source_id,
168 };
169
170 self.barrier_scheduler
172 .run_command_no_wait(database_id, list_finish_command)
173 .context("Failed to schedule ListFinish command")?;
174
175 tracing::info!(
176 %table_id,
177 %associated_source_id,
178 "ListFinish command scheduled successfully"
179 );
180 }
181 Ok(())
182 }
183
184 async fn handle_load_finished_source_ids(
185 &self,
186 load_finished: Vec<PbLoadFinishedSource>,
187 ) -> MetaResult<()> {
188 let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();
189
190 for load_finished in load_finished {
191 let table_id = load_finished.table_id;
192 let associated_source_id = load_finished.associated_source_id;
193 load_finished_info
194 .entry((table_id, associated_source_id))
195 .or_default()
196 .insert(load_finished.reporter_actor_id);
197 }
198
199 for ((table_id, associated_source_id), actors) in load_finished_info {
200 let allow_yield = self
201 .refresh_manager
202 .mark_load_stage_finished(table_id, &actors)?;
203
204 if !allow_yield {
205 continue;
206 }
207
208 let Some(database_id) = self
209 .get_source_database_id_for_refresh_stage(table_id, associated_source_id, "load")
210 .await?
211 else {
212 continue;
213 };
214
215 let load_finish_command = Command::LoadFinish {
217 table_id,
218 associated_source_id,
219 };
220
221 self.barrier_scheduler
223 .run_command_no_wait(database_id, load_finish_command)
224 .context("Failed to schedule LoadFinish command")?;
225
226 tracing::info!(
227 %table_id,
228 %associated_source_id,
229 "LoadFinish command scheduled successfully"
230 );
231 }
232
233 Ok(())
234 }
235
236 async fn handle_refresh_finished_table_ids(
237 &self,
238 refresh_finished_table_job_ids: Vec<JobId>,
239 ) -> MetaResult<()> {
240 for job_id in refresh_finished_table_job_ids {
241 let table_id = job_id.as_mv_table_id();
242
243 self.refresh_manager.mark_refresh_complete(table_id).await?;
244 }
245
246 Ok(())
247 }
248
249 async fn load_batch_refresh_trigger_context(
250 &self,
251 job_id: JobId,
252 database_id: DatabaseId,
253 last_committed_epoch: u64,
254 ) -> MetaResult<BatchRefreshJobTriggerContext> {
255 self.load_batch_refresh_trigger_context_impl(job_id, database_id, last_committed_epoch)
256 .await
257 }
258
259 #[await_tree::instrument]
260 async fn pre_commit_iceberg_v3_sink_metadata(
261 &self,
262 reports: Vec<PbIcebergV3SinkMetadata>,
263 ) -> MetaResult<Vec<SinkId>> {
264 let grouped = group_v3_reports_by_sink(reports)?;
265 let success_ids: Vec<SinkId> = grouped.keys().cloned().collect();
266 let futs = FuturesUnordered::new();
267 for (sink_id, (prev_epoch, reports)) in grouped {
268 if reports.is_empty() {
269 continue;
270 }
271 let manager = &self.iceberg_v3_sink_manager;
272 futs.push(async move {
273 (
274 sink_id,
275 manager
276 .pre_commit_v3_epoch(sink_id, prev_epoch, reports)
277 .await,
278 )
279 });
280 }
281
282 let results: Vec<(SinkId, anyhow::Result<()>)> = futs.collect().await;
285 let errs: Vec<(SinkId, anyhow::Error)> = results
286 .into_iter()
287 .filter_map(|(id, r)| r.err().map(|e| (id, e)))
288 .collect();
289 if errs.is_empty() {
290 Ok(success_ids)
291 } else {
292 Err(aggregate_v3_sink_errors("pre-commit", errs).into())
293 }
294 }
295
296 #[await_tree::instrument]
297 async fn commit_iceberg_v3_sink_metadata(&self, sink_ids: Vec<SinkId>) -> MetaResult<()> {
298 let futs = FuturesUnordered::new();
299 for sink_id in sink_ids {
300 let manager = &self.iceberg_v3_sink_manager;
301 futs.push(async move { (sink_id, manager.commit_v3_epoch(sink_id).await) });
302 }
303
304 let results: Vec<(SinkId, anyhow::Result<()>)> = futs.collect().await;
305 let errs: Vec<(SinkId, anyhow::Error)> = results
306 .into_iter()
307 .filter_map(|(id, r)| r.err().map(|e| (id, e)))
308 .collect();
309 if errs.is_empty() {
310 Ok(())
311 } else {
312 Err(aggregate_v3_sink_errors("commit", errs).into())
313 }
314 }
315}
316
317fn aggregate_v3_sink_errors(
321 phase: &'static str,
322 mut errs: Vec<(SinkId, anyhow::Error)>,
323) -> anyhow::Error {
324 debug_assert!(!errs.is_empty());
325 let sink_ids: Vec<String> = errs.iter().map(|(id, _)| id.to_string()).collect();
326 let details: Vec<String> = errs
327 .iter()
328 .map(|(id, e)| format!("sink {}: {}", id, e.as_report()))
329 .collect();
330 let (_, first_err) = errs.remove(0);
332 first_err.context(format!(
333 "iceberg v3 sink {} failed for sink_id(s) [{}]: {}",
334 phase,
335 sink_ids.join(", "),
336 details.join("; ")
337 ))
338}
339
340fn group_v3_reports_by_sink(
341 reports: Vec<PbIcebergV3SinkMetadata>,
342) -> MetaResult<HashMap<SinkId, (u64, Vec<PbIcebergV3SinkMetadata>)>> {
343 let mut grouped: HashMap<SinkId, (u64, Vec<PbIcebergV3SinkMetadata>)> = HashMap::new();
344 for r in reports {
345 let sink_id = r.sink_id;
346 let prev_epoch = r.prev_epoch;
347 let entry = grouped.entry(sink_id).or_insert((prev_epoch, Vec::new()));
348 if entry.0 != prev_epoch {
349 return Err(anyhow::anyhow!(
350 "iceberg v3 sink {} reports disagree on prev_epoch: {} vs {}",
351 sink_id,
352 entry.0,
353 prev_epoch
354 )
355 .into());
356 }
357 entry.1.push(r);
358 }
359 Ok(grouped)
360}
361
362impl GlobalBarrierWorkerContextImpl {
363 async fn get_source_database_id_for_refresh_stage(
364 &self,
365 table_id: TableId,
366 associated_source_id: SourceId,
367 stage: &'static str,
368 ) -> MetaResult<Option<DatabaseId>> {
369 match self
370 .metadata_manager
371 .catalog_controller
372 .get_object_database_id(associated_source_id)
373 .await
374 {
375 Ok(database_id) => Ok(Some(database_id)),
376 Err(err) if err.is_catalog_id_not_found("object") => {
377 tracing::warn!(
378 %table_id,
379 %associated_source_id,
380 stage,
381 "skip refresh finish command because associated source is already dropped"
382 );
383 Ok(None)
384 }
385 Err(err) => Err(err)
386 .with_context(|| {
387 format!(
388 "failed to get database id for refresh stage: table_id={}, associated_source_id={}, stage={stage}",
389 table_id, associated_source_id
390 )
391 })
392 .map_err(Into::into),
393 }
394 }
395
396 fn set_status(&self, new_status: BarrierManagerStatus) {
397 self.status.store(Arc::new(new_status));
398 }
399
400 async fn load_batch_refresh_trigger_context_impl(
402 &self,
403 job_id: JobId,
404 database_id: DatabaseId,
405 last_committed_epoch: u64,
406 ) -> MetaResult<BatchRefreshJobTriggerContext> {
407 use itertools::Itertools;
408 use sea_orm::TransactionTrait;
409
410 use crate::controller::scale::load_fragment_context_for_jobs;
411
412 let inner = self
414 .metadata_manager
415 .catalog_controller
416 .get_inner_read_guard()
417 .await;
418 let txn = inner.db.begin().await?;
419
420 let fragment_context =
422 load_fragment_context_for_jobs(&txn, HashSet::from([job_id])).await?;
423
424 let streaming_job_model = fragment_context
425 .job_map
426 .get(&job_id)
427 .ok_or_else(|| anyhow::anyhow!("streaming job model not found for job {}", job_id))?
428 .clone();
429
430 let database_model = fragment_context
431 .database_map
432 .get(&database_id)
433 .ok_or_else(|| {
434 anyhow::anyhow!("database model not found for database {}", database_id)
435 })?;
436 let database_resource_group = database_model.resource_group.clone();
437
438 let mut job_extra_info = self
440 .metadata_manager
441 .catalog_controller
442 .get_streaming_job_extra_info_in_txn(&txn, vec![job_id])
443 .await?;
444 let definition = job_extra_info
445 .remove(&job_id)
446 .ok_or_else(|| anyhow::anyhow!("extra info not found for job {}", job_id))?
447 .job_definition;
448
449 let fragments = fragment_context
451 .job_fragments
452 .get(&job_id)
453 .ok_or_else(|| anyhow::anyhow!("fragments not found for job {}", job_id))?
454 .clone();
455
456 let upstream_table_ids: HashSet<TableId> = {
458 use crate::stream::StreamFragmentGraph;
459 let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
460 fragments.values().map(|f| (&f.nodes, f.fragment_type_mask)),
461 )?
462 .0
463 .ok_or_else(|| {
464 anyhow::anyhow!("batch refresh job {} has no snapshot backfill info", job_id)
465 })?;
466 snapshot_backfill_info
467 .upstream_mv_table_id_to_backfill_epoch
468 .into_keys()
469 .collect()
470 };
471
472 let fragment_ids: Vec<_> = fragments.keys().copied().collect();
473 let downstreams = self
474 .metadata_manager
475 .catalog_controller
476 .get_fragment_downstream_relations_in_txn(&txn, fragment_ids)
477 .await?;
478
479 txn.commit().await?;
480 drop(inner);
481
482 let (upstream_table_log_epochs, target_upstream_epoch) = self
484 .hummock_manager
485 .on_current_version_and_table_change_log(|version, table_change_log| {
486 let mut target_upstream_epoch = last_committed_epoch;
487 let mut log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>> = HashMap::new();
488
489 for &upstream_table_id in &upstream_table_ids {
490 let upstream_committed_epoch = version
491 .state_table_info
492 .info()
493 .get(&upstream_table_id)
494 .map(|info| info.committed_epoch)
495 .ok_or_else(|| {
496 anyhow::anyhow!(
497 "cannot get committed epoch for upstream table {}",
498 upstream_table_id
499 )
500 })?;
501
502 target_upstream_epoch =
503 std::cmp::max(target_upstream_epoch, upstream_committed_epoch);
504
505 if upstream_committed_epoch <= last_committed_epoch {
506 continue;
507 }
508
509 if let Some(change_log) = table_change_log.get(&upstream_table_id) {
510 let epochs = change_log
511 .filter_epoch((last_committed_epoch, upstream_committed_epoch))
512 .map(|epoch_log| {
513 (
514 epoch_log.non_checkpoint_epochs.clone(),
515 epoch_log.checkpoint_epoch,
516 )
517 })
518 .collect_vec();
519 if !epochs.is_empty() {
520 log_epochs.insert(upstream_table_id, epochs);
521 }
522 } else {
523 anyhow::bail!(
524 "upstream table {} has lagged downstream on epoch {} but no table change log (upstream committed: {})",
525 upstream_table_id,
526 last_committed_epoch,
527 upstream_committed_epoch,
528 );
529 }
530 }
531
532 Ok((log_epochs, target_upstream_epoch))
533 })
534 .await?;
535
536 Ok(BatchRefreshJobTriggerContext {
537 fragments,
538 downstreams,
539 streaming_job_model,
540 definition,
541 database_resource_group,
542 upstream_table_log_epochs,
543 target_upstream_epoch,
544 })
545 }
546}
547
548impl PostCollectCommand {
549 pub async fn post_collect(
552 self,
553 barrier_manager_context: &GlobalBarrierWorkerContextImpl,
554 ) -> MetaResult<()> {
555 match self {
556 PostCollectCommand::Command(_) => {}
557 PostCollectCommand::SourceChangeSplit {
558 split_assignment: assignment,
559 } => {
560 barrier_manager_context
561 .metadata_manager
562 .update_fragment_splits(&assignment)
563 .await?;
564 }
565
566 PostCollectCommand::DropStreamingJobs => {}
567 PostCollectCommand::ConnectorPropsChange(obj_id_map_props) => {
568 barrier_manager_context
570 .source_manager
571 .apply_source_change(SourceChange::UpdateSourceProps {
572 source_id_map_new_props: obj_id_map_props
575 .iter()
576 .map(|(object_id, props)| (object_id.as_source_id(), props.clone()))
577 .collect(),
578 })
579 .await;
580 }
581 PostCollectCommand::ResumeBackfill { target } => match target {
582 ResumeBackfillTarget::Job(job_id) => {
583 barrier_manager_context
584 .metadata_manager
585 .catalog_controller
586 .update_backfill_orders_by_job_id(job_id, None)
587 .await?;
588 }
589 ResumeBackfillTarget::Fragment(fragment_id) => {
590 let mut job_ids = barrier_manager_context
591 .metadata_manager
592 .catalog_controller
593 .get_fragment_job_id(vec![fragment_id])
594 .await?;
595 let job_id = job_ids.pop().ok_or_else(|| {
596 MetaError::invalid_parameter("fragment not found".to_owned())
597 })?;
598 let job_id = JobId::new(job_id.as_raw_id());
599
600 let extra_info = barrier_manager_context
601 .metadata_manager
602 .catalog_controller
603 .get_streaming_job_extra_info(vec![job_id])
604 .await?;
605 let mut backfill_orders: BackfillOrders = extra_info
606 .get(&job_id)
607 .cloned()
608 .ok_or_else(|| MetaError::invalid_parameter("job not found".to_owned()))?
609 .backfill_orders
610 .unwrap_or_default();
611
612 let resumed_fragment_id = fragment_id.as_raw_id();
613 for children in backfill_orders.0.values_mut() {
614 children.retain(|child| *child != resumed_fragment_id);
615 }
616 backfill_orders.0.retain(|_, children| !children.is_empty());
617
618 barrier_manager_context
619 .metadata_manager
620 .catalog_controller
621 .update_backfill_orders_by_job_id(job_id, Some(backfill_orders))
622 .await?;
623 }
624 },
625 PostCollectCommand::CreateStreamingJob {
626 info,
627 job_type,
628 cross_db_snapshot_backfill_info,
629 resolved_split_assignment,
630 } => {
631 match &job_type {
632 CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
633 barrier_manager_context
634 .metadata_manager
635 .catalog_controller
636 .fill_snapshot_backfill_epoch(
637 info.stream_job_fragments.fragments.iter().filter_map(
638 |(fragment_id, fragment)| {
639 if fragment.fragment_type_mask.contains(
640 FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
641 ) {
642 Some(*fragment_id as _)
643 } else {
644 None
645 }
646 },
647 ),
648 None,
649 &cross_db_snapshot_backfill_info,
650 )
651 .await?
652 }
653 CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
654 | CreateStreamingJobType::BatchRefresh(BatchRefreshInfo {
655 snapshot_backfill_info,
656 ..
657 }) => {
658 barrier_manager_context
659 .metadata_manager
660 .catalog_controller
661 .fill_snapshot_backfill_epoch(
662 info.stream_job_fragments.fragments.iter().filter_map(
663 |(fragment_id, fragment)| {
664 if fragment.fragment_type_mask.contains_any([
665 FragmentTypeFlag::SnapshotBackfillStreamScan,
666 FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
667 ]) {
668 Some(*fragment_id as _)
669 } else {
670 None
671 }
672 },
673 ),
674 Some(snapshot_backfill_info),
675 &cross_db_snapshot_backfill_info,
676 )
677 .await?
678 }
679 }
680
681 let CreateStreamingJobCommandInfo {
684 stream_job_fragments,
685 upstream_fragment_downstreams,
686 ..
687 } = info;
688 let new_sink_downstream =
689 if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
690 let new_downstreams = ctx.new_sink_downstream.clone();
691 let new_downstreams = FragmentDownstreamRelation::from([(
692 ctx.sink_fragment_id,
693 vec![new_downstreams],
694 )]);
695 Some(new_downstreams)
696 } else {
697 None
698 };
699
700 barrier_manager_context
701 .metadata_manager
702 .catalog_controller
703 .post_collect_job_fragments(
704 stream_job_fragments.stream_job_id(),
705 &upstream_fragment_downstreams,
706 new_sink_downstream,
707 Some(&resolved_split_assignment),
708 )
709 .await?;
710
711 let source_change = SourceChange::CreateJob {
712 added_source_fragments: stream_job_fragments.stream_source_fragments(),
713 added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
714 };
715
716 barrier_manager_context
717 .source_manager
718 .apply_source_change(source_change)
719 .await;
720 }
721 PostCollectCommand::Reschedule { reschedules, .. } => {
722 let fragment_splits = reschedules
723 .iter()
724 .map(|(fragment_id, reschedule)| {
725 (*fragment_id, reschedule.actor_splits.clone())
726 })
727 .collect();
728
729 barrier_manager_context
730 .metadata_manager
731 .update_fragment_splits(&fragment_splits)
732 .await?;
733 }
734
735 PostCollectCommand::ReplaceStreamJob {
736 plan: replace_plan,
737 resolved_split_assignment,
738 } => {
739 let ReplaceStreamJobPlan {
740 old_fragments,
741 new_fragments,
742 upstream_fragment_downstreams,
743 to_drop_state_table_ids,
744 auto_refresh_schema_sinks,
745 ..
746 } = &replace_plan;
747 barrier_manager_context
749 .metadata_manager
750 .catalog_controller
751 .post_collect_job_fragments(
752 new_fragments.stream_job_id,
753 upstream_fragment_downstreams,
754 None,
755 Some(&resolved_split_assignment),
756 )
757 .await?;
758
759 if let Some(sinks) = auto_refresh_schema_sinks {
760 for sink in sinks {
761 barrier_manager_context
762 .metadata_manager
763 .catalog_controller
764 .post_collect_job_fragments(
765 sink.tmp_sink_id.as_job_id(),
766 &Default::default(), None, None, )
770 .await?;
771 }
772 }
773
774 barrier_manager_context
776 .source_manager
777 .handle_replace_job(
778 old_fragments,
779 new_fragments.stream_source_fragments(),
780 &replace_plan,
781 )
782 .await;
783 cleanup_dropped_streaming_jobs(
784 &barrier_manager_context.refresh_manager,
785 &barrier_manager_context.hummock_manager,
786 &barrier_manager_context.metadata_manager,
787 [],
788 to_drop_state_table_ids.clone(),
789 "replace_streaming_job",
790 )
791 .await?;
792 }
793
794 PostCollectCommand::CreateSubscription { subscription_id } => {
795 barrier_manager_context
796 .metadata_manager
797 .catalog_controller
798 .finish_create_subscription_catalog(subscription_id)
799 .await?
800 }
801 }
802
803 Ok(())
804 }
805}
806
#[cfg(test)]
mod tests {
    use super::*;

    /// A not-found error created for the "object" kind is matched by
    /// `is_catalog_id_not_found("object")`, the check the refresh-stage lookup uses to skip.
    #[test]
    fn test_skip_refresh_finish_when_associated_source_missing() {
        let missing_object = MetaError::catalog_id_not_found("object", 42);
        let is_missing_object = missing_object.is_catalog_id_not_found("object");
        assert!(is_missing_object);
    }

    /// A not-found error created for a different kind ("table") is not matched by the
    /// "object" check.
    #[test]
    fn test_do_not_skip_refresh_finish_for_other_not_found_types() {
        let missing_table = MetaError::catalog_id_not_found("table", 42);
        let is_missing_object = missing_table.is_catalog_id_not_found("object");
        assert!(!is_missing_object);
    }
}