risingwave_meta/stream/
stream_manager.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use await_tree::span;
19use futures::future::join_all;
20use itertools::Itertools;
21use risingwave_common::bail;
22use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::{JobId, SinkId};
25use risingwave_connector::source::CdcTableSnapshotSplitRaw;
26use risingwave_meta_model::prelude::Fragment as FragmentModel;
27use risingwave_meta_model::{StreamingParallelism, WorkerId, fragment, streaming_job};
28use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
29use risingwave_pb::expr::PbExprNode;
30use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
31use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
32use thiserror_ext::AsReport;
33use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
34use tracing::Instrument;
35
36use super::{
37    GlobalRefreshManagerRef, ReschedulePolicy, ScaleControllerRef, StreamFragmentGraph,
38    UserDefinedFragmentBackfillOrder,
39};
40use crate::barrier::{
41    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
42    ReplaceStreamJobPlan, SnapshotBackfillInfo,
43};
44use crate::controller::catalog::DropTableConnectorContext;
45use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
46use crate::error::bail_invalid_parameter;
47use crate::hummock::HummockManagerRef;
48use crate::manager::{
49    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
50};
51use crate::model::{
52    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
53    FragmentReplaceUpstream, StreamActor, StreamContext, StreamJobFragments,
54    StreamJobFragmentsToCreate, SubscriptionId,
55};
56use crate::stream::{ReplaceJobSplitPlan, SourceManagerRef};
57use crate::{MetaError, MetaResult};
58
/// Shared, reference-counted handle to the [`GlobalStreamManager`].
pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;
60
61pub(crate) async fn cleanup_dropped_streaming_jobs(
62    refresh_manager: &GlobalRefreshManagerRef,
63    hummock_manager: &HummockManagerRef,
64    metadata_manager: &MetadataManager,
65    streaming_job_ids: impl IntoIterator<Item = JobId>,
66    state_table_ids: Vec<TableId>,
67    progress_status: &str,
68) -> MetaResult<()> {
69    for job_id in streaming_job_ids {
70        refresh_manager.remove_progress_tracker(job_id.as_mv_table_id(), progress_status);
71    }
72
73    if state_table_ids.is_empty() {
74        return Ok(());
75    }
76
77    hummock_manager
78        .unregister_table_ids(state_table_ids.clone())
79        .await?;
80    metadata_manager
81        .catalog_controller
82        .complete_dropped_tables(state_table_ids)
83        .await;
84    Ok(())
85}
86
/// Extra options for creating a streaming job. Currently empty.
#[derive(Default)]
pub struct CreateStreamingJobOption {
    // leave empty as a placeholder for future option if there is any
}
91
/// Information about an upstream sink, used when creating a sink-into-table job.
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    /// Id of the upstream sink.
    pub sink_id: SinkId,
    /// Fragment id of the upstream sink.
    pub sink_fragment_id: FragmentId,
    /// Output fields produced by the sink.
    pub sink_output_fields: Vec<PbField>,
    // for backwards compatibility
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    /// Projection expressions applied to the sink's output.
    pub project_exprs: Vec<PbExprNode>,
    /// The new downstream fragment relation from the sink to the target table.
    pub new_sink_downstream: DownstreamFragmentRelation,
}
102
/// [`CreateStreamingJobContext`] carries one-time infos for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relation to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,

    /// DDL definition.
    pub definition: String,

    /// Foreground or background creation; determines whether callers wait for
    /// the job to be fully created.
    pub create_type: CreateType,

    /// The type of the streaming job.
    pub job_type: StreamingJobType,

    /// Used for sink-into-table.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    /// Snapshot backfill info for upstreams in the same database, if the job
    /// uses snapshot backfill.
    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    /// Snapshot backfill info for upstreams in other databases.
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    /// Pre-computed snapshot splits for CDC table backfill, if any.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,

    /// Extra creation options (currently a placeholder).
    pub option: CreateStreamingJobOption,

    /// The streaming job being created.
    pub streaming_job: StreamingJob,

    /// User-defined ordering constraints between backfill fragments.
    pub fragment_backfill_ordering: UserDefinedFragmentBackfillOrder,

    /// Mapping from fragment id to its state tables for locality backfill.
    /// NOTE(review): inferred from the field name — confirm against producers.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,

    /// Whether the backfill runs in serverless mode.
    pub is_serverless_backfill: bool,

    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
}
141
/// Tracks one in-flight `create streaming job` execution.
struct StreamingJobExecution {
    /// Id of the streaming job being created.
    id: JobId,
    /// Channel to request cancellation; taken (set to `None`) once a shutdown
    /// has been requested. The inner sender reports back whether the job was
    /// actually cancelled.
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    /// Concurrency permit held for the lifetime of the execution.
    _permit: OwnedSemaphorePermit,
}
147
148impl StreamingJobExecution {
149    fn new(
150        id: JobId,
151        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
152        permit: OwnedSemaphorePermit,
153    ) -> Self {
154        Self {
155            id,
156            shutdown_tx: Some(shutdown_tx),
157            _permit: permit,
158        }
159    }
160}
161
/// Registry of all streaming jobs that are currently being created.
#[derive(Default)]
struct CreatingStreamingJobInfo {
    /// Map from job id to its in-flight execution handle.
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}
166
167impl CreatingStreamingJobInfo {
168    async fn add_job(&self, job: StreamingJobExecution) {
169        let mut jobs = self.streaming_jobs.lock().await;
170        jobs.insert(job.id, job);
171    }
172
173    async fn delete_job(&self, job_id: JobId) {
174        let mut jobs = self.streaming_jobs.lock().await;
175        jobs.remove(&job_id);
176    }
177
178    async fn cancel_jobs(
179        &self,
180        job_ids: Vec<JobId>,
181    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
182        let mut jobs = self.streaming_jobs.lock().await;
183        let mut receivers = HashMap::new();
184        let mut background_job_ids = vec![];
185        for job_id in job_ids {
186            if let Some(job) = jobs.get_mut(&job_id) {
187                if let Some(shutdown_tx) = job.shutdown_tx.take() {
188                    let (tx, rx) = oneshot::channel();
189                    match shutdown_tx.send(tx) {
190                        Ok(()) => {
191                            receivers.insert(job_id, rx);
192                        }
193                        Err(_) => {
194                            return Err(anyhow::anyhow!(
195                                "failed to send shutdown signal for streaming job {}: receiver dropped",
196                                job_id
197                            )
198                            .into());
199                        }
200                    }
201                }
202            } else {
203                // If these job ids do not exist in streaming_jobs, they should be background creating jobs.
204                background_job_ids.push(job_id);
205            }
206        }
207
208        Ok((receivers, background_job_ids))
209    }
210}
211
/// Shared handle to [`CreatingStreamingJobInfo`].
type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;
213
/// Context for replacing a sink fragment when the upstream schema is
/// auto-refreshed.
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    /// Temporary id assigned to the replacement sink while the swap is in flight.
    pub tmp_sink_id: SinkId,
    /// The sink catalog before the schema refresh.
    pub original_sink: PbSink,
    /// The sink's fragment before the schema refresh.
    pub original_fragment: Fragment,
    /// Column catalog after the schema refresh.
    pub new_schema: Vec<PbColumnCatalog>,
    /// Fields newly added by the schema refresh.
    pub newly_add_fields: Vec<Field>,
    /// Names of columns removed by the schema refresh.
    pub removed_column_names: Vec<String>,
    /// The replacement fragment built against the new schema.
    pub new_fragment: Fragment,
    /// New log store table, if the sink uses one.
    pub new_log_store_table: Option<Box<PbTable>>,
    /// The sink's own stream context (timezone, `config_override`).
    pub ctx: StreamContext,
}
227
impl AutoRefreshSchemaSinkContext {
    /// Build an [`InflightFragmentInfo`] for the new (replacement) sink fragment.
    ///
    /// Actors are taken from `stream_actors` under the new fragment's id; an
    /// absent entry yields an empty actor map. Each actor's worker is resolved
    /// via `actor_location`, which must contain every actor id — a missing
    /// entry panics on the index below.
    pub fn new_fragment_info(
        &self,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: stream_actors
                .get(&self.new_fragment.fragment_id)
                .into_iter()
                .flatten()
                .map(|actor| {
                    (
                        actor.actor_id,
                        InflightActorInfo {
                            // Panics if the actor id is not present — locations
                            // are expected to be complete for the new fragment.
                            worker_id: actor_location[&actor.actor_id],
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            // No source splits are assigned at this point.
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
        }
    }
}
259
/// [`ReplaceStreamJobContext`] carries one-time infos for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,

    /// New fragment relation to add from existing upstream fragment to downstream fragment.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The streaming job whose plan is being replaced.
    pub streaming_job: StreamingJob,

    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,

    /// Temporary job id used while the replacement is in flight.
    pub tmp_id: JobId,

    /// Used for dropping an associated source. Dropping source and related internal tables.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    /// Sink fragments to replace due to auto schema refresh, if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,

    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
}
288
/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    /// Meta service environment (notification manager, await-tree registry, ...).
    pub env: MetaSrvEnv,

    /// Access to catalog and fragment metadata.
    pub metadata_manager: MetadataManager,

    /// Broadcasts and collect barriers
    pub barrier_scheduler: BarrierScheduler,

    /// State-store manager, used to (un)register state tables.
    pub hummock_manager: HummockManagerRef,

    /// Maintains streaming sources from external system like kafka
    pub source_manager: SourceManagerRef,

    /// Tracks refresh progress of streaming jobs.
    pub refresh_manager: GlobalRefreshManagerRef,

    /// Creating streaming job info.
    creating_job_info: CreatingStreamingJobInfoRef,

    /// Controller used for rescheduling streaming jobs.
    pub scale_controller: ScaleControllerRef,
}
310
311impl GlobalStreamManager {
312    pub fn new(
313        env: MetaSrvEnv,
314        metadata_manager: MetadataManager,
315        barrier_scheduler: BarrierScheduler,
316        hummock_manager: HummockManagerRef,
317        source_manager: SourceManagerRef,
318        refresh_manager: GlobalRefreshManagerRef,
319        scale_controller: ScaleControllerRef,
320    ) -> MetaResult<Self> {
321        Ok(Self {
322            env,
323            metadata_manager,
324            barrier_scheduler,
325            hummock_manager,
326            source_manager,
327            refresh_manager,
328            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
329            scale_controller,
330        })
331    }
332
    /// Create streaming job, it works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, build the hanging
    ///    channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via source manager and patch
    ///    actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related meta data.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
    ///
    /// The execution is registered in `creating_job_info` so it can be cancelled
    /// concurrently (see [`Self::cancel_streaming_jobs`]); the held `permit`
    /// bounds the number of concurrent creations and is released when the
    /// registration is removed.
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        // Register this execution so a concurrent cancel request can reach it
        // through `cancel_tx`.
        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            // Background creations return as soon as the command is collected;
            // foreground creations wait until the job is fully created.
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finish");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        // Race the creation against a cancellation request. `biased` polls the
        // creation future first, so cancellation is only taken while the
        // creation is still pending.
        let result = async {
            tokio::select! {
                biased;

                res = create_fut => res,
                notifier = cancel_rx => {
                    let notifier = notifier.expect("sender should not be dropped");
                    tracing::debug!(id=%job_id, "cancelling streaming job");

                    // Outcome of the cancellation attempt, reported back to the
                    // canceller through `notifier`.
                    enum CancelResult {
                        Completed(MetaResult<NotificationVersion>),
                        Failed(MetaError),
                        Cancelled,
                    }

                    let cancel_res = if let Ok(job_fragments) =
                        self.metadata_manager.get_job_fragments_by_id(job_id).await
                    {
                        // try to cancel buffered creating command.
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, job_id)
                        {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job in buffer queue."
                            );
                            CancelResult::Cancelled
                        } else if !job_fragments.is_created() {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job by issue cancel command."
                            );

                            // The create command already left the buffer queue:
                            // abort the catalog entry, issue an explicit cancel
                            // command, then clean up its state tables.
                            let cancel_result: MetaResult<()> = async {
                                let cancel_command = self.metadata_manager.catalog_controller
                                    .build_cancel_command(&job_fragments)
                                    .await?;
                                let cleanup_state_table_ids =
                                    job_fragments.all_table_ids().collect_vec();
                                self.metadata_manager.catalog_controller
                                    .try_abort_creating_streaming_job(job_id, true)
                                    .await?;

                                self.barrier_scheduler
                                    .run_command(database_id, cancel_command)
                                    .await?;
                                cleanup_dropped_streaming_jobs(
                                    &self.refresh_manager,
                                    &self.hummock_manager,
                                    &self.metadata_manager,
                                    [job_id],
                                    cleanup_state_table_ids,
                                    "cancel_streaming_job",
                                )
                                .await?;
                                Ok(())
                            }
                            .await;

                            match cancel_result {
                                Ok(()) => CancelResult::Cancelled,
                                Err(err) => {
                                    tracing::warn!(
                                        error = ?err.as_report(),
                                        id = %job_id,
                                        "failed to run cancel command for creating streaming job"
                                    );
                                    CancelResult::Failed(err)
                                }
                            }
                        } else {
                            // streaming job is already completed
                            CancelResult::Completed(
                                self.metadata_manager
                                    .wait_streaming_job_finished(database_id, job_id)
                                    .await,
                            )
                        }
                    } else {
                        CancelResult::Cancelled
                    };

                    let (cancelled, result) = match cancel_res {
                        CancelResult::Completed(result) => (false, result),
                        CancelResult::Failed(err) => (false, Err(err)),
                        CancelResult::Cancelled => (true, Err(MetaError::cancelled("create"))),
                    };

                    // Report whether the job was actually cancelled; the canceller
                    // may already be gone, so a send failure is only logged.
                    let _ = notifier
                        .send(cancelled)
                        .inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));

                    result
                }
            }
        }
        .await;

        // Deregister the execution (this also releases the permit).
        tracing::debug!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }
