risingwave_meta/stream/stream_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, span};
use futures::FutureExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, Subscription, Table};
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Mutex, oneshot};
use tracing::Instrument;

use super::{
    FragmentBackfillOrder, JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates,
    JobRescheduleTarget, JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle, FragmentReplaceUpstream,
    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Leave empty as a placeholder for future options, if any.
}

/// [`CreateStreamingJobContext`] carries one-time information for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relation to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// Internal tables in the streaming job.
    pub internal_tables: BTreeMap<u32, Table>,

    /// The locations of the actors to build in the streaming job.
    pub building_locations: Locations,

    /// The locations of the existing actors, essentially the upstream mview actors to update.
    pub existing_locations: Locations,

    /// DDL definition.
    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Context provided for a potential replace-table job, typically used when sinking into a table.
    pub replace_table_job_info: Option<(
        StreamingJob,
        ReplaceStreamJobContext,
        StreamJobFragmentsToCreate,
    )>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,
}

impl CreateStreamingJobContext {
    pub fn internal_tables(&self) -> Vec<Table> {
        self.internal_tables.values().cloned().collect()
    }
}

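/// Creation state reported over the per-job channel and consumed by
/// [`GlobalStreamManager::create_streaming_job`].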
pub enum CreatingState {
    Failed { reason: MetaError },
    // The sender is used to notify the caller of the cancellation result.
    Canceling { finish_tx: oneshot::Sender<()> },
    Created { version: NotificationVersion },
}

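/// A streaming job that is still being created, holding the sender used to deliver a
/// cancellation request to the task that awaits its creation.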
struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<Sender<CreatingState>>,
}

impl StreamingJobExecution {
    fn new(id: TableId, shutdown_tx: Sender<CreatingState>) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
        }
    }
}

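/// Tracks all streaming jobs that are currently being created, keyed by their job (table) id.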
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

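    /// Sends a `Canceling` state to each tracked job in `job_ids` and returns the receivers for
    /// awaiting their acknowledgement, together with the ids that are not tracked here (either
    /// non-existent or recovered jobs managed by the barrier manager).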
    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut recovered_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx
                        .send(CreatingState::Canceling { finish_tx: tx })
                        .await
                        .is_ok()
                    {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id=?job_id, "failed to send canceling state");
                    }
                }
            } else {
                // If a job id does not exist in `streaming_jobs`, it is either:
                // 1. entirely non-existent, or
                // 2. a recovered streaming job managed by the BarrierManager.
                recovered_job_ids.push(job_id);
            }
        }
        (receivers, recovered_job_ids)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

/// [`ReplaceStreamJobContext`] carries one-time information for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// New fragment relation to add from existing upstream fragment to downstream fragment.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the new job to replace.
    pub building_locations: Locations,

    /// The locations of the existing actors, essentially the downstream chain actors to update.
    pub existing_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    /// Used when dropping an associated source: the source and its related internal tables are dropped.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,
}

/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Broadcasts and collects barriers.
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external systems like Kafka.
    pub source_manager: SourceManagerRef,

    /// Creating streaming job info.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

    /// Creates a streaming job. It works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, build the hanging
    ///    channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via source manager and patch
    ///    actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related metadata.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        run_command_notifier: Option<oneshot::Sender<MetaResult<()>>>,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();
        let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
        let execution = StreamingJobExecution::new(table_id, sender.clone());
        self.creating_job_info.add_job(execution).await;

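        // Spawn the actual creation flow as a background task so that cancellation requests
        // can be handled concurrently while waiting for it to finish.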
        let stream_manager = self.clone();
        let fut = async move {
            let res: MetaResult<_> = try {
                let (source_change, streaming_job) = stream_manager
                    .run_create_streaming_job_command(stream_job_fragments, ctx)
                    .inspect(move |result| {
                        if let Some(tx) = run_command_notifier {
                            let _ = tx.send(match result {
                                Ok(_) => Ok(()),
                                Err(err) => Err(err.clone()),
                            });
                        }
                    })
                    .await?;
                let version = stream_manager
                    .metadata_manager
                    .wait_streaming_job_finished(
                        streaming_job.database_id().into(),
                        streaming_job.id() as _,
                    )
                    .await?;
                stream_manager
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
                tracing::debug!(?streaming_job, "stream job finish");
                version
            };

            match res {
                Ok(version) => {
                    let _ = sender
                        .send(CreatingState::Created { version })
                        .await
                        .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}"));
                }
                Err(err) => {
                    let _ = sender
                        .send(CreatingState::Failed {
                            reason: err.clone(),
                        })
                        .await
                        .inspect_err(|_| {
                            tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}")
                        });
                }
            }
        }
        .in_current_span();

        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        tokio::spawn(fut);

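        // Wait for the spawned task (or a concurrent cancellation request) to report a
        // terminal state for this job.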
        while let Some(state) = receiver
            .recv()
            .instrument_await("recv_creating_state")
            .await
        {
            match state {
                CreatingState::Failed { reason } => {
                    tracing::debug!(id=?table_id, "stream job failed");
                    // FIXME(kwannoel): For creating stream jobs
                    // we need to clean up the resources in the stream manager.
                    self.creating_job_info.delete_job(table_id).await;
                    return Err(reason);
                }
                CreatingState::Canceling { finish_tx } => {
                    tracing::debug!(id=?table_id, "cancelling streaming job");
                    if let Ok(table_fragments) = self
                        .metadata_manager
                        .get_job_fragments_by_id(&table_id)
                        .await
                    {
                        // Try to cancel the creating command buffered in the barrier scheduler first.
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, table_id)
                        {
                            tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
                        } else if !table_fragments.is_created() {
                            tracing::debug!(
                                "cancelling streaming job {table_id} by issuing a cancel command."
                            );
                            self.metadata_manager
                                .catalog_controller
                                .try_abort_creating_streaming_job(table_id.table_id as _, true)
                                .await?;

                            self.barrier_scheduler
                                .run_command(database_id, Command::cancel(&table_fragments))
                                .await?;
                        } else {
                            // The streaming job has already been completed; cancellation is a no-op.
                            continue;
                        }
                        let _ = finish_tx.send(()).inspect_err(|_| {
                            tracing::warn!("failed to notify cancelled: {table_id}")
                        });
                        self.creating_job_info.delete_job(table_id).await;
                        return Err(MetaError::cancelled("create"));
                    }
                }
                CreatingState::Created { version } => {
                    self.creating_job_info.delete_job(table_id).await;
                    return Ok(version);
                }
            }
        }
        self.creating_job_info.delete_job(table_id).await;
        bail!("receiver failed to get notification version for finished stream job")
    }

    /// The function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            replace_table_job_info,
            internal_tables,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<(SourceChange, StreamingJob)> {
        let mut replace_table_command = None;

        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "actor building finished"
        );

        if let Some((streaming_job, context, stream_job_fragments)) = replace_table_job_info {
            self.metadata_manager
                .catalog_controller
                .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                .await?;

            let tmp_table_id = stream_job_fragments.stream_job_id();
            let init_split_assignment = self
                .source_manager
                .allocate_splits(&stream_job_fragments)
                .await?;

            replace_table_command = Some(ReplaceStreamJobPlan {
                old_fragments: context.old_fragments,
                new_fragments: stream_job_fragments,
                replace_upstream: context.replace_upstream,
                upstream_fragment_downstreams: context.upstream_fragment_downstreams,
                init_split_assignment,
                streaming_job,
                tmp_id: tmp_table_id.table_id,
                to_drop_state_table_ids: Vec::new(), /* the create streaming job command will not drop any state table */
            });
        }

        // Here we need to consider:
        // - Shared source
        // - Table with connector
        // - MV on shared source
        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

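        // Note: this source change is applied by the caller only after the job has finished
        // creating (see `create_streaming_job`).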
        let source_change = SourceChange::CreateJobFinished {
            finished_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            internal_tables: internal_tables.into_values().collect_vec(),
            job_type,
            create_type,
            fragment_backfill_ordering,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(replace_table_command) = replace_table_command {
                CreateStreamingJobType::SinkIntoTable(replace_table_command)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok((source_change, streaming_job))
    }

    /// Sends a replace-job command to the barrier scheduler.
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocated splits: {:?}",
            init_split_assignment
        );

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                }),
            )
            .await?;

        Ok(())
    }

    /// Drops streaming jobs via the barrier manager and cleans up all related resources. Errors
    /// are ignored because the recovery process will take over the cleanup. Check
    /// [`Command::DropStreamingJobs`] for details.
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
    ) {
        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let res = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        table_fragments_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
            if res.is_ok() {
                self.post_dropping_streaming_jobs(state_table_ids).await;
            }
        }
    }

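    /// Finalizes the dropped state tables in the catalog and notifies hummock and the compactor
    /// of the deleted objects.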
    async fn post_dropping_streaming_jobs(
        &self,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
    ) {
        let tables = self
            .metadata_manager
            .catalog_controller
            .complete_dropped_tables(state_table_ids.into_iter())
            .await;
        let objects = tables
            .into_iter()
            .map(|t| PbObject {
                object_info: Some(PbObjectInfo::Table(t)),
            })
            .collect();
        let group = PbInfo::ObjectGroup(PbObjectGroup { objects });
        self.env
            .notification_manager()
            .notify_hummock(Operation::Delete, group.clone())
            .await;
        self.env
            .notification_manager()
            .notify_compactor(Operation::Delete, group)
            .await;
    }

    /// Cancels streaming jobs and returns the canceled table ids.
    /// 1. Send cancel messages to creating stream jobs (via `cancel_jobs`).
    /// 2. Send cancel messages to recovered stream jobs (via `barrier_scheduler`).
    ///
    /// In both cases, their state is cleaned up by the barrier manager after the
    /// `CancelStreamJob` command succeeds.
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

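        // Wait for each in-flight creating job to acknowledge the cancellation request.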
        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if receiver.await.is_ok() {
                tracing::info!("canceled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        // NOTE(kwannoel): For recovered stream jobs, we can directly cancel them by running the barrier command,
        // since Barrier manager manages the recovered stream jobs.
        let futures = recovered_job_ids.into_iter().map(|id| async move {
            tracing::debug!(?id, "cancelling recovered streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager
                    .get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let (_, database_id) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(DatabaseId::new(database_id as _), Command::cancel(&fragment))
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

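    /// Reschedules a single streaming job towards the given parallelism / resource group target.
    /// If `deferred` is true, only the new targets are recorded via the scale controller's
    /// post-apply step and no actors are rescheduled immediately.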
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        // Check if the provided parallelism is valid.
        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "available parallelism exceeds max parallelism {}, the adaptive parallelism will be limited to the max",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

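        // Convert the requested targets into the per-job update maps consumed by the scale
        // controller.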
        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, directly setting parallelism to {:?} and resource group to {:?}",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                })
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, directly applying the post updates {:?}",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

    // No actors need to be built; just send a command.
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

    // No actors need to be built; just send a command.
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}