risingwave_meta/stream/stream_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::sync::Arc;

use await_tree::{InstrumentAwait, span};
use futures::FutureExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, TableId};
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Mutex, oneshot};
use tracing::Instrument;

use super::{
    FragmentBackfillOrder, JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates,
    JobRescheduleTarget, JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    assign_cdc_table_snapshot_splits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Left empty as a placeholder for future options, if any.
}

#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: ObjectId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    // for backwards compatibility
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

/// [`CreateStreamingJobContext`] carries one-time information for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relation to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// The locations of the actors to build in the streaming job.
    pub building_locations: Locations,

    /// DDL definition.
    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Used for sink-into-table.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,
}

pub enum CreatingState {
    Failed { reason: MetaError },
    // The sender is used to notify the caller of the cancellation result.
    Canceling { finish_tx: oneshot::Sender<()> },
    Created { version: NotificationVersion },
}

struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<Sender<CreatingState>>,
}

impl StreamingJobExecution {
    fn new(id: TableId, shutdown_tx: Sender<CreatingState>) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
        }
    }
}

#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut recovered_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx
                        .send(CreatingState::Canceling { finish_tx: tx })
                        .await
                        .is_ok()
                    {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id=?job_id, "failed to send canceling state");
                    }
                }
            } else {
                // If a job id does not exist in `streaming_jobs`, it is either:
                // 1. entirely non-existent, or
                // 2. a recovered streaming job that is managed by the `BarrierManager`.
                recovered_job_ids.push(job_id);
            }
        }
        (receivers, recovered_job_ids)
    }

    async fn check_job_exists(&self, job_id: TableId) -> bool {
        let jobs = self.streaming_jobs.lock().await;
        jobs.contains_key(&job_id)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: ObjectId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id as _,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                        },
                    )
                })
                .collect(),
            state_table_ids: self
                .new_fragment
                .state_table_ids
                .iter()
                .map(|table| (*table).into())
                .collect(),
        }
    }
}

/// [`ReplaceStreamJobContext`] carries one-time information for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// New fragment relation to add from existing upstream fragment to downstream fragment.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the new job to replace.
    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    /// Used for dropping an associated source: drops the source and its related internal tables.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Broadcasts and collects barriers.
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external systems like Kafka.
    pub source_manager: SourceManagerRef,

    /// Creating streaming job info.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

    /// Creates a streaming job. It works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, and build the
    ///    hanging channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via the source manager and
    ///    patch the actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related metadata.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
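    ///
    /// A minimal, illustrative sketch of driving this method from a DDL handler, assuming a
    /// `stream_manager: GlobalStreamManagerRef` and prepared `fragments` / `ctx` values (all
    /// variable names here are placeholders, not real call sites):
    ///
    /// ```ignore
    /// let (tx, rx) = tokio::sync::oneshot::channel();
    /// // Spawns an internal worker task; this future resolves only after backfill finishes.
    /// let version = stream_manager
    ///     .create_streaming_job(fragments, ctx, Some(tx))
    ///     .await?;
    /// // `rx` is notified earlier, as soon as the create command itself has been run.
    /// let _command_result = rx.await;
    /// ```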
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        run_command_notifier: Option<oneshot::Sender<MetaResult<()>>>,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();
        let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
        let execution = StreamingJobExecution::new(table_id, sender.clone());
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let res: MetaResult<_> = try {
                let (source_change, streaming_job) = stream_manager
                    .run_create_streaming_job_command(stream_job_fragments, ctx)
                    .inspect(move |result| {
                        if let Some(tx) = run_command_notifier {
                            let _ = tx.send(match result {
                                Ok(_) => {
                                    Ok(())
                                }
                                Err(err) => {
                                    Err(err.clone())
                                }
                            });
                        }
                    })
                    .await?;
                let version = stream_manager
                    .metadata_manager
                    .wait_streaming_job_finished(
                        streaming_job.database_id().into(),
                        streaming_job.id() as _,
                    )
                    .await?;
                stream_manager.source_manager
                    .apply_source_change(source_change)
                    .await;
                tracing::debug!(?streaming_job, "stream job finish");
                version
            };

            match res {
                Ok(version) => {
                    let _ = sender
                        .send(CreatingState::Created { version })
                        .await
                        .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}"));
                }
                Err(err) => {
                    let _ = sender
                        .send(CreatingState::Failed {
                            reason: err.clone(),
                        })
                        .await
                        .inspect_err(|_| {
                            tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}")
                        });
                }
            }
        }
        .in_current_span();

        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        tokio::spawn(fut);

        while let Some(state) = receiver
            .recv()
            .instrument_await("recv_creating_state")
            .await
        {
            match state {
                CreatingState::Failed { reason } => {
                    tracing::debug!(id=?table_id, "stream job failed");
                    // FIXME(kwannoel): For creating stream jobs
                    // we need to clean up the resources in the stream manager.
                    self.creating_job_info.delete_job(table_id).await;
                    return Err(reason);
                }
                CreatingState::Canceling { finish_tx } => {
                    tracing::debug!(id=?table_id, "cancelling streaming job");
                    if let Ok(table_fragments) = self
                        .metadata_manager
                        .get_job_fragments_by_id(&table_id)
                        .await
                    {
                        // try to cancel buffered creating command.
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, table_id)
                        {
                            tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
                        } else if !table_fragments.is_created() {
                            tracing::debug!(
                                "cancelling streaming job {table_id} by issuing a cancel command."
                            );

                            let cancel_command = self
                                .metadata_manager
                                .catalog_controller
                                .build_cancel_command(&table_fragments)
                                .await?;

                            self.metadata_manager
                                .catalog_controller
                                .try_abort_creating_streaming_job(table_id.table_id as _, true)
                                .await?;

                            self.barrier_scheduler
                                .run_command(database_id, cancel_command)
                                .await?;
                        } else {
                            // streaming job is already completed.
                            continue;
                        }
                        let _ = finish_tx.send(()).inspect_err(|_| {
                            tracing::warn!("failed to notify cancelled: {table_id}")
                        });
                        self.creating_job_info.delete_job(table_id).await;
                        return Err(MetaError::cancelled("create"));
                    }
                }
                CreatingState::Created { version } => {
                    self.creating_job_info.delete_job(table_id).await;
                    return Ok(version);
                }
            }
        }
        self.creating_job_info.delete_job(table_id).await;
        bail!("receiver failed to get notification version for finished stream job")
    }

    /// The function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
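    ///
    /// At a high level it: (1) allocates source splits (including backfill splits), (2) assigns
    /// CDC table snapshot splits and registers them with the CDC backfill tracker when present,
    /// and (3) injects a [`Command::CreateStreamingJob`] barrier command via the barrier
    /// scheduler.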
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<(SourceChange, StreamingJob)> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        // Here we need to consider:
        // - Shared source
        // - Table with connector
        // - MV on shared source
        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            stream_job_fragments.stream_job_id.table_id,
            &stream_job_fragments,
            self.env.meta_store_ref(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if !cdc_table_snapshot_split_assignment.is_empty()
        {
            self.env.cdc_table_backfill_tracker.track_new_job(
                stream_job_fragments.stream_job_id.table_id,
                cdc_table_snapshot_split_assignment
                    .values()
                    .map(|s| u64::try_from(s.len()).unwrap())
                    .sum(),
            );
            self.env
                .cdc_table_backfill_tracker
                .add_fragment_table_mapping(
                    stream_job_fragments
                        .fragments
                        .values()
                        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
                        .map(|f| f.fragment_id),
                    stream_job_fragments.stream_job_id.table_id,
                );
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(iter::once(stream_job_fragments.stream_job_id.table_id)),
            )
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        };

        let source_change = SourceChange::CreateJobFinished {
            finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_split_assignment,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok((source_change, streaming_job))
    }

    /// Send replace job command to barrier scheduler.
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocate split: {:?}",
            init_split_assignment
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            old_fragments.stream_job_id.table_id,
            &new_fragments.inner,
            self.env.meta_store_ref(),
        )
        .await?;

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                    cdc_table_snapshot_split_assignment,
                }),
            )
            .await?;

        Ok(())
    }

    /// Drop streaming jobs via the barrier manager and clean up all related resources. Errors are
    /// ignored because the recovery process will take over the cleanup. Check
    /// [`Command::DropStreamingJobs`] for details.
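    ///
    /// Illustrative only: the ids below are placeholders resolved from the catalog by the caller
    /// beforehand; a drop of a single job might look roughly like this.
    ///
    /// ```ignore
    /// stream_manager
    ///     .drop_streaming_jobs(
    ///         database_id,
    ///         removed_actors,
    ///         vec![job_id],
    ///         state_table_ids,
    ///         fragment_ids,
    ///         HashMap::new(), // no sink-into-table fragments to detach
    ///     )
    ///     .await;
    /// ```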
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        // TODO(august): This is a workaround for canceling SITT via drop, remove it after refactoring SITT.
        for &job_id in &streaming_job_ids {
            if self
                .creating_job_info
                .check_job_exists(TableId::new(job_id as _))
                .await
            {
                tracing::info!(
                    ?job_id,
                    "streaming job is creating, cancel it with drop directly"
                );
                self.metadata_manager
                    .notify_cancelled(database_id, job_id)
                    .await;
            }
        }

        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

    /// Cancel streaming jobs and return the canceled table ids.
    /// 1. Send a cancel message to creating stream jobs (via `cancel_jobs`).
    /// 2. Send a cancel message to recovered stream jobs (via `barrier_scheduler`).
    ///
    /// In both cases, their state is cleaned up by the barrier manager after the `CancelStreamJob`
    /// command succeeds.
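    ///
    /// Illustrative only (placeholder ids and variable names): the returned set can be compared
    /// against the requested one to see which cancellations actually took effect.
    ///
    /// ```ignore
    /// let requested = vec![TableId::new(1001), TableId::new(1002)];
    /// let cancelled = stream_manager.cancel_streaming_jobs(requested.clone()).await;
    /// for id in requested {
    ///     if !cancelled.contains(&id) {
    ///         tracing::warn!(?id, "job was not cancelled (already created or unknown)");
    ///     }
    /// }
    /// ```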
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if receiver.await.is_ok() {
                tracing::info!("canceled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        // NOTE(kwannoel): Recovered stream jobs can be cancelled directly by running the barrier command,
        // since the barrier manager manages them.
        let futures = recovered_job_ids.into_iter().map(|id| async move {
            tracing::debug!(?id, "cancelling recovered streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager.get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let cancel_command = self
                    .metadata_manager
                    .catalog_controller
                    .build_cancel_command(&fragment)
                    .await?;

                let (_, database_id) = self.metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(DatabaseId::new(database_id as _), cancel_command)
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

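    /// Reschedule a streaming job's parallelism and/or resource group. When `deferred` is true,
    /// only the new targets are recorded via `post_apply_reschedule` without generating a
    /// reschedule plan; otherwise a plan is generated and applied immediately.
    ///
    /// Illustrative only (placeholder values): pinning a job to a fixed parallelism of 4 while
    /// keeping its resource group could look like this.
    ///
    /// ```ignore
    /// let target = JobRescheduleTarget {
    ///     parallelism: JobParallelismTarget::Update(TableParallelism::Fixed(4)),
    ///     resource_group: JobResourceGroupTarget::Keep,
    /// };
    /// stream_manager.reschedule_streaming_job(job_id, target, false).await?;
    /// ```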
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        // Check if the provided parallelism is valid.
        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "available parallelism exceeds the max parallelism {}, the adaptive parallelism will be limited to the max",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, set the parallelism directly to parallelism {:?}, resource group {:?}",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: HashMap::from([(
                            job_id.table_id,
                            JobRescheduleTarget {
                                parallelism: parallelism_change,
                                resource_group: resource_group_change,
                            },
                        )]),
                    },
                    false,
                )
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, set the parallelism directly to {:?}",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

    /// This method is copied from `GlobalStreamManager::reschedule_streaming_job` and modified to handle rescheduling of the CDC table backfill fragment.
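    ///
    /// Illustrative only (placeholder values): unlike the general path, this variant only accepts
    /// a fixed parallelism and requires the resource group to stay unchanged.
    ///
    /// ```ignore
    /// let target = JobRescheduleTarget {
    ///     parallelism: JobParallelismTarget::Update(TableParallelism::Fixed(8)),
    ///     resource_group: JobResourceGroupTarget::Keep,
    /// };
    /// stream_manager.reschedule_cdc_table_backfill(job_id, target).await?;
    /// ```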
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;
        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);
        if let JobParallelismTarget::Update(parallelism) = &parallelism_change {
            match parallelism {
                TableParallelism::Fixed(_) => {}
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
                TableParallelism::Adaptive => {
                    bail_invalid_parameter!("should not alter parallelism to adaptive")
                }
            }
        } else {
            bail_invalid_parameter!("should not refresh")
        }
        match &resource_group_change {
            JobResourceGroupTarget::Update(_) => {
                bail_invalid_parameter!("should not update resource group")
            }
            JobResourceGroupTarget::Keep => {}
        };
        // Only generate a reschedule plan for the CDC table backfill fragment.
        let reschedule_plan = self
            .scale_controller
            .generate_job_reschedule_plan(
                JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                },
                true,
            )
            .await?;
        if reschedule_plan.reschedules.is_empty() {
            tracing::debug!(
                ?job_id,
                post_updates = ?reschedule_plan.post_updates,
                "Empty reschedule plan generated for job.",
            );
            self.scale_controller
                .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                .await?;
        } else {
            self.reschedule_actors(
                database_id,
                reschedule_plan,
                RescheduleOptions {
                    resolve_no_shuffle_upstream: false,
                    skip_create_new_actors: false,
                },
            )
            .await?;
        }

        Ok(())
    }

    // No need to add actors; just send a command.
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

    // No need to add actors; just send a command.
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}