risingwave_meta/stream/stream_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::ops::Deref;
use std::sync::Arc;

use await_tree::{InstrumentAwait, span};
use futures::FutureExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Mutex, oneshot};
use tracing::Instrument;

use super::{
    FragmentBackfillOrder, JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates,
    JobRescheduleTarget, JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, Fragment, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle,
    FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::cdc::{
    assign_cdc_table_snapshot_splits, assign_cdc_table_snapshot_splits_for_replace_table,
};
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Leave this empty as a placeholder for future options, if any.
}

/// [`CreateStreamingJobContext`] carries one-time information for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relations to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// The locations of the actors to build in the streaming job.
    pub building_locations: Locations,

    /// DDL definition.
    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Context provided for a potential replace-table job, typically used when sinking into a table.
    pub replace_table_job_info: Option<(
        StreamingJob,
        ReplaceStreamJobContext,
        StreamJobFragmentsToCreate,
    )>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,
}

pub enum CreatingState {
    Failed { reason: MetaError },
    // sender is used to notify the canceling result.
    Canceling { finish_tx: oneshot::Sender<()> },
    Created { version: NotificationVersion },
}

struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<Sender<CreatingState>>,
}

impl StreamingJobExecution {
    fn new(id: TableId, shutdown_tx: Sender<CreatingState>) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
        }
    }
}

#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut recovered_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx
                        .send(CreatingState::Canceling { finish_tx: tx })
                        .await
                        .is_ok()
                    {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id=?job_id, "failed to send canceling state");
                    }
                }
            } else {
                // If a job id does not exist in `streaming_jobs`, we can infer that it is either:
                // 1. entirely non-existent, or
                // 2. a recovered streaming job managed by the barrier manager.
                recovered_job_ids.push(job_id);
            }
        }
        (receivers, recovered_job_ids)
    }

    async fn check_job_exists(&self, job_id: TableId) -> bool {
        let jobs = self.streaming_jobs.lock().await;
        jobs.contains_key(&job_id)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: ObjectId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_columns: Vec<PbColumnCatalog>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id as _,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                        },
                    )
                })
                .collect(),
            state_table_ids: self
                .new_fragment
                .state_table_ids
                .iter()
                .map(|table| (*table).into())
                .collect(),
        }
    }
}

/// [`ReplaceStreamJobContext`] carries one-time information for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// New fragment relations to add from existing upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the new job to replace.
    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    /// Used for dropping an associated source: drops the source and its related internal tables.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Broadcasts and collects barriers.
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external systems like Kafka.
    pub source_manager: SourceManagerRef,

    /// Information about streaming jobs that are being created.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

    /// Creates a streaming job. It works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, and build the
    ///    hanging channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via the source manager and
    ///    patch the actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related metadata.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
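    ///
    /// A minimal call-site sketch (hedged: `fragments` and `ctx` are illustrative identifiers
    /// assumed to be prepared by the DDL layer, so the block is marked `ignore` rather than run
    /// as a doctest):
    ///
    /// ```ignore
    /// // Hypothetical usage: `fragments` is a `StreamJobFragmentsToCreate` and
    /// // `ctx` a `CreateStreamingJobContext` built elsewhere.
    /// let version = stream_manager
    ///     .create_streaming_job(fragments, ctx, None)
    ///     .await?;
    /// ```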
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        run_command_notifier: Option<oneshot::Sender<MetaResult<()>>>,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();
        let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
        let execution = StreamingJobExecution::new(table_id, sender.clone());
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let res: MetaResult<_> = try {
                let (source_change, streaming_job) = stream_manager
                    .run_create_streaming_job_command(stream_job_fragments, ctx)
                    .inspect(move |result| {
                        if let Some(tx) = run_command_notifier {
                            let _ = tx.send(match result {
                                Ok(_) => Ok(()),
                                Err(err) => Err(err.clone()),
                            });
                        }
                    })
                    .await?;
                let version = stream_manager
                    .metadata_manager
                    .wait_streaming_job_finished(
                        streaming_job.database_id().into(),
                        streaming_job.id() as _,
                    )
                    .await?;
                stream_manager
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
                tracing::debug!(?streaming_job, "stream job finished");
                version
            };

            match res {
                Ok(version) => {
                    let _ = sender
                        .send(CreatingState::Created { version })
                        .await
                        .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}"));
                }
                Err(err) => {
                    let _ = sender
                        .send(CreatingState::Failed {
                            reason: err.clone(),
                        })
                        .await
                        .inspect_err(|_| {
                            tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}")
                        });
                }
            }
        }
        .in_current_span();

        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        tokio::spawn(fut);

        while let Some(state) = receiver
            .recv()
            .instrument_await("recv_creating_state")
            .await
        {
            match state {
                CreatingState::Failed { reason } => {
                    tracing::debug!(id=?table_id, "stream job failed");
                    // FIXME(kwannoel): For creating stream jobs,
                    // we need to clean up the resources in the stream manager.
                    self.creating_job_info.delete_job(table_id).await;
                    return Err(reason);
                }
                CreatingState::Canceling { finish_tx } => {
                    tracing::debug!(id=?table_id, "cancelling streaming job");
                    if let Ok(table_fragments) = self
                        .metadata_manager
                        .get_job_fragments_by_id(&table_id)
                        .await
                    {
                        // Try to cancel the buffered creating command first.
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, table_id)
                        {
                            tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
                        } else if !table_fragments.is_created() {
                            tracing::debug!(
                                "cancelling streaming job {table_id} by issuing a cancel command."
                            );
                            self.metadata_manager
                                .catalog_controller
                                .try_abort_creating_streaming_job(table_id.table_id as _, true)
                                .await?;

                            self.barrier_scheduler
                                .run_command(database_id, Command::cancel(&table_fragments))
                                .await?;
                        } else {
                            // The streaming job is already completed.
                            continue;
                        }
                        let _ = finish_tx.send(()).inspect_err(|_| {
                            tracing::warn!("failed to notify cancelled: {table_id}")
                        });
                        self.creating_job_info.delete_job(table_id).await;
                        return Err(MetaError::cancelled("create"));
                    }
                }
                CreatingState::Created { version } => {
                    self.creating_job_info.delete_job(table_id).await;
                    return Ok(version);
                }
            }
        }
        self.creating_job_info.delete_job(table_id).await;
        bail!("receiver failed to get notification version for finished stream job")
    }

    /// The function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
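    ///
    /// A hedged sketch of how the returned pair is consumed by the wrapper
    /// [`Self::create_streaming_job`] (identifiers are illustrative; marked `ignore` since it is
    /// not a doctest):
    ///
    /// ```ignore
    /// let (source_change, job) = self.run_create_streaming_job_command(fragments, ctx).await?;
    /// // The source change is applied only after the job has finished.
    /// self.source_manager.apply_source_change(source_change).await;
    /// ```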
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            replace_table_job_info,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<(SourceChange, StreamingJob)> {
        let mut replace_table_command = None;

        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        if let Some((streaming_job, context, stream_job_fragments)) = replace_table_job_info {
            self.metadata_manager
                .catalog_controller
                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
                .await?;

            let tmp_table_id = stream_job_fragments.stream_job_id();
            let init_split_assignment = self
                .source_manager
                .allocate_splits(&stream_job_fragments)
                .await?;
            let cdc_table_snapshot_split_assignment =
                assign_cdc_table_snapshot_splits_for_replace_table(
                    context.old_fragments.stream_job_id.table_id,
                    &stream_job_fragments.inner,
                    self.env.meta_store_ref(),
                )
                .await?;

            replace_table_command = Some(ReplaceStreamJobPlan {
                old_fragments: context.old_fragments,
                new_fragments: stream_job_fragments,
                replace_upstream: context.replace_upstream,
                upstream_fragment_downstreams: context.upstream_fragment_downstreams,
                init_split_assignment,
                streaming_job,
                tmp_id: tmp_table_id.table_id,
                to_drop_state_table_ids: Vec::new(), /* the create streaming job command will not drop any state table */
                auto_refresh_schema_sinks: None,
                cdc_table_snapshot_split_assignment,
            });
        }

        // Here we need to consider:
        // - Shared source
        // - Table with connector
        // - MV on shared source
        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            iter::once(stream_job_fragments.deref()),
            self.env.meta_store_ref(),
        )
        .await?;

        let source_change = SourceChange::CreateJobFinished {
            finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_split_assignment,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(replace_table_command) = replace_table_command {
                CreateStreamingJobType::SinkIntoTable(replace_table_command)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok((source_change, streaming_job))
    }

    /// Sends the replace-job command to the barrier scheduler.
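    ///
    /// A hedged call-site sketch (the new fragments and context are assumed to come from the
    /// replace-job planning in the DDL layer; identifiers are illustrative and the block is
    /// marked `ignore` since it is not a doctest):
    ///
    /// ```ignore
    /// stream_manager
    ///     .replace_stream_job(new_fragments, replace_ctx)
    ///     .await?;
    /// ```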
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocate split: {:?}",
            init_split_assignment
        );

        let cdc_table_snapshot_split_assignment =
            assign_cdc_table_snapshot_splits_for_replace_table(
                old_fragments.stream_job_id.table_id,
                &new_fragments.inner,
                self.env.meta_store_ref(),
            )
            .await?;

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                    cdc_table_snapshot_split_assignment,
                }),
            )
            .await?;

        Ok(())
    }

    /// Drops streaming jobs via the barrier manager and cleans up all related resources. Errors
    /// are ignored because the recovery process will take over the cleanup. Check
    /// [`Command::DropStreamingJobs`] for details.
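    ///
    /// A hedged call-site sketch (the id collections are assumed to be resolved by the catalog
    /// layer beforehand; identifiers are illustrative and the block is marked `ignore` since it
    /// is not a doctest):
    ///
    /// ```ignore
    /// stream_manager
    ///     .drop_streaming_jobs(database_id, removed_actors, job_ids, state_table_ids, fragment_ids)
    ///     .await; // Intentionally infallible: failures are logged and left to recovery.
    /// ```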
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
    ) {
        // TODO(august): This is a workaround for canceling SITT via drop, remove it after refactoring SITT.
        for &job_id in &streaming_job_ids {
            if self
                .creating_job_info
                .check_job_exists(TableId::new(job_id as _))
                .await
            {
                tracing::info!(
                    ?job_id,
                    "streaming job is still being created, cancel it directly via drop"
                );
                self.metadata_manager
                    .notify_cancelled(database_id, job_id)
                    .await;
            }
        }

        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let res = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        table_fragments_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
            if res.is_ok() {
                self.post_dropping_streaming_jobs(state_table_ids).await;
            }
        }
    }

    async fn post_dropping_streaming_jobs(
        &self,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
    ) {
        let tables = self
            .metadata_manager
            .catalog_controller
            .complete_dropped_tables(state_table_ids.into_iter())
            .await;
        let objects = tables
            .into_iter()
            .map(|t| PbObject {
                object_info: Some(PbObjectInfo::Table(t)),
            })
            .collect();
        let group = PbInfo::ObjectGroup(PbObjectGroup { objects });
        self.env
            .notification_manager()
            .notify_hummock(Operation::Delete, group.clone())
            .await;
        self.env
            .notification_manager()
            .notify_compactor(Operation::Delete, group)
            .await;
    }

    /// Cancels streaming jobs and returns the canceled table ids.
    /// 1. Send a cancel message to the creating stream jobs (via `cancel_jobs`).
    /// 2. Send a cancel message to the recovered stream jobs (via `barrier_scheduler`).
    ///
    /// In both cases, their state is cleaned up by the barrier manager after the
    /// `CancelStreamJob` command succeeds.
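    ///
    /// A hedged call-site sketch (table ids are illustrative; marked `ignore` since it is not a
    /// doctest):
    ///
    /// ```ignore
    /// // Only the jobs that were actually cancelled are returned.
    /// let cancelled: Vec<TableId> = stream_manager
    ///     .cancel_streaming_jobs(vec![table_id_a, table_id_b])
    ///     .await;
    /// ```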
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if receiver.await.is_ok() {
                tracing::info!("canceled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        // NOTE(kwannoel): For recovered stream jobs, we can directly cancel them by running the barrier command,
        // since the barrier manager manages the recovered stream jobs.
        let futures = recovered_job_ids.into_iter().map(|id| async move {
            tracing::debug!(?id, "cancelling recovered streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager.get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let (_, database_id) = self.metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(DatabaseId::new(database_id as _), Command::cancel(&fragment))
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

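    /// Reschedules a streaming job to the given parallelism / resource-group target. When
    /// `deferred` is true, the new target is applied to the job's metadata directly without
    /// generating and applying a reschedule plan.
    ///
    /// A hedged call-site sketch (identifiers are illustrative; marked `ignore` since it is not a
    /// doctest):
    ///
    /// ```ignore
    /// stream_manager
    ///     .reschedule_streaming_job(
    ///         job_id,
    ///         JobRescheduleTarget {
    ///             parallelism: JobParallelismTarget::Update(TableParallelism::Fixed(4)),
    ///             resource_group: JobResourceGroupTarget::Keep,
    ///         },
    ///         /* deferred */ false,
    ///     )
    ///     .await?;
    /// ```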
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        // Check if the provided parallelism is valid.
        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "more parallelism is available than the job's max parallelism {}, so the effective parallelism will be capped",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, directly setting parallelism to {:?} and resource group to {:?}",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: HashMap::from([(
                            job_id.table_id,
                            JobRescheduleTarget {
                                parallelism: parallelism_change,
                                resource_group: resource_group_change,
                            },
                        )]),
                    },
                    false,
                )
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, directly applying post updates {:?}",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

    /// This method is copied from `GlobalStreamManager::reschedule_streaming_job` and modified to reschedule the CDC table backfill fragment.
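    ///
    /// A hedged call-site sketch: only a fixed parallelism with an unchanged resource group is
    /// accepted here (identifiers are illustrative; marked `ignore` since it is not a doctest):
    ///
    /// ```ignore
    /// stream_manager
    ///     .reschedule_cdc_table_backfill(
    ///         job_id,
    ///         JobRescheduleTarget {
    ///             parallelism: JobParallelismTarget::Update(TableParallelism::Fixed(2)),
    ///             resource_group: JobResourceGroupTarget::Keep,
    ///         },
    ///     )
    ///     .await?;
    /// ```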
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;
        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);
        if let JobParallelismTarget::Update(parallelism) = &parallelism_change {
            match parallelism {
                TableParallelism::Fixed(_) => {}
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
                TableParallelism::Adaptive => {
                    bail_invalid_parameter!("should not alter parallelism to adaptive")
                }
            }
        } else {
            bail_invalid_parameter!("should not refresh")
        }
        match &resource_group_change {
            JobResourceGroupTarget::Update(_) => {
                bail_invalid_parameter!("should not update resource group")
            }
            JobResourceGroupTarget::Keep => {}
        };
        // Only generate a reschedule plan for the CDC table backfill fragment.
        let reschedule_plan = self
            .scale_controller
            .generate_job_reschedule_plan(
                JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                },
                true,
            )
            .await?;
        if reschedule_plan.reschedules.is_empty() {
            tracing::debug!(
                ?job_id,
                post_updates = ?reschedule_plan.post_updates,
                "Empty reschedule plan generated for job.",
            );
            self.scale_controller
                .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                .await?;
        } else {
            self.reschedule_actors(
                database_id,
                reschedule_plan,
                RescheduleOptions {
                    resolve_no_shuffle_upstream: false,
                    skip_create_new_actors: false,
                },
            )
            .await?;
        }

        Ok(())
    }

    // No need to add actors; just send a command.
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

    // No need to add actors; just send a command.
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}