499
    /// The function will return after barrier collected
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    ///
    /// Assembles a [`Command::CreateStreamingJob`] from the context — first
    /// discovering source splits and extending the backfill ordering — and
    /// runs it through the barrier scheduler.
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            database_resource_group,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
            streaming_job_model,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        // Phase 1: Gather fragment-level split information.
        // - For source fragments: discover splits from the external source.
        // - For backfill fragments: splits will be aligned in Phase 2 inside the barrier worker
        //   using the actor-level no-shuffle mapping produced by render_actors.
        let init_split_assignment = self
            .source_manager
            .discover_splits(&stream_job_fragments)
            .await?;

        // Extend the user-defined backfill ordering with constraints derived
        // from the fragment graph (locality backfill).
        let fragment_backfill_ordering =
            StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                fragment_backfill_ordering,
                &stream_job_fragments.downstreams,
                || {
                    stream_job_fragments
                        .fragments
                        .iter()
                        .map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                },
            );

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            database_resource_group,
            fragment_backfill_ordering,
            cdc_table_snapshot_splits,
            locality_fragment_state_table_mapping,
            is_serverless: is_serverless_backfill,
            streaming_job_model,
        };

        // Choose the command flavor: snapshot backfill takes precedence, then
        // sink-into-table, otherwise a normal creation.
        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }
