risingwave_meta/stream/stream_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::sync::Arc;

use await_tree::span;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, SinkId};
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_meta_model::prelude::Fragment as FragmentModel;
use risingwave_meta_model::{StreamingParallelism, fragment};
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use thiserror_ext::AsReport;
use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
use tracing::Instrument;

use super::{FragmentBackfillOrder, Locations, ReschedulePolicy, ScaleControllerRef};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
    SubscriptionId,
};
use crate::stream::SourceManagerRef;
use crate::stream::cdc::{
    assign_cdc_table_snapshot_splits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Left empty as a placeholder for future options, if any.
}

#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: SinkId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    // For backwards compatibility.
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

/// [`CreateStreamingJobContext`] carries one-time information for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relations to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// The locations of the actors to build in the streaming job.
    pub building_locations: Locations,

    /// DDL definition.
    pub definition: String,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Used for sink-into-table.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,

    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}

struct StreamingJobExecution {
    id: JobId,
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    _permit: OwnedSemaphorePermit,
}

impl StreamingJobExecution {
    fn new(
        id: JobId,
        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
            _permit: permit,
        }
    }
}

#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: JobId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

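    /// Attempts to cancel the given creating jobs. For jobs tracked here, a shutdown signal is
    /// sent and a receiver is returned to await the cancellation result; job ids not tracked here
    /// are returned separately as background creating jobs, to be cancelled via a barrier command.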
    async fn cancel_jobs(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut background_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    match shutdown_tx.send(tx) {
                        Ok(()) => {
                            receivers.insert(job_id, rx);
                        }
                        Err(_) => {
                            return Err(anyhow::anyhow!(
                                "failed to send shutdown signal for streaming job {}: receiver dropped",
                                job_id
                            )
                            .into());
                        }
                    }
                }
            } else {
                // If the job id is not tracked in `streaming_jobs`, it should be a background creating job.
                background_job_ids.push(job_id);
            }
        }

        Ok((receivers, background_job_ids))
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
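    /// Builds the in-flight fragment info for the new sink fragment, resolving each actor's
    /// worker id from the recorded actor status.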
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
        }
    }
}

/// [`ReplaceStreamJobContext`] carries one-time information for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// New fragment relations to add from existing upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the new job to replace.
    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: JobId,

    /// Used for dropping an associated source: drops the source and its related internal tables.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Broadcasts and collects barriers.
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external systems like Kafka.
    pub source_manager: SourceManagerRef,

    /// Creating streaming job info.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

    /// Creates a streaming job. It works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, and build the
    ///    hanging channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via the source manager and
    ///    patch the actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related metadata.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`] that also
    /// handles cancellation requests while the job is being created.
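    ///
    /// A rough usage sketch from a hypothetical caller (names are illustrative, not from this
    /// file):
    ///
    /// ```ignore
    /// let version = stream_manager
    ///     .create_streaming_job(stream_job_fragments, ctx, permit)
    ///     .await?;
    /// ```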
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finished");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

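        // Race the creation future against a cancellation request. With `biased`, the creation
        // future is polled first, so a cancel signal is handled only while creation is still
        // pending.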
        let result = tokio::select! {
            biased;

            res = create_fut => res,
            notifier = cancel_rx => {
                let notifier = notifier.expect("sender should not be dropped");
                tracing::debug!(id=%job_id, "cancelling streaming job");

                if let Ok(job_fragments) = self.metadata_manager.get_job_fragments_by_id(job_id)
                    .await {
                    // Try to cancel the buffered creating command first.
                    if self.barrier_scheduler.try_cancel_scheduled_create(database_id, job_id) {
                        tracing::debug!("cancelling streaming job {job_id} in buffer queue.");
                    } else if !job_fragments.is_created() {
                        tracing::debug!("cancelling streaming job {job_id} by issuing a cancel command.");

                        let cancel_command = self.metadata_manager.catalog_controller
                            .build_cancel_command(&job_fragments)
                            .await?;
                        self.metadata_manager.catalog_controller
                            .try_abort_creating_streaming_job(job_id, true)
                            .await?;

                        self.barrier_scheduler.run_command(database_id, cancel_command).await?;
                    } else {
                        // The streaming job has already been completed.
                        let _ = notifier.send(false).inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));
                        return self.metadata_manager.wait_streaming_job_finished(database_id, job_id).await;
                    }
                }
                notifier.send(true).expect("receiver should not be dropped");
                Err(MetaError::cancelled("create"))
            }
        };

        tracing::info!("cleaning up creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }

    /// The function returns after the first barrier of the job is collected; see also
    /// [`crate::manager::MetadataManager::wait_streaming_job_finished`].
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "actors built"
        );

        // Here we need to consider:
        // - Shared source
        // - Table with connector
        // - MV on shared source
        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;

        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

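        // For parallelized CDC table backfill, pre-compute the snapshot split assignment and
        // register the job with the CDC backfill tracker, so that split progress and assignment
        // generations can be tracked per fragment.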
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            stream_job_fragments.stream_job_id,
            &stream_job_fragments,
            self.env.meta_store_ref(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if !cdc_table_snapshot_split_assignment.is_empty()
        {
            self.env.cdc_table_backfill_tracker.track_new_job(
                stream_job_fragments.stream_job_id,
                cdc_table_snapshot_split_assignment
                    .values()
                    .map(|s| u64::try_from(s.len()).unwrap())
                    .sum(),
            );
            self.env
                .cdc_table_backfill_tracker
                .add_fragment_table_mapping(
                    stream_job_fragments
                        .fragments
                        .values()
                        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
                        .map(|f| f.fragment_id),
                    stream_job_fragments.stream_job_id,
                );
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(iter::once(stream_job_fragments.stream_job_id)),
            )
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_split_assignment,
            locality_fragment_state_table_mapping,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }

    /// Sends the replace job command to the barrier scheduler.
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocate split: {:?}",
            init_split_assignment
        );

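        // Re-derive the CDC snapshot split assignment for the new fragments of the replaced job.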
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            old_fragments.stream_job_id,
            &new_fragments.inner,
            self.env.meta_store_ref(),
        )
        .await?;

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                    cdc_table_snapshot_split_assignment,
                }),
            )
            .await?;

        Ok(())
    }

    /// Drops streaming jobs via the barrier manager and cleans up all related resources. Errors
    /// are ignored because the recovery process will take over the cleanup. See
    /// [`Command::DropStreamingJobs`] for details.
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<JobId>,
        state_table_ids: Vec<TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

    /// Cancels streaming jobs and returns the cancelled job ids.
    /// 1. Send a cancel message to creating stream jobs (via `cancel_jobs`).
    /// 2. Send a cancel message to recovered stream jobs (via `barrier_scheduler`).
    ///
    /// In both cases, their state is cleaned up by the barrier manager after the `CancelStreamJob`
    /// command succeeds.
    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
        if job_ids.is_empty() {
            return Ok(vec![]);
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;

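        // Wait for each tracked creating job to acknowledge the cancellation request.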
        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if let Ok(cancelled) = receiver.await
                && cancelled
            {
                tracing::info!("canceled streaming job {id}");
                Ok(id)
            } else {
                Err(MetaError::from(anyhow::anyhow!(
                    "failed to cancel streaming job {id}"
                )))
            }
        });
        let mut cancelled_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        // NOTE(kwannoel): For `background_job_ids`, i.e. stream jobs not tracked by the stream
        // manager, we can directly cancel them by running the barrier command.
        let futures = background_job_ids.into_iter().map(|id| async move {
            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
            if fragment.is_created() {
                tracing::warn!(
                    "streaming job {} is already created, ignore cancel request",
                    id
                );
                return Ok(None);
            }

            let cancel_command = self
                .metadata_manager
                .catalog_controller
                .build_cancel_command(&fragment)
                .await?;

            let (_, database_id) = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_streaming_job(id, true)
                .await?;

            if let Some(database_id) = database_id {
                self.barrier_scheduler
                    .run_command(database_id, cancel_command)
                    .await?;
            }

            tracing::info!(?id, "cancelled background streaming job");
            Ok(Some(id))
        });
        let cancelled_recovered_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
        Ok(cancelled_ids)
    }

    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        policy: ReschedulePolicy,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

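        // Refuse to reschedule if the job shares a no-shuffle dependency with any job that is
        // still being created in the background.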
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            if related_jobs.contains(&job_id) {
                bail!(
                    "Cannot alter the job {} because the related job {:?} is currently being created",
                    job_id,
                    background_jobs,
                );
            }
        }

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();
        let workers = worker_nodes.into_iter().map(|x| (x.id, x)).collect();

        let commands = self
            .scale_controller
            .reschedule_inplace(HashMap::from([(job_id, policy)]), workers)
            .await?;

        if !deferred {
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }

    /// This method is copied from `GlobalStreamManager::reschedule_streaming_job` and modified to
    /// handle rescheduling of the CDC table backfill fragment.
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let parallelism_policy = match target {
            ReschedulePolicy::Parallelism(policy)
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
            {
                policy
            }
            _ => bail_invalid_parameter!(
                "CDC backfill reschedule only supports fixed parallelism targets"
            ),
        };

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();
        let workers = worker_nodes.into_iter().map(|x| (x.id, x)).collect();

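        // Locate the single StreamCdcScan fragment of the job by listing its fragments and
        // filtering on the fragment type mask.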
        let cdc_fragment_id = {
            let inner = self.metadata_manager.catalog_controller.inner.read().await;
            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(job_id))
                .into_tuple()
                .all(&inner.db)
                .await?;

            let cdc_fragments = fragments
                .into_iter()
                .filter_map(|(fragment_id, mask)| {
                    FragmentTypeMask::from(mask)
                        .contains(FragmentTypeFlag::StreamCdcScan)
                        .then_some(fragment_id)
                })
                .collect_vec();

            match cdc_fragments.len() {
                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
                1 => cdc_fragments[0],
                _ => bail_invalid_parameter!(
                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
                    job_id
                ),
            }
        };

        let fragment_policy = HashMap::from([(
            cdc_fragment_id,
            Some(parallelism_policy.parallelism.clone()),
        )]);

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy, workers)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

    pub(crate) async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        if fragment_targets.is_empty() {
            return Ok(());
        }

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .map(|worker| (worker.id, worker))
            .collect();

        let fragment_policy = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
            .collect();

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy, workers)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

    // No need to add actors; just send a command.
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: subscription.dependent_table_id,
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id, command)
            .await?;
        Ok(())
    }

    // No need to add actors; just send a command.
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: SubscriptionId,
        table_id: TableId,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: table_id,
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}