risingwave_meta/stream/stream_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use futures::FutureExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, Subscription, Table};
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Mutex, oneshot};
use tracing::Instrument;

use super::{
    JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates, JobRescheduleTarget,
    JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle, FragmentReplaceUpstream,
    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Left empty as a placeholder for future options, if any.
}

/// [`CreateStreamingJobContext`] carries one-time information for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relation to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// Internal tables in the streaming job.
    pub internal_tables: BTreeMap<u32, Table>,

    /// The locations of the actors to build in the streaming job.
    pub building_locations: Locations,

    /// The locations of the existing actors, essentially the upstream mview actors to update.
    pub existing_locations: Locations,

    /// DDL definition.
    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Context provided for a potential table replacement, typically used when sinking into a table.
    pub replace_table_job_info: Option<(
        StreamingJob,
        ReplaceStreamJobContext,
        StreamJobFragmentsToCreate,
    )>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,
}

impl CreateStreamingJobContext {
    pub fn internal_tables(&self) -> Vec<Table> {
        self.internal_tables.values().cloned().collect()
    }
}

pub enum CreatingState {
    Failed { reason: MetaError },
    // The sender is used to notify the result of canceling.
    Canceling { finish_tx: oneshot::Sender<()> },
    Created { version: NotificationVersion },
}

struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<Sender<CreatingState>>,
}

impl StreamingJobExecution {
    fn new(id: TableId, shutdown_tx: Sender<CreatingState>) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
        }
    }
}

#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

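    /// Sends a [`CreatingState::Canceling`] message to each tracked creating job and returns, per
    /// job, a receiver that resolves once cancellation finishes, plus the ids that are not
    /// tracked here (either non-existent or recovered jobs managed by the barrier manager).
    ///
    /// A rough usage sketch, mirroring [`GlobalStreamManager::cancel_streaming_jobs`] (not a
    /// compiled example):
    ///
    /// ```ignore
    /// let (receivers, recovered_job_ids) = creating_job_info.cancel_jobs(table_ids).await;
    /// for (id, rx) in receivers {
    ///     if rx.await.is_ok() {
    ///         tracing::info!("canceled streaming job {id}");
    ///     }
    /// }
    /// // `recovered_job_ids` must be cancelled through the barrier scheduler instead.
    /// ```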
    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut recovered_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx
                        .send(CreatingState::Canceling { finish_tx: tx })
                        .await
                        .is_ok()
                    {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id=?job_id, "failed to send canceling state");
                    }
                }
            } else {
                // If a job id does not exist in `streaming_jobs`, it is either:
                // 1. entirely non-existent, or
                // 2. a recovered streaming job managed by the barrier manager.
                recovered_job_ids.push(job_id);
            }
        }
        (receivers, recovered_job_ids)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

/// [`ReplaceStreamJobContext`] carries one-time information for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// New fragment relation to add from existing upstream fragment to downstream fragment.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the new job to replace.
    pub building_locations: Locations,

    /// The locations of the existing actors, essentially the downstream chain actors to update.
    pub existing_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    /// Used for dropping an associated source: drops the source and its related internal tables.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,
}

/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Broadcasts and collects barriers.
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external systems like Kafka.
    pub source_manager: SourceManagerRef,

    /// Info of the streaming jobs that are being created.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
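    /// Creates a new `GlobalStreamManager` from its constituent managers.
    ///
    /// A construction sketch (the `env`, manager, scheduler, and controller values are assumed to
    /// be built by the meta-node bootstrap code; not a compiled example):
    ///
    /// ```ignore
    /// let stream_manager = Arc::new(GlobalStreamManager::new(
    ///     env,
    ///     metadata_manager,
    ///     barrier_scheduler,
    ///     source_manager,
    ///     scale_controller,
    /// )?);
    /// ```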
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

    /// Creates a streaming job. It works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, and build the
    ///    hanging channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via the source manager and
    ///    patch the actors.
    /// 3. Notify the related worker nodes to update and build the actors.
    /// 4. Store the related metadata.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
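    ///
    /// A minimal caller sketch (the `stream_job_fragments` and `ctx` values are assumed to be
    /// prepared by the DDL layer; not a compiled example):
    ///
    /// ```ignore
    /// let version = stream_manager
    ///     .create_streaming_job(stream_job_fragments, ctx, None)
    ///     .await?;
    /// ```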
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        run_command_notifier: Option<oneshot::Sender<MetaResult<()>>>,
    ) -> MetaResult<NotificationVersion> {
        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();
        let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
        let execution = StreamingJobExecution::new(table_id, sender.clone());
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let res: MetaResult<_> = try {
                let (source_change, streaming_job) = stream_manager
                    .run_create_streaming_job_command(stream_job_fragments, ctx)
                    .inspect(move |result| {
                        if let Some(tx) = run_command_notifier {
                            let _ = tx.send(match result {
                                Ok(_) => Ok(()),
                                Err(err) => Err(err.clone()),
                            });
                        }
                    })
                    .await?;
                let version = stream_manager
                    .metadata_manager
                    .wait_streaming_job_finished(
                        streaming_job.database_id().into(),
                        streaming_job.id() as _,
                    )
                    .await?;
                stream_manager
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
                tracing::debug!(?streaming_job, "stream job finished");
                version
            };

            match res {
                Ok(version) => {
                    let _ = sender
                        .send(CreatingState::Created { version })
                        .await
                        .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}"));
                }
                Err(err) => {
                    let _ = sender
                        .send(CreatingState::Failed {
                            reason: err.clone(),
                        })
                        .await
                        .inspect_err(|_| {
                            tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}")
                        });
                }
            }
        }
        .in_current_span();
        tokio::spawn(fut);

        while let Some(state) = receiver.recv().await {
            match state {
                CreatingState::Failed { reason } => {
                    tracing::debug!(id=?table_id, "stream job failed");
                    // FIXME(kwannoel): For creating stream jobs
                    // we need to clean up the resources in the stream manager.
                    self.creating_job_info.delete_job(table_id).await;
                    return Err(reason);
                }
                CreatingState::Canceling { finish_tx } => {
                    tracing::debug!(id=?table_id, "cancelling streaming job");
                    if let Ok(table_fragments) = self
                        .metadata_manager
                        .get_job_fragments_by_id(&table_id)
                        .await
                    {
                        // Try to cancel the buffered create command first.
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, table_id)
                        {
                            tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
                        } else if !table_fragments.is_created() {
                            tracing::debug!(
                                "cancelling streaming job {table_id} by issuing a cancel command."
                            );
                            self.metadata_manager
                                .catalog_controller
                                .try_abort_creating_streaming_job(table_id.table_id as _, true)
                                .await?;

                            self.barrier_scheduler
                                .run_command(database_id, Command::cancel(&table_fragments))
                                .await?;
                        } else {
                            // The streaming job has already been completed.
                            continue;
                        }
                        let _ = finish_tx.send(()).inspect_err(|_| {
                            tracing::warn!("failed to notify cancelled: {table_id}")
                        });
                        self.creating_job_info.delete_job(table_id).await;
                        return Err(MetaError::cancelled("create"));
                    }
                }
                CreatingState::Created { version } => {
                    self.creating_job_info.delete_job(table_id).await;
                    return Ok(version);
                }
            }
        }
        self.creating_job_info.delete_job(table_id).await;
        bail!("receiver failed to get notification version for finished stream job")
    }

    /// This function returns after the `CreateStreamingJob` command has been injected and its
    /// first barrier collected; the caller is expected to wait for backfilling to finish
    /// afterwards via [`crate::manager::MetadataManager::wait_streaming_job_finished`].
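    ///
    /// A simplified view of how [`Self::create_streaming_job`] consumes the result (the
    /// `stream_manager`, `stream_job_fragments`, and `ctx` bindings are assumed from the caller;
    /// not a compiled example):
    ///
    /// ```ignore
    /// let (source_change, streaming_job) = stream_manager
    ///     .run_create_streaming_job_command(stream_job_fragments, ctx)
    ///     .await?;
    /// let version = stream_manager
    ///     .metadata_manager
    ///     .wait_streaming_job_finished(streaming_job.database_id().into(), streaming_job.id() as _)
    ///     .await?;
    /// stream_manager
    ///     .source_manager
    ///     .apply_source_change(source_change)
    ///     .await;
    /// ```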
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            replace_table_job_info,
            internal_tables,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<(SourceChange, StreamingJob)> {
        let mut replace_table_command = None;

        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        if let Some((streaming_job, context, stream_job_fragments)) = replace_table_job_info {
            self.metadata_manager
                .catalog_controller
                .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                .await?;

            let tmp_table_id = stream_job_fragments.stream_job_id();
            let init_split_assignment = self
                .source_manager
                .allocate_splits(&stream_job_fragments)
                .await?;

            replace_table_command = Some(ReplaceStreamJobPlan {
                old_fragments: context.old_fragments,
                new_fragments: stream_job_fragments,
                replace_upstream: context.replace_upstream,
                upstream_fragment_downstreams: context.upstream_fragment_downstreams,
                init_split_assignment,
                streaming_job,
                tmp_id: tmp_table_id.table_id,
                to_drop_state_table_ids: Vec::new(), /* the create streaming job command will not drop any state table */
            });
        }

        // Here we need to consider:
        // - Shared source
        // - Table with connector
        // - MV on shared source
        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let source_change = SourceChange::CreateJobFinished {
            finished_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            internal_tables: internal_tables.into_values().collect_vec(),
            job_type,
            create_type,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(replace_table_command) = replace_table_command {
                CreateStreamingJobType::SinkIntoTable(replace_table_command)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok((source_change, streaming_job))
    }

    /// Sends the replace-job command to the barrier scheduler.
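    ///
    /// A minimal caller sketch (the `new_fragments` and `ctx` values are assumed to be prepared
    /// by the replace-job planning code; not a compiled example):
    ///
    /// ```ignore
    /// stream_manager.replace_stream_job(new_fragments, ctx).await?;
    /// ```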
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocated splits: {:?}",
            init_split_assignment
        );

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                }),
            )
            .await?;

        Ok(())
    }

    /// Drops streaming jobs via the barrier manager and cleans up all related resources. Errors
    /// are ignored because the recovery process will take over the cleanup. See
    /// [`Command::DropStreamingJobs`] for details.
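    ///
    /// A caller sketch with hypothetical ids (not a compiled example):
    ///
    /// ```ignore
    /// stream_manager
    ///     .drop_streaming_jobs(
    ///         database_id,
    ///         removed_actors,
    ///         streaming_job_ids,
    ///         state_table_ids,
    ///         fragment_ids,
    ///     )
    ///     .await;
    /// ```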
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
    ) {
        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let res = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        table_fragments_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
            if res.is_ok() {
                self.post_dropping_streaming_jobs(state_table_ids).await;
            }
        }
    }

    async fn post_dropping_streaming_jobs(
        &self,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
    ) {
        let tables = self
            .metadata_manager
            .catalog_controller
            .complete_dropped_tables(state_table_ids.into_iter())
            .await;
        let objects = tables
            .into_iter()
            .map(|t| PbObject {
                object_info: Some(PbObjectInfo::Table(t)),
            })
            .collect();
        let group = PbInfo::ObjectGroup(PbObjectGroup { objects });
        self.env
            .notification_manager()
            .notify_hummock(Operation::Delete, group.clone())
            .await;
        self.env
            .notification_manager()
            .notify_compactor(Operation::Delete, group)
            .await;
    }

    /// Cancels streaming jobs and returns the canceled table ids.
    /// 1. Send a cancel message to creating stream jobs (via `cancel_jobs`).
    /// 2. Send a cancel message to recovered stream jobs (via `barrier_scheduler`).
    ///
    /// In both cases, their state is cleaned up by the barrier manager after the
    /// `CancelStreamJob` command succeeds.
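    ///
    /// A caller sketch (the `table_ids` value is a hypothetical `Vec<TableId>`; not a compiled
    /// example):
    ///
    /// ```ignore
    /// let cancelled = stream_manager.cancel_streaming_jobs(table_ids).await;
    /// tracing::info!("cancelled {} streaming job(s)", cancelled.len());
    /// ```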
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if receiver.await.is_ok() {
                tracing::info!("canceled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        // NOTE(kwannoel): Recovered stream jobs are managed by the barrier manager, so we can
        // cancel them directly by running the barrier command.
        let futures = recovered_job_ids.into_iter().map(|id| async move {
            tracing::debug!(?id, "cancelling recovered streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager
                    .get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let (_, database_id) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(DatabaseId::new(database_id as _), Command::cancel(&fragment))
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

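    /// Reschedules a single streaming job to a new parallelism and/or resource group.
    ///
    /// A caller sketch (the `job_id` value is hypothetical; the target variants are the ones
    /// handled below; not a compiled example):
    ///
    /// ```ignore
    /// stream_manager
    ///     .reschedule_streaming_job(
    ///         job_id,
    ///         JobRescheduleTarget {
    ///             parallelism: JobParallelismTarget::Update(TableParallelism::Fixed(4)),
    ///             resource_group: JobResourceGroupTarget::Keep,
    ///         },
    ///         false, // not deferred
    ///     )
    ///     .await?;
    /// ```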
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        // Check if the provided parallelism is valid.
        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "available parallelism exceeds the max parallelism of the job; it will be limited to {}",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, directly setting parallelism to {:?} and resource group to {:?}",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                })
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, directly applying post updates {:?}",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

    // No need to build actors; just send a command.
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

    // No need to build actors; just send a command.
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}