597
    /// Send replace job command to barrier scheduler.
    ///
    /// `new_fragments` replace the context's `old_fragments` for the job; the
    /// split plan is either freshly discovered or aligned from the previous
    /// fragments, depending on the job kind.
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            streaming_job_model,
            database_resource_group,
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        // Phase 1: Gather fragment-level split information.
        // For replace source with existing downstream, splits will be aligned
        // in Phase 2 inside the barrier worker using actor-level no-shuffle produced by render_actors.
        // For replace source with no downstream (or non-source), discover splits fresh.
        let split_plan = if streaming_job.is_source() {
            match self
                .source_manager
                .discover_splits_for_replace_source(&new_fragments, &replace_upstream)
                .await?
            {
                Some(discovered) => ReplaceJobSplitPlan::Discovered(discovered),
                None => ReplaceJobSplitPlan::AlignFromPrevious,
            }
        } else {
            let discovered = self.source_manager.discover_splits(&new_fragments).await?;
            ReplaceJobSplitPlan::Discovered(discovered)
        };
        tracing::info!("replace_stream_job - split plan: {:?}", split_plan);

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    database_resource_group,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    split_plan,
                    streaming_job,
                    streaming_job_model,
                    tmp_id,
                    // When an associated source connector is being dropped, its
                    // state table is dropped together with the plan.
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                }),
            )
            .await?;

        Ok(())
    }
