risingwave_meta/stream/stream_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::sync::Arc;

use await_tree::span;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_meta_model::prelude::Fragment as FragmentModel;
use risingwave_meta_model::{ObjectId, StreamingParallelism, fragment};
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use thiserror_ext::AsReport;
use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
use tracing::Instrument;

use super::{FragmentBackfillOrder, Locations, ReschedulePolicy, ScaleControllerRef};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::SourceManagerRef;
use crate::stream::cdc::{
    assign_cdc_table_snapshot_splits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // left empty as a placeholder for future options
}

#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: ObjectId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    // for backwards compatibility
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

/// [`CreateStreamingJobContext`] carries one-time information for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relations to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// The locations of the actors to build in the streaming job.
    pub building_locations: Locations,

    /// DDL definition.
    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Used for sink-into-table.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,

    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}

struct StreamingJobExecution {
    id: JobId,
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    _permit: OwnedSemaphorePermit,
}

impl StreamingJobExecution {
    fn new(
        id: JobId,
        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
            _permit: permit,
        }
    }
}

#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: JobId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

    async fn cancel_jobs(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut background_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    match shutdown_tx.send(tx) {
                        Ok(()) => {
                            receivers.insert(job_id, rx);
                        }
                        Err(_) => {
                            return Err(anyhow::anyhow!(
                                "failed to send shutdown signal for streaming job {}: receiver dropped",
                                job_id
                            )
                            .into());
                        }
                    }
                }
            } else {
                // If a job id is not tracked in `streaming_jobs`, it should be a background creating job.
                background_job_ids.push(job_id);
            }
        }

        Ok((receivers, background_job_ids))
    }

    async fn check_job_exists(&self, job_id: JobId) -> bool {
        let jobs = self.streaming_jobs.lock().await;
        jobs.contains_key(&job_id)
    }
}
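
// Illustrative sketch (not part of the original file): how a caller is expected to
// consume the two halves returned by `cancel_jobs`. The `info` and `ids` bindings are
// assumptions for illustration.
//
//     let (receivers, background_job_ids) = info.cancel_jobs(ids).await?;
//     // Jobs tracked here are cancelled cooperatively: each oneshot receiver reports
//     // whether the in-flight creation acknowledged the cancellation.
//     for (job_id, rx) in receivers {
//         let cancelled = rx.await.unwrap_or(false);
//         tracing::info!(?job_id, cancelled, "tracked creating job cancel result");
//     }
//     // Jobs missing from the map are background creating jobs and must be cancelled
//     // through the barrier scheduler instead; see `cancel_streaming_jobs` below.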

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: JobId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id as _,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
        }
    }
}

/// [`ReplaceStreamJobContext`] carries one-time information for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// New fragment relations to add from existing upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the new job to replace.
    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: JobId,

    /// Used for dropping an associated source: drops the source and related internal tables.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Broadcasts and collects barriers.
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external systems like Kafka.
    pub source_manager: SourceManagerRef,

    /// Info of streaming jobs that are currently being created.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }
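
    // Illustrative sketch (not part of the original file): typical wiring of the manager
    // at meta-node startup. The `env`, `metadata_manager`, `barrier_scheduler`,
    // `source_manager`, and `scale_controller` bindings are assumed to be built elsewhere.
    //
    //     let stream_manager: GlobalStreamManagerRef = Arc::new(GlobalStreamManager::new(
    //         env,
    //         metadata_manager,
    //         barrier_scheduler,
    //         source_manager,
    //         scale_controller,
    //     )?);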

    /// Creates a streaming job. It works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, build the hanging
    ///    channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via source manager and patch
    ///    actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related metadata.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`];
    /// see the illustrative usage sketch after this function.
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finished");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        let result = tokio::select! {
            biased;

            res = create_fut => res,
            notifier = cancel_rx => {
                let notifier = notifier.expect("sender should not be dropped");
                tracing::debug!(id=%job_id, "cancelling streaming job");

                if let Ok(job_fragments) = self.metadata_manager.get_job_fragments_by_id(job_id)
                    .await {
                    // try to cancel the buffered creating command.
                    if self.barrier_scheduler.try_cancel_scheduled_create(database_id, job_id) {
                        tracing::debug!("cancelling streaming job {job_id} in buffer queue.");
                    } else if !job_fragments.is_created() {
                        tracing::debug!("cancelling streaming job {job_id} by issuing a cancel command.");

                        let cancel_command = self.metadata_manager.catalog_controller
                            .build_cancel_command(&job_fragments)
                            .await?;
                        self.metadata_manager.catalog_controller
                            .try_abort_creating_streaming_job(job_id, true)
                            .await?;

                        self.barrier_scheduler.run_command(database_id, cancel_command).await?;
                    } else {
                        // streaming job is already completed
                        let _ = notifier.send(false).inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));
                        return self.metadata_manager.wait_streaming_job_finished(database_id, job_id).await;
                    }
                }
                notifier.send(true).expect("receiver should not be dropped");
                Err(MetaError::cancelled("create"))
            }
        };

        tracing::info!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }
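
    // Illustrative usage sketch (not part of the original file): how a DDL handler might
    // drive `create_streaming_job`. The `stream_job_fragments`, `ctx`, and `permit`
    // bindings are assumptions; the permit is expected to come from a semaphore that
    // bounds concurrently creating jobs.
    //
    //     let version = stream_manager
    //         .create_streaming_job(stream_job_fragments, ctx, permit)
    //         .await?;
    //     // For foreground jobs, `version` is returned only after the job is fully
    //     // created; background jobs return the current notification version right away.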

    /// The function returns after the barrier has been collected
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "actor building finished"
        );

        // Here we need to consider:
        // - Shared source
        // - Table with connector
        // - MV on shared source
        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;

        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

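        // For CDC tables using parallelized backfill, pre-assign snapshot splits and
        // register the job with the backfill tracker (total split count, fragment-to-table
        // mapping, and a new generation); jobs without such splits use an empty assignment.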
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            stream_job_fragments.stream_job_id,
            &stream_job_fragments,
            self.env.meta_store_ref(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if !cdc_table_snapshot_split_assignment.is_empty()
        {
            self.env.cdc_table_backfill_tracker.track_new_job(
                stream_job_fragments.stream_job_id,
                cdc_table_snapshot_split_assignment
                    .values()
                    .map(|s| u64::try_from(s.len()).unwrap())
                    .sum(),
            );
            self.env
                .cdc_table_backfill_tracker
                .add_fragment_table_mapping(
                    stream_job_fragments
                        .fragments
                        .values()
                        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
                        .map(|f| f.fragment_id),
                    stream_job_fragments.stream_job_id,
                );
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(iter::once(stream_job_fragments.stream_job_id)),
            )
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_split_assignment,
            locality_fragment_state_table_mapping,
        };

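        // Decide how the barrier manager should create this job: snapshot-backfill jobs,
        // sink-into-table jobs, and plain jobs are issued as different creation types.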
        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }

    /// Sends the replace-job command to the barrier scheduler.
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
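        // Replacing a source job requires re-allocating splits against the new source
        // fragments; other replacements reuse the regular split allocation path.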
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocated splits: {:?}",
            init_split_assignment
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            old_fragments.stream_job_id,
            &new_fragments.inner,
            self.env.meta_store_ref(),
        )
        .await?;

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                    cdc_table_snapshot_split_assignment,
                }),
            )
            .await?;

        Ok(())
    }

    /// Drops streaming jobs via the barrier manager and cleans up all related resources. Errors
    /// are ignored because the recovery process will take over the cleanup. See
    /// [`Command::DropStreamingJobs`] for details.
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<JobId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        // TODO(august): This is a workaround for canceling SITT via drop, remove it after refactoring SITT.
        for &job_id in &streaming_job_ids {
            if self.creating_job_info.check_job_exists(job_id).await {
                tracing::info!(
                    ?job_id,
                    "streaming job is still being created, cancelling it directly via drop"
                );
                self.metadata_manager
                    .notify_cancelled(database_id, job_id)
                    .await;
            }
        }

        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

    /// Cancels streaming jobs and returns the cancelled job ids.
    /// 1. Send a cancel message to creating stream jobs (via `cancel_jobs`).
    /// 2. Send a cancel message to recovered stream jobs (via `barrier_scheduler`).
    ///
    /// In both cases, their state is cleaned up by the barrier manager after the
    /// `CancelStreamJob` command succeeds. See the illustrative sketch after this function.
    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
        if job_ids.is_empty() {
            return Ok(vec![]);
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if let Ok(cancelled) = receiver.await
                && cancelled
            {
                tracing::info!("canceled streaming job {id}");
                Ok(id)
            } else {
                Err(MetaError::from(anyhow::anyhow!(
                    "failed to cancel streaming job {id}"
                )))
            }
        });
        let mut cancelled_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        // NOTE(kwannoel): For background jobs that are not tracked by the stream manager,
        // we can directly cancel them by running the barrier command.
        let futures = background_job_ids.into_iter().map(|id| async move {
            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
            if fragment.is_created() {
                Err(MetaError::invalid_parameter(format!(
                    "streaming job {} is already created",
                    id
                )))?;
            }

            let cancel_command = self
                .metadata_manager
                .catalog_controller
                .build_cancel_command(&fragment)
                .await?;

            let (_, database_id) = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_streaming_job(id, true)
                .await?;

            if let Some(database_id) = database_id {
                self.barrier_scheduler
                    .run_command(database_id, cancel_command)
                    .await?;
            }

            tracing::info!(?id, "cancelled recovered streaming job");
            Ok(id)
        });
        let cancelled_recovered_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        cancelled_ids.extend(cancelled_recovered_ids);
        Ok(cancelled_ids)
    }
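
    // Illustrative sketch (not part of the original file): cancellation as driven by the
    // DDL service. The `job_ids` binding is an assumption; the returned vector contains
    // only the jobs that were actually cancelled, whether in-flight or recovered.
    //
    //     let cancelled = stream_manager.cancel_streaming_jobs(job_ids).await?;
    //     tracing::info!(?cancelled, "cancelled streaming jobs");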

    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        policy: ReschedulePolicy,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            if related_jobs.contains(&job_id) {
                bail!(
                    "Cannot alter the job {} because the related job {:?} is currently being created",
                    job_id,
                    background_jobs,
                );
            }
        }

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();
        let workers = worker_nodes.into_iter().map(|x| (x.id as i32, x)).collect();

        let commands = self
            .scale_controller
            .reschedule_inplace(HashMap::from([(job_id, policy)]), workers)
            .await?;

        if !deferred {
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }
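
    // Illustrative sketch (not part of the original file): triggering an immediate
    // reschedule. The `stream_manager`, `job_id`, and `policy` bindings are assumptions.
    //
    //     stream_manager
    //         .reschedule_streaming_job(job_id, policy, /* deferred */ false)
    //         .await?;
    //     // With `deferred = true`, `reschedule_inplace` is still invoked above, but the
    //     // resulting commands are not run through the barrier scheduler here.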

    /// This method is copied from `GlobalStreamManager::reschedule_streaming_job` and modified to
    /// handle rescheduling of CDC table backfill.
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: u32,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let job_id = TableId::new(job_id);

        let parallelism_policy = match target {
            ReschedulePolicy::Parallelism(policy)
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
            {
                policy
            }
            _ => bail_invalid_parameter!(
                "CDC backfill reschedule only supports fixed parallelism targets"
            ),
        };

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();
        let workers = worker_nodes.into_iter().map(|x| (x.id as i32, x)).collect();

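        // Locate the unique StreamCdcScan fragment of this job by checking fragment type
        // masks; bail out if none or more than one such fragment exists.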
        let cdc_fragment_id = {
            let inner = self.metadata_manager.catalog_controller.inner.read().await;
            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(job_id))
                .into_tuple()
                .all(&inner.db)
                .await?;

            let cdc_fragments = fragments
                .into_iter()
                .filter_map(|(fragment_id, mask)| {
                    FragmentTypeMask::from(mask)
                        .contains(FragmentTypeFlag::StreamCdcScan)
                        .then_some(fragment_id)
                })
                .collect_vec();

            match cdc_fragments.len() {
                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
                1 => cdc_fragments[0],
                _ => bail_invalid_parameter!(
                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
                    job_id
                ),
            }
        };

        let fragment_policy = HashMap::from([(
            cdc_fragment_id,
            Some(parallelism_policy.parallelism.clone()),
        )]);

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy, workers)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

    pub(crate) async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        if fragment_targets.is_empty() {
            return Ok(());
        }

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .map(|worker| (worker.id as i32, worker))
            .collect();

        let fragment_policy = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
            .collect();

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy, workers)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

    // No need to add actors, just send a command.
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

    // No need to add actors, just send a command.
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}