Skip to main content

risingwave_meta/barrier/context/
context_impl.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::Context;
19use futures::StreamExt;
20use futures::stream::FuturesUnordered;
21use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
22use risingwave_common::id::{JobId, SinkId};
23use risingwave_meta_model::ActorId;
24use risingwave_meta_model::streaming_job::BackfillOrders;
25use risingwave_pb::common::WorkerNode;
26use risingwave_pb::hummock::HummockVersionStats;
27use risingwave_pb::id::SourceId;
28use risingwave_pb::stream_service::barrier_complete_response::{
29    PbIcebergV3SinkMetadata, PbListFinishedSource, PbLoadFinishedSource,
30};
31use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
32use risingwave_rpc_client::StreamingControlHandle;
33use thiserror_ext::AsReport;
34
35use crate::barrier::cdc_progress::CdcTableBackfillTracker;
36use crate::barrier::checkpoint::independent_job::BatchRefreshJobTriggerContext;
37use crate::barrier::command::{PostCollectCommand, ResumeBackfillTarget};
38use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
39use crate::barrier::progress::TrackingJob;
40use crate::barrier::schedule::MarkReadyOptions;
41use crate::barrier::{
42    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, BatchRefreshInfo, Command,
43    CreateStreamingJobCommandInfo, CreateStreamingJobType, DatabaseRuntimeInfoSnapshot,
44    RecoveryReason, ReplaceStreamJobPlan, Scheduled,
45};
46use crate::hummock::CommitEpochInfo;
47use crate::manager::LocalNotification;
48use crate::model::FragmentDownstreamRelation;
49use crate::stream::{SourceChange, cleanup_dropped_streaming_jobs};
50use crate::{MetaError, MetaResult};
51
52impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
53    #[await_tree::instrument]
54    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
55        self.hummock_manager.commit_epoch(commit_info).await?;
56        Ok(self.hummock_manager.get_version_stats().await)
57    }
58
59    #[await_tree::instrument("next_scheduled_barrier")]
60    async fn next_scheduled(&self) -> Scheduled {
61        self.scheduled_barriers.next_scheduled().await
62    }
63
64    fn abort_and_mark_blocked(
65        &self,
66        database_id: Option<DatabaseId>,
67        recovery_reason: RecoveryReason,
68    ) {
69        if database_id.is_none() {
70            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
71        }
72
73        // Mark blocked and abort buffered schedules, they might be dirty already.
74        self.scheduled_barriers
75            .abort_and_mark_blocked(database_id, "cluster is under recovering");
76    }
77
78    fn mark_ready(&self, options: MarkReadyOptions) {
79        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
80        self.scheduled_barriers.mark_ready(options);
81        if is_global {
82            self.set_status(BarrierManagerStatus::Running);
83        }
84    }
85
86    #[await_tree::instrument("post_collect_command({command})")]
87    async fn post_collect_command(&self, command: PostCollectCommand) -> MetaResult<()> {
88        command.post_collect(self).await
89    }
90
91    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
92        self.metadata_manager
93            .notify_finish_failed(database_id, err)
94            .await
95    }
96
97    #[await_tree::instrument("finish_creating_job({job})")]
98    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
99        let job_id = job.job_id();
100        job.finish(&self.metadata_manager, &self.source_manager)
101            .await?;
102        self.env
103            .notification_manager()
104            .notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));
105        Ok(())
106    }
107
108    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
109    async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
110        CdcTableBackfillTracker::mark_complete_job(&self.env.meta_store().conn, job).await
111    }
112
113    #[await_tree::instrument("new_control_stream({})", node.id)]
114    async fn new_control_stream(
115        &self,
116        node: &WorkerNode,
117        init_request: &PbInitRequest,
118    ) -> MetaResult<StreamingControlHandle> {
119        self.new_control_stream_impl(node, init_request).await
120    }
121
122    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
123        self.reload_runtime_info_impl().await
124    }
125
126    async fn reload_database_runtime_info(
127        &self,
128        database_id: DatabaseId,
129    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
130        self.reload_database_runtime_info_impl(database_id).await
131    }
132
133    async fn handle_list_finished_source_ids(
134        &self,
135        list_finished: Vec<PbListFinishedSource>,
136    ) -> MetaResult<()> {
137        let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();
138
139        for list_finished in list_finished {
140            let table_id = list_finished.table_id;
141            let associated_source_id = list_finished.associated_source_id;
142            list_finished_info
143                .entry((table_id, associated_source_id))
144                .or_default()
145                .insert(list_finished.reporter_actor_id);
146        }
147
148        for ((table_id, associated_source_id), actors) in list_finished_info {
149            let allow_yield = self
150                .refresh_manager
151                .mark_list_stage_finished(table_id, &actors)?;
152
153            if !allow_yield {
154                continue;
155            }
156
157            let Some(database_id) = self
158                .get_source_database_id_for_refresh_stage(table_id, associated_source_id, "list")
159                .await?
160            else {
161                continue;
162            };
163
164            // Create ListFinish command
165            let list_finish_command = Command::ListFinish {
166                table_id,
167                associated_source_id,
168            };
169
170            // Schedule the command through the barrier system without waiting
171            self.barrier_scheduler
172                .run_command_no_wait(database_id, list_finish_command)
173                .context("Failed to schedule ListFinish command")?;
174
175            tracing::info!(
176                %table_id,
177                %associated_source_id,
178                "ListFinish command scheduled successfully"
179            );
180        }
181        Ok(())
182    }
183
184    async fn handle_load_finished_source_ids(
185        &self,
186        load_finished: Vec<PbLoadFinishedSource>,
187    ) -> MetaResult<()> {
188        let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();
189
190        for load_finished in load_finished {
191            let table_id = load_finished.table_id;
192            let associated_source_id = load_finished.associated_source_id;
193            load_finished_info
194                .entry((table_id, associated_source_id))
195                .or_default()
196                .insert(load_finished.reporter_actor_id);
197        }
198
199        for ((table_id, associated_source_id), actors) in load_finished_info {
200            let allow_yield = self
201                .refresh_manager
202                .mark_load_stage_finished(table_id, &actors)?;
203
204            if !allow_yield {
205                continue;
206            }
207
208            let Some(database_id) = self
209                .get_source_database_id_for_refresh_stage(table_id, associated_source_id, "load")
210                .await?
211            else {
212                continue;
213            };
214
215            // Create LoadFinish command
216            let load_finish_command = Command::LoadFinish {
217                table_id,
218                associated_source_id,
219            };
220
221            // Schedule the command through the barrier system without waiting
222            self.barrier_scheduler
223                .run_command_no_wait(database_id, load_finish_command)
224                .context("Failed to schedule LoadFinish command")?;
225
226            tracing::info!(
227                %table_id,
228                %associated_source_id,
229                "LoadFinish command scheduled successfully"
230            );
231        }
232
233        Ok(())
234    }
235
236    async fn handle_refresh_finished_table_ids(
237        &self,
238        refresh_finished_table_job_ids: Vec<JobId>,
239    ) -> MetaResult<()> {
240        for job_id in refresh_finished_table_job_ids {
241            let table_id = job_id.as_mv_table_id();
242
243            self.refresh_manager.mark_refresh_complete(table_id).await?;
244        }
245
246        Ok(())
247    }
248
249    async fn load_batch_refresh_trigger_context(
250        &self,
251        job_id: JobId,
252        database_id: DatabaseId,
253        last_committed_epoch: u64,
254    ) -> MetaResult<BatchRefreshJobTriggerContext> {
255        self.load_batch_refresh_trigger_context_impl(job_id, database_id, last_committed_epoch)
256            .await
257    }
258
259    #[await_tree::instrument]
260    async fn pre_commit_iceberg_v3_sink_metadata(
261        &self,
262        reports: Vec<PbIcebergV3SinkMetadata>,
263    ) -> MetaResult<Vec<SinkId>> {
264        let grouped = group_v3_reports_by_sink(reports)?;
265        let success_ids: Vec<SinkId> = grouped.keys().cloned().collect();
266        let futs = FuturesUnordered::new();
267        for (sink_id, (prev_epoch, reports)) in grouped {
268            if reports.is_empty() {
269                continue;
270            }
271            let manager = &self.iceberg_v3_sink_manager;
272            futs.push(async move {
273                (
274                    sink_id,
275                    manager
276                        .pre_commit_v3_epoch(sink_id, prev_epoch, reports)
277                        .await,
278                )
279            });
280        }
281
282        // Drain all futures regardless of individual failures, so that no coordinator is left with
283        // state inconsistent vs. the caller's view.
284        let results: Vec<(SinkId, anyhow::Result<()>)> = futs.collect().await;
285        let errs: Vec<(SinkId, anyhow::Error)> = results
286            .into_iter()
287            .filter_map(|(id, r)| r.err().map(|e| (id, e)))
288            .collect();
289        if errs.is_empty() {
290            Ok(success_ids)
291        } else {
292            Err(aggregate_v3_sink_errors("pre-commit", errs).into())
293        }
294    }
295
296    #[await_tree::instrument]
297    async fn commit_iceberg_v3_sink_metadata(&self, sink_ids: Vec<SinkId>) -> MetaResult<()> {
298        let futs = FuturesUnordered::new();
299        for sink_id in sink_ids {
300            let manager = &self.iceberg_v3_sink_manager;
301            futs.push(async move { (sink_id, manager.commit_v3_epoch(sink_id).await) });
302        }
303
304        let results: Vec<(SinkId, anyhow::Result<()>)> = futs.collect().await;
305        let errs: Vec<(SinkId, anyhow::Error)> = results
306            .into_iter()
307            .filter_map(|(id, r)| r.err().map(|e| (id, e)))
308            .collect();
309        if errs.is_empty() {
310            Ok(())
311        } else {
312            Err(aggregate_v3_sink_errors("commit", errs).into())
313        }
314    }
315}
316
317/// Combine per-sink errors from a fan-out into a single `anyhow::Error`. The first failing
318/// sink's error is used as the source so the original chain is preserved; the message lists
319/// every failing `sink_id` and its error stringified.
320fn aggregate_v3_sink_errors(
321    phase: &'static str,
322    mut errs: Vec<(SinkId, anyhow::Error)>,
323) -> anyhow::Error {
324    debug_assert!(!errs.is_empty());
325    let sink_ids: Vec<String> = errs.iter().map(|(id, _)| id.to_string()).collect();
326    let details: Vec<String> = errs
327        .iter()
328        .map(|(id, e)| format!("sink {}: {}", id, e.as_report()))
329        .collect();
330    // Preserve the first error's chain as the cause.
331    let (_, first_err) = errs.remove(0);
332    first_err.context(format!(
333        "iceberg v3 sink {} failed for sink_id(s) [{}]: {}",
334        phase,
335        sink_ids.join(", "),
336        details.join("; ")
337    ))
338}
339
340fn group_v3_reports_by_sink(
341    reports: Vec<PbIcebergV3SinkMetadata>,
342) -> MetaResult<HashMap<SinkId, (u64, Vec<PbIcebergV3SinkMetadata>)>> {
343    let mut grouped: HashMap<SinkId, (u64, Vec<PbIcebergV3SinkMetadata>)> = HashMap::new();
344    for r in reports {
345        let sink_id = r.sink_id;
346        let prev_epoch = r.prev_epoch;
347        let entry = grouped.entry(sink_id).or_insert((prev_epoch, Vec::new()));
348        if entry.0 != prev_epoch {
349            return Err(anyhow::anyhow!(
350                "iceberg v3 sink {} reports disagree on prev_epoch: {} vs {}",
351                sink_id,
352                entry.0,
353                prev_epoch
354            )
355            .into());
356        }
357        entry.1.push(r);
358    }
359    Ok(grouped)
360}
361
362impl GlobalBarrierWorkerContextImpl {
363    async fn get_source_database_id_for_refresh_stage(
364        &self,
365        table_id: TableId,
366        associated_source_id: SourceId,
367        stage: &'static str,
368    ) -> MetaResult<Option<DatabaseId>> {
369        match self
370            .metadata_manager
371            .catalog_controller
372            .get_object_database_id(associated_source_id)
373            .await
374        {
375            Ok(database_id) => Ok(Some(database_id)),
376            Err(err) if err.is_catalog_id_not_found("object") => {
377                tracing::warn!(
378                    %table_id,
379                    %associated_source_id,
380                    stage,
381                    "skip refresh finish command because associated source is already dropped"
382                );
383                Ok(None)
384            }
385            Err(err) => Err(err)
386                .with_context(|| {
387                    format!(
388                        "failed to get database id for refresh stage: table_id={}, associated_source_id={}, stage={stage}",
389                        table_id, associated_source_id
390                    )
391                })
392                .map_err(Into::into),
393        }
394    }
395
396    fn set_status(&self, new_status: BarrierManagerStatus) {
397        self.status.store(Arc::new(new_status));
398    }
399
400    /// Load the context metadata and resolve upstream log epochs for a batch refresh trigger.
401    async fn load_batch_refresh_trigger_context_impl(
402        &self,
403        job_id: JobId,
404        database_id: DatabaseId,
405        last_committed_epoch: u64,
406    ) -> MetaResult<BatchRefreshJobTriggerContext> {
407        use itertools::Itertools;
408        use sea_orm::TransactionTrait;
409
410        use crate::controller::scale::load_fragment_context_for_jobs;
411
412        // Load metadata from the catalog under a single transaction.
413        let inner = self
414            .metadata_manager
415            .catalog_controller
416            .get_inner_read_guard()
417            .await;
418        let txn = inner.db.begin().await?;
419
420        // 1. Load fragment context (job model, database model).
421        let fragment_context =
422            load_fragment_context_for_jobs(&txn, HashSet::from([job_id])).await?;
423
424        let streaming_job_model = fragment_context
425            .job_map
426            .get(&job_id)
427            .ok_or_else(|| anyhow::anyhow!("streaming job model not found for job {}", job_id))?
428            .clone();
429
430        let database_model = fragment_context
431            .database_map
432            .get(&database_id)
433            .ok_or_else(|| {
434                anyhow::anyhow!("database model not found for database {}", database_id)
435            })?;
436        let database_resource_group = database_model.resource_group.clone();
437
438        // 2. Load job definition.
439        let mut job_extra_info = self
440            .metadata_manager
441            .catalog_controller
442            .get_streaming_job_extra_info_in_txn(&txn, vec![job_id])
443            .await?;
444        let definition = job_extra_info
445            .remove(&job_id)
446            .ok_or_else(|| anyhow::anyhow!("extra info not found for job {}", job_id))?
447            .job_definition;
448
449        // 3. Get fragments from fragment_context and load downstream relations.
450        let fragments = fragment_context
451            .job_fragments
452            .get(&job_id)
453            .ok_or_else(|| anyhow::anyhow!("fragments not found for job {}", job_id))?
454            .clone();
455
456        // Derive upstream table IDs from the snapshot backfill scan nodes in the fragments.
457        let upstream_table_ids: HashSet<TableId> = {
458            use crate::stream::StreamFragmentGraph;
459            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
460                fragments.values().map(|f| (&f.nodes, f.fragment_type_mask)),
461            )?
462            .0
463            .ok_or_else(|| {
464                anyhow::anyhow!("batch refresh job {} has no snapshot backfill info", job_id)
465            })?;
466            snapshot_backfill_info
467                .upstream_mv_table_id_to_backfill_epoch
468                .into_keys()
469                .collect()
470        };
471
472        let fragment_ids: Vec<_> = fragments.keys().copied().collect();
473        let downstreams = self
474            .metadata_manager
475            .catalog_controller
476            .get_fragment_downstream_relations_in_txn(&txn, fragment_ids)
477            .await?;
478
479        txn.commit().await?;
480        drop(inner);
481
482        // Resolve upstream log epochs from the hummock changelog.
483        let (upstream_table_log_epochs, target_upstream_epoch) = self
484            .hummock_manager
485            .on_current_version_and_table_change_log(|version, table_change_log| {
486                let mut target_upstream_epoch = last_committed_epoch;
487                let mut log_epochs: HashMap<TableId, Vec<(Vec<u64>, u64)>> = HashMap::new();
488
489                for &upstream_table_id in &upstream_table_ids {
490                    let upstream_committed_epoch = version
491                        .state_table_info
492                        .info()
493                        .get(&upstream_table_id)
494                        .map(|info| info.committed_epoch)
495                        .ok_or_else(|| {
496                            anyhow::anyhow!(
497                                "cannot get committed epoch for upstream table {}",
498                                upstream_table_id
499                            )
500                        })?;
501
502                    target_upstream_epoch =
503                        std::cmp::max(target_upstream_epoch, upstream_committed_epoch);
504
505                    if upstream_committed_epoch <= last_committed_epoch {
506                        continue;
507                    }
508
509                    if let Some(change_log) = table_change_log.get(&upstream_table_id) {
510                        let epochs = change_log
511                            .filter_epoch((last_committed_epoch, upstream_committed_epoch))
512                            .map(|epoch_log| {
513                                (
514                                    epoch_log.non_checkpoint_epochs.clone(),
515                                    epoch_log.checkpoint_epoch,
516                                )
517                            })
518                            .collect_vec();
519                        if !epochs.is_empty() {
520                            log_epochs.insert(upstream_table_id, epochs);
521                        }
522                    } else {
523                        anyhow::bail!(
524                            "upstream table {} has lagged downstream on epoch {} but no table change log (upstream committed: {})",
525                            upstream_table_id,
526                            last_committed_epoch,
527                            upstream_committed_epoch,
528                        );
529                    }
530                }
531
532                Ok((log_epochs, target_upstream_epoch))
533            })
534            .await?;
535
536        Ok(BatchRefreshJobTriggerContext {
537            fragments,
538            downstreams,
539            streaming_job_model,
540            definition,
541            database_resource_group,
542            upstream_table_log_epochs,
543            target_upstream_epoch,
544        })
545    }
546}
547
548impl PostCollectCommand {
549    /// Do some stuffs after barriers are collected and the new storage version is committed, for
550    /// the given command.
551    pub async fn post_collect(
552        self,
553        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
554    ) -> MetaResult<()> {
555        match self {
556            PostCollectCommand::Command(_) => {}
557            PostCollectCommand::SourceChangeSplit {
558                split_assignment: assignment,
559            } => {
560                barrier_manager_context
561                    .metadata_manager
562                    .update_fragment_splits(&assignment)
563                    .await?;
564            }
565
566            PostCollectCommand::DropStreamingJobs => {}
567            PostCollectCommand::ConnectorPropsChange(obj_id_map_props) => {
568                // todo: we dont know the type of the object id, it can be a source or a sink. Should carry more info in the barrier command.
569                barrier_manager_context
570                    .source_manager
571                    .apply_source_change(SourceChange::UpdateSourceProps {
572                        // Only sources are managed in source manager. Convert object IDs to source IDs and let
573                        // source manager ignore unknown/unregistered sources.
574                        source_id_map_new_props: obj_id_map_props
575                            .iter()
576                            .map(|(object_id, props)| (object_id.as_source_id(), props.clone()))
577                            .collect(),
578                    })
579                    .await;
580            }
581            PostCollectCommand::ResumeBackfill { target } => match target {
582                ResumeBackfillTarget::Job(job_id) => {
583                    barrier_manager_context
584                        .metadata_manager
585                        .catalog_controller
586                        .update_backfill_orders_by_job_id(job_id, None)
587                        .await?;
588                }
589                ResumeBackfillTarget::Fragment(fragment_id) => {
590                    let mut job_ids = barrier_manager_context
591                        .metadata_manager
592                        .catalog_controller
593                        .get_fragment_job_id(vec![fragment_id])
594                        .await?;
595                    let job_id = job_ids.pop().ok_or_else(|| {
596                        MetaError::invalid_parameter("fragment not found".to_owned())
597                    })?;
598                    let job_id = JobId::new(job_id.as_raw_id());
599
600                    let extra_info = barrier_manager_context
601                        .metadata_manager
602                        .catalog_controller
603                        .get_streaming_job_extra_info(vec![job_id])
604                        .await?;
605                    let mut backfill_orders: BackfillOrders = extra_info
606                        .get(&job_id)
607                        .cloned()
608                        .ok_or_else(|| MetaError::invalid_parameter("job not found".to_owned()))?
609                        .backfill_orders
610                        .unwrap_or_default();
611
612                    let resumed_fragment_id = fragment_id.as_raw_id();
613                    for children in backfill_orders.0.values_mut() {
614                        children.retain(|child| *child != resumed_fragment_id);
615                    }
616                    backfill_orders.0.retain(|_, children| !children.is_empty());
617
618                    barrier_manager_context
619                        .metadata_manager
620                        .catalog_controller
621                        .update_backfill_orders_by_job_id(job_id, Some(backfill_orders))
622                        .await?;
623                }
624            },
625            PostCollectCommand::CreateStreamingJob {
626                info,
627                job_type,
628                cross_db_snapshot_backfill_info,
629                resolved_split_assignment,
630            } => {
631                match &job_type {
632                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
633                        barrier_manager_context
634                            .metadata_manager
635                            .catalog_controller
636                            .fill_snapshot_backfill_epoch(
637                                info.stream_job_fragments.fragments.iter().filter_map(
638                                    |(fragment_id, fragment)| {
639                                        if fragment.fragment_type_mask.contains(
640                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
641                                        ) {
642                                            Some(*fragment_id as _)
643                                        } else {
644                                            None
645                                        }
646                                    },
647                                ),
648                                None,
649                                &cross_db_snapshot_backfill_info,
650                            )
651                            .await?
652                    }
653                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
654                    | CreateStreamingJobType::BatchRefresh(BatchRefreshInfo {
655                        snapshot_backfill_info,
656                        ..
657                    }) => {
658                        barrier_manager_context
659                            .metadata_manager
660                            .catalog_controller
661                            .fill_snapshot_backfill_epoch(
662                                info.stream_job_fragments.fragments.iter().filter_map(
663                                    |(fragment_id, fragment)| {
664                                        if fragment.fragment_type_mask.contains_any([
665                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
666                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
667                                        ]) {
668                                            Some(*fragment_id as _)
669                                        } else {
670                                            None
671                                        }
672                                    },
673                                ),
674                                Some(snapshot_backfill_info),
675                                &cross_db_snapshot_backfill_info,
676                            )
677                            .await?
678                    }
679                }
680
681                // Do `post_collect_job_fragments` of the original streaming job in the end, so that in any previous failure,
682                // we won't mark the job as `Creating`, and then the job will be later clean by the recovery triggered by the returned error.
683                let CreateStreamingJobCommandInfo {
684                    stream_job_fragments,
685                    upstream_fragment_downstreams,
686                    ..
687                } = info;
688                let new_sink_downstream =
689                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
690                        let new_downstreams = ctx.new_sink_downstream.clone();
691                        let new_downstreams = FragmentDownstreamRelation::from([(
692                            ctx.sink_fragment_id,
693                            vec![new_downstreams],
694                        )]);
695                        Some(new_downstreams)
696                    } else {
697                        None
698                    };
699
700                barrier_manager_context
701                    .metadata_manager
702                    .catalog_controller
703                    .post_collect_job_fragments(
704                        stream_job_fragments.stream_job_id(),
705                        &upstream_fragment_downstreams,
706                        new_sink_downstream,
707                        Some(&resolved_split_assignment),
708                    )
709                    .await?;
710
711                let source_change = SourceChange::CreateJob {
712                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
713                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
714                };
715
716                barrier_manager_context
717                    .source_manager
718                    .apply_source_change(source_change)
719                    .await;
720            }
721            PostCollectCommand::Reschedule { reschedules, .. } => {
722                let fragment_splits = reschedules
723                    .iter()
724                    .map(|(fragment_id, reschedule)| {
725                        (*fragment_id, reschedule.actor_splits.clone())
726                    })
727                    .collect();
728
729                barrier_manager_context
730                    .metadata_manager
731                    .update_fragment_splits(&fragment_splits)
732                    .await?;
733            }
734
735            PostCollectCommand::ReplaceStreamJob {
736                plan: replace_plan,
737                resolved_split_assignment,
738            } => {
739                let ReplaceStreamJobPlan {
740                    old_fragments,
741                    new_fragments,
742                    upstream_fragment_downstreams,
743                    to_drop_state_table_ids,
744                    auto_refresh_schema_sinks,
745                    ..
746                } = &replace_plan;
747                // Update actors and actor_dispatchers for new table fragments.
748                barrier_manager_context
749                    .metadata_manager
750                    .catalog_controller
751                    .post_collect_job_fragments(
752                        new_fragments.stream_job_id,
753                        upstream_fragment_downstreams,
754                        None,
755                        Some(&resolved_split_assignment),
756                    )
757                    .await?;
758
759                if let Some(sinks) = auto_refresh_schema_sinks {
760                    for sink in sinks {
761                        barrier_manager_context
762                            .metadata_manager
763                            .catalog_controller
764                            .post_collect_job_fragments(
765                                sink.tmp_sink_id.as_job_id(),
766                                &Default::default(), // upstream_fragment_downstreams is already inserted in the job of upstream table
767                                None, // no replace plan
768                                None, // no init split assignment
769                            )
770                            .await?;
771                    }
772                }
773
774                // Apply the split changes in source manager.
775                barrier_manager_context
776                    .source_manager
777                    .handle_replace_job(
778                        old_fragments,
779                        new_fragments.stream_source_fragments(),
780                        &replace_plan,
781                    )
782                    .await;
783                cleanup_dropped_streaming_jobs(
784                    &barrier_manager_context.refresh_manager,
785                    &barrier_manager_context.hummock_manager,
786                    &barrier_manager_context.metadata_manager,
787                    [],
788                    to_drop_state_table_ids.clone(),
789                    "replace_streaming_job",
790                )
791                .await?;
792            }
793
794            PostCollectCommand::CreateSubscription { subscription_id } => {
795                barrier_manager_context
796                    .metadata_manager
797                    .catalog_controller
798                    .finish_create_subscription_catalog(subscription_id)
799                    .await?
800            }
801        }
802
803        Ok(())
804    }
805}
806
#[cfg(test)]
mod tests {
    use super::*;

    /// A dropped associated source surfaces as a catalog "object" not-found error, which is the
    /// case `get_source_database_id_for_refresh_stage` turns into a skip.
    #[test]
    fn test_skip_refresh_finish_when_associated_source_missing() {
        let dropped_source_err = MetaError::catalog_id_not_found("object", 42);
        let should_skip = dropped_source_err.is_catalog_id_not_found("object");
        assert!(should_skip);
    }

    /// A not-found error for another catalog kind must not be mistaken for a dropped source.
    #[test]
    fn test_do_not_skip_refresh_finish_for_other_not_found_types() {
        let table_not_found_err = MetaError::catalog_id_not_found("table", 42);
        let should_skip = table_not_found_err.is_catalog_id_not_found("object");
        assert!(!should_skip);
    }
}