risingwave_meta/stream/stream_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use futures::FutureExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, Subscription, Table};
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Mutex, oneshot};
use tracing::Instrument;

use super::{
    FragmentBackfillOrder, JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates,
    JobRescheduleTarget, JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle, FragmentReplaceUpstream,
    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Left empty as a placeholder for future options, if any.
}

/// [`CreateStreamingJobContext`] carries one-time info for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relation to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// Internal tables in the streaming job.
    pub internal_tables: BTreeMap<u32, Table>,

    /// The locations of the actors to build in the streaming job.
    pub building_locations: Locations,

    /// The locations of the existing actors, essentially the upstream mview actors to update.
    pub existing_locations: Locations,

    /// DDL definition.
    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Context provided for potential replace table, typically used when sinking into a table.
    pub replace_table_job_info: Option<(
        StreamingJob,
        ReplaceStreamJobContext,
        StreamJobFragmentsToCreate,
    )>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,
}

impl CreateStreamingJobContext {
    pub fn internal_tables(&self) -> Vec<Table> {
        self.internal_tables.values().cloned().collect()
    }
}

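/// Intermediate state of a streaming job that is being created, reported back to the foreground
/// `create_streaming_job` caller through an mpsc channel.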
pub enum CreatingState {
    Failed { reason: MetaError },
    // The sender is used to notify the caller of the cancellation result.
    Canceling { finish_tx: oneshot::Sender<()> },
    Created { version: NotificationVersion },
}

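/// A handle to a streaming job that is being created; `shutdown_tx` is used to ask the waiting
/// `create_streaming_job` call to cancel it.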
struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<Sender<CreatingState>>,
}

impl StreamingJobExecution {
    fn new(id: TableId, shutdown_tx: Sender<CreatingState>) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
        }
    }
}

#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

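    /// Try to cancel the given creating jobs. Returns oneshot receivers for the jobs whose cancel
    /// signal was delivered, plus the ids of jobs not tracked here (either non-existent or
    /// recovered jobs managed by the barrier manager).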
    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut recovered_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx
                        .send(CreatingState::Canceling { finish_tx: tx })
                        .await
                        .is_ok()
                    {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id=?job_id, "failed to send canceling state");
                    }
                }
            } else {
                // If a job id does not exist in `streaming_jobs`, it is either:
                // 1. entirely non-existent, or
                // 2. a recovered streaming job managed by the barrier manager.
                recovered_job_ids.push(job_id);
            }
        }
        (receivers, recovered_job_ids)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

/// [`ReplaceStreamJobContext`] carries one-time info for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// New fragment relation to add from existing upstream fragment to downstream fragment.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the new job to replace.
    pub building_locations: Locations,

    /// The locations of the existing actors, essentially the downstream chain actors to update.
    pub existing_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    /// Used when dropping an associated source: the source and its related internal tables are dropped as well.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,
}

/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Broadcasts and collects barriers.
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external systems like Kafka.
    pub source_manager: SourceManagerRef,

    /// Info of streaming jobs that are being created.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

    /// Creates a streaming job. It works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, and build the
    ///    hanging channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via the source manager and
    ///    patch the actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related metadata.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
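    ///
    /// # Example (sketch)
    ///
    /// A hypothetical call site; `fragments` and `ctx` are assumed to have been built by the DDL
    /// controller beforehand:
    ///
    /// ```ignore
    /// let version = stream_manager
    ///     .create_streaming_job(fragments, ctx, None)
    ///     .await?;
    /// ```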
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        run_command_notifier: Option<oneshot::Sender<MetaResult<()>>>,
    ) -> MetaResult<NotificationVersion> {
        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();
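        // The creating state of the job is reported back through this channel and handled in the
        // `receiver` loop below.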
        let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
        let execution = StreamingJobExecution::new(table_id, sender.clone());
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let res: MetaResult<_> = try {
                let (source_change, streaming_job) = stream_manager
                    .run_create_streaming_job_command(stream_job_fragments, ctx)
                    .inspect(move |result| {
                        if let Some(tx) = run_command_notifier {
                            let _ = tx.send(match result {
                                Ok(_) => {
                                    Ok(())
                                }
                                Err(err) => {
                                    Err(err.clone())
                                }
                            });
                        }
                    })
                    .await?;
                let version = stream_manager
                    .metadata_manager
                    .wait_streaming_job_finished(
                        streaming_job.database_id().into(),
                        streaming_job.id() as _,
                    )
                    .await?;
                stream_manager.source_manager.apply_source_change(source_change).await;
                tracing::debug!(?streaming_job, "stream job finished");
                version
            };

            match res {
                Ok(version) => {
                    let _ = sender
                        .send(CreatingState::Created { version })
                        .await
                        .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}"));
                }
                Err(err) => {
                    let _ = sender
                        .send(CreatingState::Failed {
                            reason: err.clone(),
                        })
                        .await
                        .inspect_err(|_| {
                            tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}")
                        });
                }
            }
        }
        .in_current_span();
        tokio::spawn(fut);

        while let Some(state) = receiver.recv().await {
            match state {
                CreatingState::Failed { reason } => {
                    tracing::debug!(id=?table_id, "stream job failed");
                    // FIXME(kwannoel): For creating stream jobs
                    // we need to clean up the resources in the stream manager.
                    self.creating_job_info.delete_job(table_id).await;
                    return Err(reason);
                }
                CreatingState::Canceling { finish_tx } => {
                    tracing::debug!(id=?table_id, "cancelling streaming job");
                    if let Ok(table_fragments) = self
                        .metadata_manager
                        .get_job_fragments_by_id(&table_id)
                        .await
                    {
                        // try to cancel buffered creating command.
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, table_id)
                        {
                            tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
                        } else if !table_fragments.is_created() {
                            tracing::debug!(
                                "cancelling streaming job {table_id} by issuing a cancel command."
                            );
                            self.metadata_manager
                                .catalog_controller
                                .try_abort_creating_streaming_job(table_id.table_id as _, true)
                                .await?;

                            self.barrier_scheduler
                                .run_command(database_id, Command::cancel(&table_fragments))
                                .await?;
                        } else {
                            // streaming job is already completed.
                            continue;
                        }
                        let _ = finish_tx.send(()).inspect_err(|_| {
                            tracing::warn!("failed to notify cancelled: {table_id}")
                        });
                        self.creating_job_info.delete_job(table_id).await;
                        return Err(MetaError::cancelled("create"));
                    }
                }
                CreatingState::Created { version } => {
                    self.creating_job_info.delete_job(table_id).await;
                    return Ok(version);
                }
            }
        }
        self.creating_job_info.delete_job(table_id).await;
        bail!("receiver failed to get notification version for finished stream job")
    }

    /// The function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
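    ///
    /// Returns the [`SourceChange`] to apply once the job is finished, together with the
    /// [`StreamingJob`] itself.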
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            replace_table_job_info,
            internal_tables,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<(SourceChange, StreamingJob)> {
        let mut replace_table_command = None;

        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "actors built"
        );

        if let Some((streaming_job, context, stream_job_fragments)) = replace_table_job_info {
            self.metadata_manager
                .catalog_controller
                .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                .await?;

            let tmp_table_id = stream_job_fragments.stream_job_id();
            let init_split_assignment = self
                .source_manager
                .allocate_splits(&stream_job_fragments)
                .await?;

            replace_table_command = Some(ReplaceStreamJobPlan {
                old_fragments: context.old_fragments,
                new_fragments: stream_job_fragments,
                replace_upstream: context.replace_upstream,
                upstream_fragment_downstreams: context.upstream_fragment_downstreams,
                init_split_assignment,
                streaming_job,
                tmp_id: tmp_table_id.table_id,
                to_drop_state_table_ids: Vec::new(), /* the create streaming job command will not drop any state table */
            });
        }

        // Here we need to consider:
        // - Shared source
        // - Table with connector
        // - MV on shared source
        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let source_change = SourceChange::CreateJobFinished {
            finished_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            internal_tables: internal_tables.into_values().collect_vec(),
            job_type,
            create_type,
            fragment_backfill_ordering,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(replace_table_command) = replace_table_command {
                CreateStreamingJobType::SinkIntoTable(replace_table_command)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok((source_change, streaming_job))
    }

    /// Send the replace job command to the barrier scheduler.
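    ///
    /// # Example (sketch)
    ///
    /// A hypothetical call site; `new_fragments` and `replace_ctx` are assumed to have been
    /// produced by the DDL controller, e.g. when handling `ALTER TABLE`:
    ///
    /// ```ignore
    /// stream_manager
    ///     .replace_stream_job(new_fragments, replace_ctx)
    ///     .await?;
    /// ```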
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocate split: {:?}",
            init_split_assignment
        );

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                }),
            )
            .await?;

        Ok(())
    }

    /// Drop streaming jobs by barrier manager, and clean up all related resources. Errors are
    /// ignored because the recovery process will take over the cleanup. Check
    /// [`Command::DropStreamingJobs`] for details.
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
    ) {
        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let res = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        table_fragments_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
            if res.is_ok() {
                self.post_dropping_streaming_jobs(state_table_ids).await;
            }
        }
    }

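    /// Notify hummock and compactor nodes about the state tables of the dropped jobs, so that the
    /// corresponding data can be cleaned up.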
    async fn post_dropping_streaming_jobs(
        &self,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
    ) {
        let tables = self
            .metadata_manager
            .catalog_controller
            .complete_dropped_tables(state_table_ids.into_iter())
            .await;
        let objects = tables
            .into_iter()
            .map(|t| PbObject {
                object_info: Some(PbObjectInfo::Table(t)),
            })
            .collect();
        let group = PbInfo::ObjectGroup(PbObjectGroup { objects });
        self.env
            .notification_manager()
            .notify_hummock(Operation::Delete, group.clone())
            .await;
        self.env
            .notification_manager()
            .notify_compactor(Operation::Delete, group)
            .await;
    }

    /// Cancel streaming jobs and return the canceled table ids.
    /// 1. Send cancel message to stream jobs (via `cancel_jobs`).
    /// 2. Send cancel message to recovered stream jobs (via `barrier_scheduler`).
    ///
    /// In both cases, their state will be cleaned up by the barrier manager after the
    /// `CancelStreamJob` command succeeds.
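    ///
    /// # Example (sketch)
    ///
    /// A hypothetical call site; `table_ids` are assumed to be resolved from the catalog:
    ///
    /// ```ignore
    /// let cancelled = stream_manager.cancel_streaming_jobs(table_ids).await;
    /// ```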
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if receiver.await.is_ok() {
                tracing::info!("canceled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        // NOTE(kwannoel): For recovered stream jobs, we can directly cancel them by running the barrier command,
        // since Barrier manager manages the recovered stream jobs.
        let futures = recovered_job_ids.into_iter().map(|id| async move {
            tracing::debug!(?id, "cancelling recovered streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager.get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let (_, database_id) = self.metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(DatabaseId::new(database_id as _), Command::cancel(&fragment))
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

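    /// Reschedule a streaming job according to the given [`JobRescheduleTarget`].
    ///
    /// When `deferred` is true, only the new parallelism / resource group is persisted and no
    /// actors are moved immediately; otherwise a reschedule plan is generated and applied right
    /// away.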
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        // Check if the provided parallelism is valid.
        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "available parallelism exceeds max parallelism {}; the actual parallelism will be limited",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, set the parallelism directly to {:?}, resource group to {:?}",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                })
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, set the parallelism directly to {:?}",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

    // No need to add actors; just send a command.
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

    // No need to add actors; just send a command.
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}