risingwave_meta/stream/stream_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::sync::Arc;

use await_tree::span;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, TableId};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use thiserror_ext::AsReport;
use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
use tracing::Instrument;

use super::{
    FragmentBackfillOrder, JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates,
    JobRescheduleTarget, JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::SourceManagerRef;
use crate::stream::cdc::{
    assign_cdc_table_snapshot_splits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Intentionally left empty; a placeholder for future options.
}

#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: ObjectId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    // for backwards compatibility
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

/// [`CreateStreamingJobContext`] carries one-time information for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relation to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// The locations of the actors to build in the streaming job.
    pub building_locations: Locations,

    /// DDL definition.
    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Used for sink-into-table.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,

    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}

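/// Tracks an in-flight creating streaming job. `shutdown_tx` carries a one-shot sender through
/// which the cancellation path hands over a `oneshot::Sender<bool>`; the creating task then
/// reports via that channel whether the job was actually cancelled.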
struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    _permit: OwnedSemaphorePermit,
}

impl StreamingJobExecution {
    fn new(
        id: TableId,
        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
            _permit: permit,
        }
    }
}

#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

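    /// Requests cancellation of the given creating jobs. For each job tracked here, a result
    /// channel is sent through its `shutdown_tx`; the returned receivers resolve to `true` once
    /// the job is actually cancelled. Ids not tracked here are returned separately as background
    /// creating jobs.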
    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<bool>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut background_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx.send(tx).is_ok() {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id=?job_id, "failed to send canceling state");
                    }
                }
            } else {
                // Job ids not present in `streaming_jobs` should belong to background creating jobs.
                background_job_ids.push(job_id);
            }
        }
        (receivers, background_job_ids)
    }

    async fn check_job_exists(&self, job_id: TableId) -> bool {
        let jobs = self.streaming_jobs.lock().await;
        jobs.contains_key(&job_id)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: ObjectId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
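    /// Builds the in-flight info of the new sink fragment, resolving each actor's worker id from
    /// `actor_status`.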
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id as _,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self
                .new_fragment
                .state_table_ids
                .iter()
                .map(|table| (*table).into())
                .collect(),
        }
    }
}

/// [`ReplaceStreamJobContext`] carries one-time information for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// New fragment relation to add from existing upstream fragment to downstream fragment.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the new job to replace.
    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    /// Used for dropping an associated source: drops the source and its related internal tables.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Broadcasts and collects barriers.
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external systems like Kafka.
    pub source_manager: SourceManagerRef,

    /// Info of creating streaming jobs.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

    /// Creates a streaming job. It works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, and build the
    ///    hanging channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via the source manager and
    ///    patch the actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related metadata.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
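    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled); it assumes the caller has already built the
    /// fragments and the [`CreateStreamingJobContext`] and acquired a concurrency permit, and
    /// the variable names are illustrative only:
    ///
    /// ```ignore
    /// let version = stream_manager
    ///     .create_streaming_job(stream_job_fragments, ctx, permit)
    ///     .await?;
    /// tracing::info!(?version, "streaming job created");
    /// ```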
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();

        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(table_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finish");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

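        // Race the creation future against a cancellation request. With `biased`, the creation
        // future is polled first, so a completed job wins over a late cancellation.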
        let result = tokio::select! {
            biased;

            res = create_fut => res,
            notifier = cancel_rx => {
                let notifier = notifier.expect("sender should not be dropped");
                tracing::debug!(id=?table_id, "cancelling streaming job");

                if let Ok(job_fragments) = self.metadata_manager.get_job_fragments_by_id(&table_id)
                    .await {
                    // try to cancel buffered creating command.
                    if self.barrier_scheduler.try_cancel_scheduled_create(database_id, table_id) {
                        tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
                    } else if !job_fragments.is_created() {
                        tracing::debug!("cancelling streaming job {table_id} by issuing a cancel command.");

                        let cancel_command = self.metadata_manager.catalog_controller
                            .build_cancel_command(&job_fragments)
                            .await?;
                        self.metadata_manager.catalog_controller
                            .try_abort_creating_streaming_job(table_id.table_id as _, true)
                            .await?;

                        self.barrier_scheduler.run_command(database_id, cancel_command).await?;
                    } else {
                        // streaming job is already completed
                        let _ = notifier.send(false).inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));
                        return self.metadata_manager.wait_streaming_job_finished(database_id, table_id.table_id as _).await;
                    }
                }
                notifier.send(true).expect("receiver should not be dropped");
                Err(MetaError::cancelled("create"))
            }
        };

        tracing::info!("cleaning creating job info: {}", table_id);
        self.creating_job_info.delete_job(table_id).await;
        result
    }

    /// The function returns after the barrier is collected
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
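    ///
    /// This builds the source split assignments and the CDC table snapshot split assignment, then
    /// sends a [`Command::CreateStreamingJob`] to the barrier scheduler and returns once the first
    /// barrier of the job has been collected.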
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        // Here we need to consider:
        // - Shared source
        // - Table with connector
        // - MV on shared source
        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

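        // For parallelized CDC backfill: pre-compute the snapshot split assignment, register the
        // job with the CDC backfill tracker, and tag the assignment with a fresh generation.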
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            stream_job_fragments.stream_job_id.table_id,
            &stream_job_fragments,
            self.env.meta_store_ref(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if !cdc_table_snapshot_split_assignment.is_empty()
        {
            self.env.cdc_table_backfill_tracker.track_new_job(
                stream_job_fragments.stream_job_id.table_id,
                cdc_table_snapshot_split_assignment
                    .values()
                    .map(|s| u64::try_from(s.len()).unwrap())
                    .sum(),
            );
            self.env
                .cdc_table_backfill_tracker
                .add_fragment_table_mapping(
                    stream_job_fragments
                        .fragments
                        .values()
                        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
                        .map(|f| f.fragment_id),
                    stream_job_fragments.stream_job_id.table_id,
                );
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(iter::once(stream_job_fragments.stream_job_id.table_id)),
            )
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_split_assignment,
            locality_fragment_state_table_mapping,
        };

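        // Snapshot-backfill and sink-into-table jobs are created via dedicated command variants;
        // everything else takes the normal path.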
        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }

    /// Sends a replace-job command to the barrier scheduler.
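    ///
    /// Splits are (re-)allocated for the new fragments and the CDC table snapshot split
    /// assignment is recomputed before a [`Command::ReplaceStreamJob`] is issued.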
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocate split: {:?}",
            init_split_assignment
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            old_fragments.stream_job_id.table_id,
            &new_fragments.inner,
            self.env.meta_store_ref(),
        )
        .await?;

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                    cdc_table_snapshot_split_assignment,
                }),
            )
            .await?;

        Ok(())
    }

    /// Drops streaming jobs via the barrier manager and cleans up all related resources. Errors
    /// are ignored because the recovery process will take over the cleanup. Check
    /// [`Command::DropStreamingJobs`] for details.
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        // TODO(august): This is a workaround for canceling SITT via drop, remove it after refactoring SITT.
        for &job_id in &streaming_job_ids {
            if self
                .creating_job_info
                .check_job_exists(TableId::new(job_id as _))
                .await
            {
                tracing::info!(
                    ?job_id,
                    "streaming job is creating, cancel it with drop directly"
                );
                self.metadata_manager
                    .notify_cancelled(database_id, job_id)
                    .await;
            }
        }

        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

    /// Cancels streaming jobs and returns the canceled table ids.
    /// 1. Send a cancel message to creating stream jobs (via `cancel_jobs`).
    /// 2. Send a cancel message to recovered stream jobs (via `barrier_scheduler`).
    ///
    /// In both cases, their state is cleaned up by the barrier manager after the
    /// `CancelStreamJob` command succeeds.
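    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled); `table_ids` stands for whatever ids the caller collected:
    ///
    /// ```ignore
    /// let cancelled = stream_manager.cancel_streaming_jobs(table_ids).await;
    /// tracing::info!(?cancelled, "cancelled streaming jobs");
    /// ```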
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if let Ok(cancelled) = receiver.await
                && cancelled
            {
                tracing::info!("canceled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        // NOTE(kwannoel): Background jobs in `background_job_ids` are not tracked by the stream
        // manager, so we cancel them directly by running the barrier command.
        let futures = background_job_ids.into_iter().map(|id| async move {
            tracing::debug!(?id, "cancelling background streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager.get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let cancel_command = self
                    .metadata_manager
                    .catalog_controller
                    .build_cancel_command(&fragment)
                    .await?;

                let (_, database_id) = self.metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(DatabaseId::new(database_id as _), cancel_command)
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

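    /// Reschedules a streaming job to a new parallelism and/or resource group. With `deferred`
    /// set, only the target parallelism and resource group are recorded (no actors are moved
    /// immediately); otherwise a reschedule plan is generated and applied right away.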
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        // Check if the provided parallelism is valid.
        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "available parallelism exceeds the max parallelism {}, the actual parallelism will be limited",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, set the parallelism directly to parallelism {:?}, resource group {:?}",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: HashMap::from([(
                            job_id.table_id,
                            JobRescheduleTarget {
                                parallelism: parallelism_change,
                                resource_group: resource_group_change,
                            },
                        )]),
                    },
                    false,
                )
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, set the parallelism directly to {:?}",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

    /// This method is copied from `GlobalStreamManager::reschedule_streaming_job` and modified to handle rescheduling of the CDC table backfill fragment.
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;
        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);
        if let JobParallelismTarget::Update(parallelism) = &parallelism_change {
            match parallelism {
                TableParallelism::Fixed(_) => {}
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
                TableParallelism::Adaptive => {
                    bail_invalid_parameter!("should not alter parallelism to adaptive")
                }
            }
        } else {
            bail_invalid_parameter!("should not refresh")
        }
        match &resource_group_change {
            JobResourceGroupTarget::Update(_) => {
                bail_invalid_parameter!("should not update resource group")
            }
            JobResourceGroupTarget::Keep => {}
        };
        // Only generate a reschedule plan for the CDC table backfill fragment.
        let reschedule_plan = self
            .scale_controller
            .generate_job_reschedule_plan(
                JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                },
                true,
            )
            .await?;
        if reschedule_plan.reschedules.is_empty() {
            tracing::debug!(
                ?job_id,
                post_updates = ?reschedule_plan.post_updates,
                "Empty reschedule plan generated for job.",
            );
            self.scale_controller
                .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                .await?;
        } else {
            self.reschedule_actors(
                database_id,
                reschedule_plan,
                RescheduleOptions {
                    resolve_no_shuffle_upstream: false,
                    skip_create_new_actors: false,
                },
            )
            .await?;
        }

        Ok(())
    }

    // No need to add actors, just send a command.
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

    // No need to add actors, just send a command.
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}