risingwave_meta/stream/stream_manager.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::span;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, SinkId};
use risingwave_connector::source::CdcTableSnapshotSplitRaw;
use risingwave_meta_model::prelude::Fragment as FragmentModel;
use risingwave_meta_model::{StreamingParallelism, fragment};
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use thiserror_ext::AsReport;
use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
use tracing::Instrument;

use super::{
    Locations, ReschedulePolicy, ScaleControllerRef, StreamFragmentGraph,
    UserDefinedFragmentBackfillOrder,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
    SubscriptionId,
};
use crate::stream::SourceManagerRef;
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Left empty as a placeholder for future options, if any.
}

#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: SinkId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    // for backwards compatibility
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

/// [`CreateStreamingJobContext`] carries one-time info for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relation to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// The locations of the actors to build in the streaming job.
    pub building_locations: Locations,

    /// DDL definition.
    pub definition: String,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Used for sink-into-table.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: UserDefinedFragmentBackfillOrder,

    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,

    pub is_serverless_backfill: bool,
}

struct StreamingJobExecution {
    id: JobId,
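    /// Sender used to cancel this creating job. The payload is a reply channel on which the
    /// canceller is notified whether the job was actually cancelled.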
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    _permit: OwnedSemaphorePermit,
}

impl StreamingJobExecution {
    fn new(
        id: JobId,
        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
            _permit: permit,
        }
    }
}

#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: JobId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

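    /// Sends a shutdown signal to every tracked creating job in `job_ids` and returns, per job,
    /// a receiver that resolves to whether the job was actually cancelled. Job ids that are not
    /// tracked here are returned separately as background creating jobs.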
    async fn cancel_jobs(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut background_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    match shutdown_tx.send(tx) {
                        Ok(()) => {
                            receivers.insert(job_id, rx);
                        }
                        Err(_) => {
                            return Err(anyhow::anyhow!(
                                "failed to send shutdown signal for streaming job {}: receiver dropped",
                                job_id
                            )
                            .into());
                        }
                    }
                }
            } else {
                // If a job id does not exist in `streaming_jobs`, it should be a background creating job.
                background_job_ids.push(job_id);
            }
        }

        Ok((receivers, background_job_ids))
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
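    /// Builds the in-flight info of the new sink fragment, resolving each actor's worker id from
    /// `actor_status`.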
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
        }
    }
}

/// [`ReplaceStreamJobContext`] carries one-time info for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// New fragment relation to add from existing upstream fragment to downstream fragment.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the new job to replace.
    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: JobId,

    /// Used for dropping an associated source: drops the source and its related internal tables.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Broadcasts and collects barriers.
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external systems like Kafka.
    pub source_manager: SourceManagerRef,

    /// Creating streaming job info.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

    /// Creates a streaming job. It works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, and build the
    ///    hanging channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via the source manager and
    ///    patch the actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related metadata.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
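    ///
    /// Illustrative call-site sketch (not compiled as a doctest); `fragments`, `ctx` and `permit`
    /// are assumed to be prepared by the caller (e.g. the DDL layer) beforehand:
    ///
    /// ```ignore
    /// let version = stream_manager
    ///     .create_streaming_job(fragments, ctx, permit)
    ///     .await?;
    /// ```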
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finish");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        let result = async {
            tokio::select! {
                biased;

                res = create_fut => res,
                notifier = cancel_rx => {
                    let notifier = notifier.expect("sender should not be dropped");
                    tracing::debug!(id=%job_id, "cancelling streaming job");

                    enum CancelResult {
                        Completed(MetaResult<NotificationVersion>),
                        Failed(MetaError),
                        Cancelled,
                    }

                    let cancel_res = if let Ok(job_fragments) =
                        self.metadata_manager.get_job_fragments_by_id(job_id).await
                    {
                        // try to cancel buffered creating command.
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, job_id)
                        {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job in buffer queue."
                            );
                            CancelResult::Cancelled
                        } else if !job_fragments.is_created() {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job by issuing a cancel command."
                            );

                            let cancel_result: MetaResult<()> = async {
                                let cancel_command = self.metadata_manager.catalog_controller
                                    .build_cancel_command(&job_fragments)
                                    .await?;
                                self.metadata_manager.catalog_controller
                                    .try_abort_creating_streaming_job(job_id, true)
                                    .await?;

                                self.barrier_scheduler
                                    .run_command(database_id, cancel_command)
                                    .await?;
                                Ok(())
                            }
                            .await;

                            match cancel_result {
                                Ok(()) => CancelResult::Cancelled,
                                Err(err) => {
                                    tracing::warn!(
                                        error = ?err.as_report(),
                                        id = %job_id,
                                        "failed to run cancel command for creating streaming job"
                                    );
                                    CancelResult::Failed(err)
                                }
                            }
                        } else {
                            // streaming job is already completed
                            CancelResult::Completed(
                                self.metadata_manager
                                    .wait_streaming_job_finished(database_id, job_id)
                                    .await,
                            )
                        }
                    } else {
                        CancelResult::Cancelled
                    };

                    let (cancelled, result) = match cancel_res {
                        CancelResult::Completed(result) => (false, result),
                        CancelResult::Failed(err) => (false, Err(err)),
                        CancelResult::Cancelled => (true, Err(MetaError::cancelled("create"))),
                    };

                    let _ = notifier
                        .send(cancelled)
                        .inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));

                    result
                }
            }
        }
        .await;

        tracing::debug!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }

    /// The function will return after the barrier is collected
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "actors built"
        );

        // Here we need to consider:
        // - Shared source
        // - Table with connector
        // - MV on shared source
        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;

        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let fragment_backfill_ordering =
            StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                fragment_backfill_ordering,
                &stream_job_fragments.downstreams,
                || {
                    stream_job_fragments
                        .fragments
                        .iter()
                        .map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                },
            );

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_splits,
            locality_fragment_state_table_mapping,
            is_serverless: is_serverless_backfill,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }

    /// Sends the replace-job command to the barrier scheduler.
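    ///
    /// Illustrative call-site sketch (not compiled as a doctest); `new_fragments` and
    /// `replace_ctx` are assumed to be built by the caller beforehand:
    ///
    /// ```ignore
    /// stream_manager
    ///     .replace_stream_job(new_fragments, replace_ctx)
    ///     .await?;
    /// ```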
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocate split: {:?}",
            init_split_assignment
        );

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                }),
            )
            .await?;

        Ok(())
    }

    /// Drops streaming jobs via the barrier manager and cleans up all related resources. Errors
    /// are ignored because the recovery process will take over the cleanup. Check
    /// [`Command::DropStreamingJobs`] for details.
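    ///
    /// Illustrative call-site sketch (not compiled as a doctest); the ids and the sink-fragment
    /// mapping are assumed to be collected while resolving the drop in the catalog:
    ///
    /// ```ignore
    /// stream_manager
    ///     .drop_streaming_jobs(
    ///         database_id,
    ///         removed_actors,
    ///         vec![job_id],
    ///         state_table_ids,
    ///         fragment_ids,
    ///         HashMap::new(),
    ///     )
    ///     .await;
    /// ```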
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<JobId>,
        state_table_ids: Vec<TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

    /// Cancels streaming jobs and returns the cancelled job ids.
    /// 1. Send a cancel message to creating stream jobs (via `cancel_jobs`).
    /// 2. Send a cancel message to recovered stream jobs (via `barrier_scheduler`).
    ///
    /// In both cases, their state is cleaned up by the barrier manager after the
    /// `CancelStreamJob` command succeeds.
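    ///
    /// Illustrative call-site sketch (not compiled as a doctest); `job_ids` is assumed to be the
    /// set of creating jobs requested to be cancelled:
    ///
    /// ```ignore
    /// let cancelled: Vec<JobId> = stream_manager.cancel_streaming_jobs(job_ids).await?;
    /// ```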
    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
        if job_ids.is_empty() {
            return Ok(vec![]);
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if let Ok(cancelled) = receiver.await
                && cancelled
            {
                tracing::info!("canceled streaming job {id}");
                Ok(id)
            } else {
                Err(MetaError::from(anyhow::anyhow!(
                    "failed to cancel streaming job {id}"
                )))
            }
        });
        let mut cancelled_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        // NOTE(kwannoel): For `background_job_ids`, i.e. stream jobs not tracked in the stream
        // manager, we can directly cancel them by running the barrier command.
        let futures = background_job_ids.into_iter().map(|id| async move {
            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
            if fragment.is_created() {
                tracing::warn!(
                    "streaming job {} is already created, ignore cancel request",
                    id
                );
                return Ok(None);
            }

            let cancel_command = self
                .metadata_manager
                .catalog_controller
                .build_cancel_command(&fragment)
                .await?;

            let (_, database_id) = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_streaming_job(id, true)
                .await?;

            if let Some(database_id) = database_id {
                self.barrier_scheduler
                    .run_command(database_id, cancel_command)
                    .await?;
            }

            tracing::info!(?id, "cancelled background streaming job");
            Ok(Some(id))
        });
        let cancelled_recovered_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
        Ok(cancelled_ids)
    }

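    /// Reschedules a single streaming job according to `policy`.
    ///
    /// Non-reschedulable background backfill jobs are rejected. The reschedule is applied in
    /// place via the scale controller; when `deferred` is `false`, the resulting commands are
    /// also run immediately through the barrier scheduler with source ticks paused.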
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        policy: ReschedulePolicy,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let unreschedulable = self
                .metadata_manager
                .collect_unreschedulable_backfill_jobs(&background_jobs, !deferred)
                .await?;

            if unreschedulable.contains(&job_id) {
                bail!(
                    "Cannot alter the job {} because it is a non-reschedulable background backfill job",
                    job_id,
                );
            }
        }

        let commands = self
            .scale_controller
            .reschedule_inplace(HashMap::from([(job_id, policy)]))
            .await?;

        if !deferred {
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }

    /// This method is copied from `GlobalStreamManager::reschedule_streaming_job` and modified
    /// to handle rescheduling of CDC table backfill.
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let parallelism_policy = match target {
            ReschedulePolicy::Parallelism(policy)
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
            {
                policy
            }
            _ => bail_invalid_parameter!(
                "CDC backfill reschedule only supports fixed parallelism targets"
            ),
        };

        let cdc_fragment_id = {
            let inner = self.metadata_manager.catalog_controller.inner.read().await;
            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(job_id))
                .into_tuple()
                .all(&inner.db)
                .await?;

            let cdc_fragments = fragments
                .into_iter()
                .filter_map(|(fragment_id, mask)| {
                    FragmentTypeMask::from(mask)
                        .contains(FragmentTypeFlag::StreamCdcScan)
                        .then_some(fragment_id)
                })
                .collect_vec();

            match cdc_fragments.len() {
                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
                1 => cdc_fragments[0],
                _ => bail_invalid_parameter!(
                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
                    job_id
                ),
            }
        };

        let fragment_policy = HashMap::from([(
            cdc_fragment_id,
            Some(parallelism_policy.parallelism.clone()),
        )]);

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

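    /// Reschedules individual fragments according to the given per-fragment parallelism targets,
    /// then runs the resulting commands through the barrier scheduler with source ticks paused.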
    pub(crate) async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        if fragment_targets.is_empty() {
            return Ok(());
        }

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let fragment_policy = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
            .collect();

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

    // No need to add actors, just send a command
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: subscription.dependent_table_id,
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id, command)
            .await?;
        Ok(())
    }

    // No need to add actors, just send a command
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: SubscriptionId,
        table_id: TableId,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: table_id,
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
915}