660
661    /// Drop streaming jobs by barrier manager, and clean up all related resources. The error will
662    /// be ignored because the recovery process will take over it in cleaning part. Check
663    /// [`Command::DropStreamingJobs`] for details.
664    pub async fn drop_streaming_jobs(
665        &self,
666        database_id: DatabaseId,
667        streaming_job_ids: Vec<JobId>,
668        state_table_ids: Vec<TableId>,
669        fragment_ids: HashSet<FragmentId>,
670        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
671    ) {
672        if !streaming_job_ids.is_empty() || !state_table_ids.is_empty() {
673            let cleanup_streaming_job_ids = streaming_job_ids.clone();
674            let cleanup_state_table_ids = state_table_ids.clone();
675            let run_result = self
676                .barrier_scheduler
677                .run_command(
678                    database_id,
679                    Command::DropStreamingJobs {
680                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
681                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
682                        unregistered_fragment_ids: fragment_ids,
683                        dropped_sink_fragment_by_targets,
684                    },
685                )
686                .await;
687            let result = match run_result {
688                Ok(()) => {
689                    cleanup_dropped_streaming_jobs(
690                        &self.refresh_manager,
691                        &self.hummock_manager,
692                        &self.metadata_manager,
693                        cleanup_streaming_job_ids,
694                        cleanup_state_table_ids,
695                        "drop_streaming_jobs",
696                    )
697                    .await
698                }
699                Err(err) => Err(err),
700            };
701            let _ = result.inspect_err(|err| {
702                tracing::error!(error = ?err.as_report(), "failed to run drop command");
703            });
704        }
705    }
706
707    /// Cancel streaming jobs and return the canceled table ids.
708    /// 1. Send cancel message to stream jobs (via `cancel_jobs`).
709    /// 2. Send cancel message to recovered stream jobs (via `barrier_scheduler`).
710    ///
711    /// Cleanup of their state is handled by the caller after the drop command is collected.
712    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
713        if job_ids.is_empty() {
714            return Ok(vec![]);
715        }
716
717        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
718        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;
719
720        let futures = receivers.into_iter().map(|(id, receiver)| async move {
721            if let Ok(cancelled) = receiver.await
722                && cancelled
723            {
724                tracing::info!("canceled streaming job {id}");
725                Ok(id)
726            } else {
727                Err(MetaError::from(anyhow::anyhow!(
728                    "failed to cancel streaming job {id}"
729                )))
730            }
731        });
732        let mut cancelled_ids = join_all(futures)
733            .await
734            .into_iter()
735            .collect::<MetaResult<Vec<_>>>()?;
736
737        // NOTE(kwannoel): For background_job_ids stream jobs that not tracked in streaming manager,
738        // we can directly cancel them by running the barrier command.
739        let futures = background_job_ids.into_iter().map(|id| async move {
740            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
741            if fragment.is_created() {
742                tracing::warn!(
743                    "streaming job {} is already created, ignore cancel request",
744                    id
745                );
746                return Ok(None);
747            }
748            if fragment.is_created() {
749                Err(MetaError::invalid_parameter(format!(
750                    "streaming job {} is already created",
751                    id
752                )))?;
753            }
754
755            let cancel_command = self
756                .metadata_manager
757                .catalog_controller
758                .build_cancel_command(&fragment)
759                .await?;
760            let cleanup_state_table_ids = fragment.all_table_ids().collect_vec();
761
762            let (_, database_id) = self
763                .metadata_manager
764                .catalog_controller
765                .try_abort_creating_streaming_job(id, true)
766                .await?;
767
768            if let Some(database_id) = database_id {
769                self.barrier_scheduler
770                    .run_command(database_id, cancel_command)
771                    .await?;
772                cleanup_dropped_streaming_jobs(
773                    &self.refresh_manager,
774                    &self.hummock_manager,
775                    &self.metadata_manager,
776                    [id],
777                    cleanup_state_table_ids,
778                    "cancel_streaming_job",
779                )
780                .await?;
781            }
782
783            tracing::info!(?id, "cancelled background streaming job");
784            Ok(Some(id))
785        });
786        let cancelled_recovered_ids = join_all(futures)
787            .await
788            .into_iter()
789            .collect::<MetaResult<Vec<_>>>()?;
790
791        cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
792        Ok(cancelled_ids)
793    }
794
795    pub(crate) async fn reschedule_streaming_job(
796        &self,
797        job_id: JobId,
798        policy: ReschedulePolicy,
799        deferred: bool,
800    ) -> MetaResult<()> {
801        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
802
803        let background_jobs = self
804            .metadata_manager
805            .list_background_creating_jobs()
806            .await?;
807
808        if !background_jobs.is_empty() {
809            let blocked_jobs = self
810                .metadata_manager
811                .collect_reschedule_blocked_jobs_for_creating_jobs(&background_jobs, !deferred)
812                .await?;
813
814            if blocked_jobs.contains(&job_id) {
815                bail!(
816                    "Cannot alter the job {} because it is blocked by creating unreschedulable backfill jobs",
817                    job_id,
818                );
819            }
820        }
821
822        let commands = self
823            .scale_controller
824            .reschedule_inplace(HashMap::from([(job_id, policy)]))
825            .await?;
826
827        if !deferred {
828            let _source_pause_guard = self.source_manager.pause_tick().await;
829
830            for (database_id, command) in commands {
831                self.barrier_scheduler
832                    .run_command(database_id, command)
833                    .await?;
834            }
835        }
836
837        Ok(())
838    }
839
840    pub(crate) async fn reschedule_streaming_job_backfill_parallelism(
841        &self,
842        job_id: JobId,
843        parallelism: Option<StreamingParallelism>,
844        deferred: bool,
845    ) -> MetaResult<()> {
846        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
847
848        let background_jobs = self
849            .metadata_manager
850            .list_background_creating_jobs()
851            .await?;
852
853        if !background_jobs.is_empty() {
854            let unreschedulable = self
855                .metadata_manager
856                .collect_unreschedulable_backfill_jobs(&background_jobs, !deferred)
857                .await?;
858
859            if unreschedulable.contains(&job_id) {
860                bail!(
861                    "Cannot alter the job {} because it is a non-reschedulable background backfill job",
862                    job_id,
863                );
864            }
865        }
866
867        let commands = self
868            .scale_controller
869            .reschedule_backfill_parallelism_inplace(HashMap::from([(job_id, parallelism)]))
870            .await?;
871
872        if !deferred {
873            let _source_pause_guard = self.source_manager.pause_tick().await;
874
875            for (database_id, command) in commands {
876                self.barrier_scheduler
877                    .run_command(database_id, command)
878                    .await?;
879            }
880        }
881
882        Ok(())
883    }
884
885    /// This method is copied from `GlobalStreamManager::reschedule_streaming_job` and modified to handle reschedule CDC table backfill.
886    pub(crate) async fn reschedule_cdc_table_backfill(
887        &self,
888        job_id: JobId,
889        target: ReschedulePolicy,
890    ) -> MetaResult<()> {
891        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
892
893        let parallelism_policy = match target {
894            ReschedulePolicy::Parallelism(policy)
895                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
896            {
897                policy
898            }
899            _ => bail_invalid_parameter!(
900                "CDC backfill reschedule only supports fixed parallelism targets"
901            ),
902        };
903
904        let cdc_fragment_id = {
905            let inner = self.metadata_manager.catalog_controller.inner.read().await;
906            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
907                .select_only()
908                .columns([
909                    fragment::Column::FragmentId,
910                    fragment::Column::FragmentTypeMask,
911                ])
912                .filter(fragment::Column::JobId.eq(job_id))
913                .into_tuple()
914                .all(&inner.db)
915                .await?;
916
917            let cdc_fragments = fragments
918                .into_iter()
919                .filter_map(|(fragment_id, mask)| {
920                    FragmentTypeMask::from(mask)
921                        .contains(FragmentTypeFlag::StreamCdcScan)
922                        .then_some(fragment_id)
923                })
924                .collect_vec();
925
926            match cdc_fragments.len() {
927                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
928                1 => cdc_fragments[0],
929                _ => bail_invalid_parameter!(
930                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
931                    job_id
932                ),
933            }
934        };
935
936        let fragment_policy = HashMap::from([(
937            cdc_fragment_id,
938            Some(parallelism_policy.parallelism.clone()),
939        )]);
940
941        let commands = self
942            .scale_controller
943            .reschedule_fragment_inplace(fragment_policy)
944            .await?;
945
946        let _source_pause_guard = self.source_manager.pause_tick().await;
947
948        for (database_id, command) in commands {
949            self.barrier_scheduler
950                .run_command(database_id, command)
951                .await?;
952        }
953
954        Ok(())
955    }
956
957    pub(crate) async fn reschedule_fragments(
958        &self,
959        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
960    ) -> MetaResult<()> {
961        if fragment_targets.is_empty() {
962            return Ok(());
963        }
964
965        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
966
967        let fragment_policy = fragment_targets
968            .into_iter()
969            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
970            .collect();
971
972        let commands = self
973            .scale_controller
974            .reschedule_fragment_inplace(fragment_policy)
975            .await?;
976
977        let _source_pause_guard = self.source_manager.pause_tick().await;
978
979        for (database_id, command) in commands {
980            self.barrier_scheduler
981                .run_command(database_id, command)
982                .await?;
983        }
984
985        Ok(())
986    }
987
988    // Don't need to add actor, just send a command
989    pub async fn create_subscription(
990        self: &Arc<Self>,
991        subscription: &Subscription,
992    ) -> MetaResult<()> {
993        let command = Command::CreateSubscription {
994            subscription_id: subscription.id,
995            upstream_mv_table_id: subscription.dependent_table_id,
996            retention_second: subscription.retention_seconds,
997        };
998
999        tracing::debug!("sending Command::CreateSubscription");
1000        self.barrier_scheduler
1001            .run_command(subscription.database_id, command)
1002            .await?;
1003        Ok(())
1004    }
1005
1006    // Don't need to add actor, just send a command
1007    pub async fn drop_subscription(
1008        self: &Arc<Self>,
1009        database_id: DatabaseId,
1010        subscription_id: SubscriptionId,
1011        table_id: TableId,
1012    ) {
1013        let command = Command::DropSubscription {
1014            subscription_id,
1015            upstream_mv_table_id: table_id,
1016        };
1017
1018        tracing::debug!("sending Command::DropSubscriptions");
1019        let _ = self
1020            .barrier_scheduler
1021            .run_command(database_id, command)
1022            .await
1023            .inspect_err(|err| {
1024                tracing::error!(error = ?err.as_report(), "failed to run drop command");
1025            });
1026    }
1027
1028    pub async fn alter_subscription_retention(
1029        self: &Arc<Self>,
1030        database_id: DatabaseId,
1031        subscription_id: SubscriptionId,
1032        table_id: TableId,
1033        retention_second: u64,
1034    ) -> MetaResult<()> {
1035        let command = Command::AlterSubscriptionRetention {
1036            subscription_id,
1037            upstream_mv_table_id: table_id,
1038            retention_second,
1039        };
1040
1041        tracing::debug!("sending Command::AlterSubscriptionRetention");
1042        self.barrier_scheduler
1043            .run_command(database_id, command)
1044            .await?;
1045        Ok(())
1046    }
1047}