risingwave_meta/stream/stream_manager.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use await_tree::span;
19use futures::future::join_all;
20use itertools::Itertools;
21use risingwave_common::bail;
22use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::{JobId, SinkId};
25use risingwave_connector::source::CdcTableSnapshotSplitRaw;
26use risingwave_meta_model::prelude::Fragment as FragmentModel;
27use risingwave_meta_model::{StreamingParallelism, WorkerId, fragment, streaming_job};
28use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
29use risingwave_pb::expr::PbExprNode;
30use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
31use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
32use thiserror_ext::AsReport;
33use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
34use tracing::Instrument;
35
36use super::{
37    ReschedulePolicy, ScaleControllerRef, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
38};
39use crate::barrier::{
40    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
41    ReplaceStreamJobPlan, SnapshotBackfillInfo,
42};
43use crate::controller::catalog::DropTableConnectorContext;
44use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
45use crate::error::bail_invalid_parameter;
46use crate::manager::{
47    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
48};
49use crate::model::{
50    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
51    FragmentReplaceUpstream, StreamActor, StreamContext, StreamJobFragments,
52    StreamJobFragmentsToCreate, SubscriptionId,
53};
54use crate::stream::{ReplaceJobSplitPlan, SourceManagerRef};
55use crate::{MetaError, MetaResult};
56
/// Shared, reference-counted handle to the [`GlobalStreamManager`].
pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;
58
/// Options for creating a streaming job.
///
/// Currently empty; kept as an extension point so future options can be added
/// without changing the signatures that carry it.
#[derive(Default)]
pub struct CreateStreamingJobOption {
    // leave empty as a placeholder for future option if there is any
}
63
/// Information about an upstream sink, used when creating a sink-into-table
/// job (see `CreateStreamingJobType::SinkIntoTable`).
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    /// Id of the upstream sink.
    pub sink_id: SinkId,
    /// Id of the fragment that runs the sink.
    pub sink_fragment_id: FragmentId,
    /// Output schema (fields) of the sink fragment.
    pub sink_output_fields: Vec<PbField>,
    // for backwards compatibility
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    /// Projection expressions between the sink output and the target table's
    /// columns.
    // NOTE(review): exact evaluation site is not visible in this file — confirm.
    pub project_exprs: Vec<PbExprNode>,
    /// The new downstream relation to add from the sink fragment to the target.
    pub new_sink_downstream: DownstreamFragmentRelation,
}
74
/// [`CreateStreamingJobContext`] carries one-time infos for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relation to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,

    /// DDL definition.
    pub definition: String,

    /// Foreground vs. background creation; decides whether the create call waits
    /// for the job to finish (see `GlobalStreamManager::create_streaming_job`).
    pub create_type: CreateType,

    /// The type of the streaming job being created.
    pub job_type: StreamingJobType,

    /// Used for sink-into-table.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    /// When set, the job is created as a snapshot-backfill job
    /// (`CreateStreamingJobType::SnapshotBackfill`).
    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    /// Snapshot-backfill info for cross-database upstreams; forwarded as-is into
    /// the create command.
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    /// Pre-computed snapshot splits for CDC table backfill, if any.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,

    /// Extra creation options (currently an empty placeholder).
    pub option: CreateStreamingJobOption,

    /// The catalog object of the streaming job being created.
    pub streaming_job: StreamingJob,

    /// User-specified ordering among backfill fragments; extended with
    /// locality-backfill ordering before the command is issued.
    pub fragment_backfill_ordering: UserDefinedFragmentBackfillOrder,

    /// Maps each fragment to state tables.
    // NOTE(review): semantics inferred from the name (locality backfill) — confirm with producer.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,

    /// Whether serverless backfill is used for this job; forwarded to the
    /// create command as `is_serverless`.
    pub is_serverless_backfill: bool,

    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
}
113
/// Tracks one in-flight streaming job creation so that it can be cancelled.
struct StreamingJobExecution {
    // Id of the job being created.
    id: JobId,
    // Cancellation handshake: sending a `oneshot::Sender<bool>` through this
    // asks the creating task to cancel; the inner sender reports back whether
    // the job was actually cancelled. Taken (set to `None`) once a cancellation
    // has been issued.
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    // Concurrency permit held for the whole creation; released on drop.
    _permit: OwnedSemaphorePermit,
}
119
120impl StreamingJobExecution {
121    fn new(
122        id: JobId,
123        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
124        permit: OwnedSemaphorePermit,
125    ) -> Self {
126        Self {
127            id,
128            shutdown_tx: Some(shutdown_tx),
129            _permit: permit,
130        }
131    }
132}
133
/// Registry of all streaming jobs currently being created, keyed by job id.
#[derive(Default)]
struct CreatingStreamingJobInfo {
    // Jobs under creation; guarded by an async mutex since it is touched from
    // multiple concurrent create/cancel tasks.
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}
138
139impl CreatingStreamingJobInfo {
140    async fn add_job(&self, job: StreamingJobExecution) {
141        let mut jobs = self.streaming_jobs.lock().await;
142        jobs.insert(job.id, job);
143    }
144
145    async fn delete_job(&self, job_id: JobId) {
146        let mut jobs = self.streaming_jobs.lock().await;
147        jobs.remove(&job_id);
148    }
149
150    async fn cancel_jobs(
151        &self,
152        job_ids: Vec<JobId>,
153    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
154        let mut jobs = self.streaming_jobs.lock().await;
155        let mut receivers = HashMap::new();
156        let mut background_job_ids = vec![];
157        for job_id in job_ids {
158            if let Some(job) = jobs.get_mut(&job_id) {
159                if let Some(shutdown_tx) = job.shutdown_tx.take() {
160                    let (tx, rx) = oneshot::channel();
161                    match shutdown_tx.send(tx) {
162                        Ok(()) => {
163                            receivers.insert(job_id, rx);
164                        }
165                        Err(_) => {
166                            return Err(anyhow::anyhow!(
167                                "failed to send shutdown signal for streaming job {}: receiver dropped",
168                                job_id
169                            )
170                            .into());
171                        }
172                    }
173                }
174            } else {
175                // If these job ids do not exist in streaming_jobs, they should be background creating jobs.
176                background_job_ids.push(job_id);
177            }
178        }
179
180        Ok((receivers, background_job_ids))
181    }
182}
183
/// Shared handle to the creating-job registry.
type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;
185
/// Context for replacing a sink's fragment when its schema is automatically
/// refreshed: carries both the original and the replacement fragment plus the
/// schema delta.
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    /// Temporary sink id used while the replacement is in flight.
    pub tmp_sink_id: SinkId,
    /// The sink catalog as it was before the refresh.
    pub original_sink: PbSink,
    /// The sink's fragment before the refresh.
    pub original_fragment: Fragment,
    /// Full column list after the refresh.
    pub new_schema: Vec<PbColumnCatalog>,
    /// Fields newly added by the refresh.
    pub newly_add_fields: Vec<Field>,
    /// Names of columns removed by the refresh.
    pub removed_column_names: Vec<String>,
    /// The replacement fragment built against the new schema.
    pub new_fragment: Fragment,
    /// New log-store table, if the sink has one.
    pub new_log_store_table: Option<PbTable>,
    /// The sink's own stream context (timezone, `config_override`).
    pub ctx: StreamContext,
}
199
200impl AutoRefreshSchemaSinkContext {
201    pub fn new_fragment_info(
202        &self,
203        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
204        actor_location: &HashMap<ActorId, WorkerId>,
205    ) -> InflightFragmentInfo {
206        InflightFragmentInfo {
207            fragment_id: self.new_fragment.fragment_id,
208            distribution_type: self.new_fragment.distribution_type.into(),
209            fragment_type_mask: self.new_fragment.fragment_type_mask,
210            vnode_count: self.new_fragment.vnode_count(),
211            nodes: self.new_fragment.nodes.clone(),
212            actors: stream_actors
213                .get(&self.new_fragment.fragment_id)
214                .into_iter()
215                .flatten()
216                .map(|actor| {
217                    (
218                        actor.actor_id,
219                        InflightActorInfo {
220                            worker_id: actor_location[&actor.actor_id],
221                            vnode_bitmap: actor.vnode_bitmap.clone(),
222                            splits: vec![],
223                        },
224                    )
225                })
226                .collect(),
227            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
228        }
229    }
230}
231
/// [`ReplaceStreamJobContext`] carries one-time infos for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,

    /// New fragment relation to add from existing upstream fragment to downstream fragment.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The streaming job whose plan is being replaced.
    pub streaming_job: StreamingJob,

    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,

    /// Temporary job id used while the replacement is in flight.
    pub tmp_id: JobId,

    /// Used for dropping an associated source. Dropping source and related internal tables.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    /// Sinks whose schema is auto-refreshed as part of this replacement, if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,

    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
}
260
/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    /// Meta-server environment (notification manager, await-tree registry, ...).
    pub env: MetaSrvEnv,

    /// Access to catalog and fragment metadata.
    pub metadata_manager: MetadataManager,

    /// Broadcasts and collect barriers
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external system like kafka
    pub source_manager: SourceManagerRef,

    /// Creating streaming job info.
    creating_job_info: CreatingStreamingJobInfoRef,

    /// Computes reschedule plans for scaling operations.
    pub scale_controller: ScaleControllerRef,
}
278
279impl GlobalStreamManager {
280    pub fn new(
281        env: MetaSrvEnv,
282        metadata_manager: MetadataManager,
283        barrier_scheduler: BarrierScheduler,
284        source_manager: SourceManagerRef,
285        scale_controller: ScaleControllerRef,
286    ) -> MetaResult<Self> {
287        Ok(Self {
288            env,
289            metadata_manager,
290            barrier_scheduler,
291            source_manager,
292            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
293            scale_controller,
294        })
295    }
296
    /// Create streaming job, it works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, build the hanging
    ///    channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via source manager and patch
    ///    actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related meta data.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        // Register the creation in `creating_job_info` (together with the permit
        // limiting concurrent creations) so a concurrent CANCEL can reach it via
        // `cancel_tx`. The entry is always removed at the end of this function.
        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        // The actual creation work: issue the create command, then — depending on
        // the create type — either return the current notification version
        // immediately (background) or wait for the job to finish (foreground).
        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finish");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        let result = async {
            // `biased`: branches are polled in order, so a completed creation
            // wins over a simultaneously arriving cancellation.
            tokio::select! {
                biased;

                res = create_fut => res,
                notifier = cancel_rx => {
                    let notifier = notifier.expect("sender should not be dropped");
                    tracing::debug!(id=%job_id, "cancelling streaming job");

                    // Outcome of the cancellation attempt; mapped to the return
                    // value and the ack sent back to the canceller below.
                    enum CancelResult {
                        Completed(MetaResult<NotificationVersion>),
                        Failed(MetaError),
                        Cancelled,
                    }

                    let cancel_res = if let Ok(job_fragments) =
                        self.metadata_manager.get_job_fragments_by_id(job_id).await
                    {
                        // try to cancel buffered creating command.
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, job_id)
                        {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job in buffer queue."
                            );
                            CancelResult::Cancelled
                        } else if !job_fragments.is_created() {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job by issue cancel command."
                            );

                            // Abort the creating job in the catalog and run the
                            // cancel command through the barrier scheduler.
                            let cancel_result: MetaResult<()> = async {
                                let cancel_command = self.metadata_manager.catalog_controller
                                    .build_cancel_command(&job_fragments)
                                    .await?;
                                self.metadata_manager.catalog_controller
                                    .try_abort_creating_streaming_job(job_id, true)
                                    .await?;

                                self.barrier_scheduler
                                    .run_command(database_id, cancel_command)
                                    .await?;
                                Ok(())
                            }
                            .await;

                            match cancel_result {
                                Ok(()) => CancelResult::Cancelled,
                                Err(err) => {
                                    tracing::warn!(
                                        error = ?err.as_report(),
                                        id = %job_id,
                                        "failed to run cancel command for creating streaming job"
                                    );
                                    CancelResult::Failed(err)
                                }
                            }
                        } else {
                            // streaming job is already completed
                            CancelResult::Completed(
                                self.metadata_manager
                                    .wait_streaming_job_finished(database_id, job_id)
                                    .await,
                            )
                        }
                    } else {
                        // No fragments found for the job: treat as cancelled.
                        CancelResult::Cancelled
                    };

                    let (cancelled, result) = match cancel_res {
                        CancelResult::Completed(result) => (false, result),
                        CancelResult::Failed(err) => (false, Err(err)),
                        CancelResult::Cancelled => (true, Err(MetaError::cancelled("create"))),
                    };

                    // Ack the canceller with whether the job was actually cancelled.
                    let _ = notifier
                        .send(cancelled)
                        .inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));

                    result
                }
            }
        }
        .await;

        // Always deregister the creation entry, on success, failure or cancel.
        tracing::debug!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }
452
    /// Builds and runs the `CreateStreamingJob` barrier command for the job.
    ///
    /// The function will return after barrier collected
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            database_resource_group,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
            streaming_job_model,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        // Phase 1: Gather fragment-level split information.
        // - For source fragments: discover splits from the external source.
        // - For backfill fragments: splits will be aligned in Phase 2 inside the barrier worker
        //   using the actor-level no-shuffle mapping produced by render_actors.
        let init_split_assignment = self
            .source_manager
            .discover_splits(&stream_job_fragments)
            .await?;

        // Extend the user-defined backfill ordering with locality-backfill
        // constraints derived from the job's fragments and downstream edges.
        let fragment_backfill_ordering =
            StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                fragment_backfill_ordering,
                &stream_job_fragments.downstreams,
                || {
                    stream_job_fragments
                        .fragments
                        .iter()
                        .map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                },
            );

        // Assemble everything the barrier worker needs to create the job.
        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            database_resource_group,
            fragment_backfill_ordering,
            cdc_table_snapshot_splits,
            locality_fragment_state_table_mapping,
            is_serverless: is_serverless_backfill,
            streaming_job_model,
        };

        // Pick the create flavor: snapshot backfill takes precedence, then
        // sink-into-table, otherwise a normal creation.
        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        // Returns once the command's barrier has been collected.
        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }
550
    /// Send replace job command to barrier scheduler.
    ///
    /// Returns once the `ReplaceStreamJob` command's barrier has been
    /// processed by the scheduler.
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            streaming_job_model,
            database_resource_group,
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        // Phase 1: Gather fragment-level split information.
        // For replace source with existing downstream, splits will be aligned
        // in Phase 2 inside the barrier worker using actor-level no-shuffle produced by render_actors.
        // For replace source with no downstream (or non-source), discover splits fresh.
        let split_plan = if streaming_job.is_source() {
            match self
                .source_manager
                .discover_splits_for_replace_source(&new_fragments, &replace_upstream)
                .await?
            {
                Some(discovered) => ReplaceJobSplitPlan::Discovered(discovered),
                None => ReplaceJobSplitPlan::AlignFromPrevious,
            }
        } else {
            let discovered = self.source_manager.discover_splits(&new_fragments).await?;
            ReplaceJobSplitPlan::Discovered(discovered)
        };
        tracing::info!("replace_stream_job - split plan: {:?}", split_plan);

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    database_resource_group,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    split_plan,
                    streaming_job,
                    streaming_job_model,
                    tmp_id,
                    // When an associated source is dropped alongside the
                    // replacement, its state table must be unregistered too.
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                }),
            )
            .await?;

        Ok(())
    }
613
614    /// Drop streaming jobs by barrier manager, and clean up all related resources. The error will
615    /// be ignored because the recovery process will take over it in cleaning part. Check
616    /// [`Command::DropStreamingJobs`] for details.
617    pub async fn drop_streaming_jobs(
618        &self,
619        database_id: DatabaseId,
620        streaming_job_ids: Vec<JobId>,
621        state_table_ids: Vec<TableId>,
622        fragment_ids: HashSet<FragmentId>,
623        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
624    ) {
625        if !streaming_job_ids.is_empty() || !state_table_ids.is_empty() {
626            let _ = self
627                .barrier_scheduler
628                .run_command(
629                    database_id,
630                    Command::DropStreamingJobs {
631                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
632                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
633                        unregistered_fragment_ids: fragment_ids,
634                        dropped_sink_fragment_by_targets,
635                    },
636                )
637                .await
638                .inspect_err(|err| {
639                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
640                });
641        }
642    }
643
644    /// Cancel streaming jobs and return the canceled table ids.
645    /// 1. Send cancel message to stream jobs (via `cancel_jobs`).
646    /// 2. Send cancel message to recovered stream jobs (via `barrier_scheduler`).
647    ///
648    /// Cleanup of their state will be cleaned up after the `CancelStreamJob` command succeeds,
649    /// by the barrier manager for both of them.
650    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
651        if job_ids.is_empty() {
652            return Ok(vec![]);
653        }
654
655        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
656        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;
657
658        let futures = receivers.into_iter().map(|(id, receiver)| async move {
659            if let Ok(cancelled) = receiver.await
660                && cancelled
661            {
662                tracing::info!("canceled streaming job {id}");
663                Ok(id)
664            } else {
665                Err(MetaError::from(anyhow::anyhow!(
666                    "failed to cancel streaming job {id}"
667                )))
668            }
669        });
670        let mut cancelled_ids = join_all(futures)
671            .await
672            .into_iter()
673            .collect::<MetaResult<Vec<_>>>()?;
674
675        // NOTE(kwannoel): For background_job_ids stream jobs that not tracked in streaming manager,
676        // we can directly cancel them by running the barrier command.
677        let futures = background_job_ids.into_iter().map(|id| async move {
678            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
679            if fragment.is_created() {
680                tracing::warn!(
681                    "streaming job {} is already created, ignore cancel request",
682                    id
683                );
684                return Ok(None);
685            }
686            if fragment.is_created() {
687                Err(MetaError::invalid_parameter(format!(
688                    "streaming job {} is already created",
689                    id
690                )))?;
691            }
692
693            let cancel_command = self
694                .metadata_manager
695                .catalog_controller
696                .build_cancel_command(&fragment)
697                .await?;
698
699            let (_, database_id) = self
700                .metadata_manager
701                .catalog_controller
702                .try_abort_creating_streaming_job(id, true)
703                .await?;
704
705            if let Some(database_id) = database_id {
706                self.barrier_scheduler
707                    .run_command(database_id, cancel_command)
708                    .await?;
709            }
710
711            tracing::info!(?id, "cancelled background streaming job");
712            Ok(Some(id))
713        });
714        let cancelled_recovered_ids = join_all(futures)
715            .await
716            .into_iter()
717            .collect::<MetaResult<Vec<_>>>()?;
718
719        cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
720        Ok(cancelled_ids)
721    }
722
723    pub(crate) async fn reschedule_streaming_job(
724        &self,
725        job_id: JobId,
726        policy: ReschedulePolicy,
727        deferred: bool,
728    ) -> MetaResult<()> {
729        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
730
731        let background_jobs = self
732            .metadata_manager
733            .list_background_creating_jobs()
734            .await?;
735
736        if !background_jobs.is_empty() {
737            let blocked_jobs = self
738                .metadata_manager
739                .collect_reschedule_blocked_jobs_for_creating_jobs(&background_jobs, !deferred)
740                .await?;
741
742            if blocked_jobs.contains(&job_id) {
743                bail!(
744                    "Cannot alter the job {} because it is blocked by creating unreschedulable backfill jobs",
745                    job_id,
746                );
747            }
748        }
749
750        let commands = self
751            .scale_controller
752            .reschedule_inplace(HashMap::from([(job_id, policy)]))
753            .await?;
754
755        if !deferred {
756            let _source_pause_guard = self.source_manager.pause_tick().await;
757
758            for (database_id, command) in commands {
759                self.barrier_scheduler
760                    .run_command(database_id, command)
761                    .await?;
762            }
763        }
764
765        Ok(())
766    }
767
768    pub(crate) async fn reschedule_streaming_job_backfill_parallelism(
769        &self,
770        job_id: JobId,
771        parallelism: Option<StreamingParallelism>,
772        deferred: bool,
773    ) -> MetaResult<()> {
774        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
775
776        let background_jobs = self
777            .metadata_manager
778            .list_background_creating_jobs()
779            .await?;
780
781        if !background_jobs.is_empty() {
782            let unreschedulable = self
783                .metadata_manager
784                .collect_unreschedulable_backfill_jobs(&background_jobs, !deferred)
785                .await?;
786
787            if unreschedulable.contains(&job_id) {
788                bail!(
789                    "Cannot alter the job {} because it is a non-reschedulable background backfill job",
790                    job_id,
791                );
792            }
793        }
794
795        let commands = self
796            .scale_controller
797            .reschedule_backfill_parallelism_inplace(HashMap::from([(job_id, parallelism)]))
798            .await?;
799
800        if !deferred {
801            let _source_pause_guard = self.source_manager.pause_tick().await;
802
803            for (database_id, command) in commands {
804                self.barrier_scheduler
805                    .run_command(database_id, command)
806                    .await?;
807            }
808        }
809
810        Ok(())
811    }
812
813    /// This method is copied from `GlobalStreamManager::reschedule_streaming_job` and modified to handle reschedule CDC table backfill.
814    pub(crate) async fn reschedule_cdc_table_backfill(
815        &self,
816        job_id: JobId,
817        target: ReschedulePolicy,
818    ) -> MetaResult<()> {
819        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
820
821        let parallelism_policy = match target {
822            ReschedulePolicy::Parallelism(policy)
823                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
824            {
825                policy
826            }
827            _ => bail_invalid_parameter!(
828                "CDC backfill reschedule only supports fixed parallelism targets"
829            ),
830        };
831
832        let cdc_fragment_id = {
833            let inner = self.metadata_manager.catalog_controller.inner.read().await;
834            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
835                .select_only()
836                .columns([
837                    fragment::Column::FragmentId,
838                    fragment::Column::FragmentTypeMask,
839                ])
840                .filter(fragment::Column::JobId.eq(job_id))
841                .into_tuple()
842                .all(&inner.db)
843                .await?;
844
845            let cdc_fragments = fragments
846                .into_iter()
847                .filter_map(|(fragment_id, mask)| {
848                    FragmentTypeMask::from(mask)
849                        .contains(FragmentTypeFlag::StreamCdcScan)
850                        .then_some(fragment_id)
851                })
852                .collect_vec();
853
854            match cdc_fragments.len() {
855                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
856                1 => cdc_fragments[0],
857                _ => bail_invalid_parameter!(
858                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
859                    job_id
860                ),
861            }
862        };
863
864        let fragment_policy = HashMap::from([(
865            cdc_fragment_id,
866            Some(parallelism_policy.parallelism.clone()),
867        )]);
868
869        let commands = self
870            .scale_controller
871            .reschedule_fragment_inplace(fragment_policy)
872            .await?;
873
874        let _source_pause_guard = self.source_manager.pause_tick().await;
875
876        for (database_id, command) in commands {
877            self.barrier_scheduler
878                .run_command(database_id, command)
879                .await?;
880        }
881
882        Ok(())
883    }
884
885    pub(crate) async fn reschedule_fragments(
886        &self,
887        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
888    ) -> MetaResult<()> {
889        if fragment_targets.is_empty() {
890            return Ok(());
891        }
892
893        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
894
895        let fragment_policy = fragment_targets
896            .into_iter()
897            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
898            .collect();
899
900        let commands = self
901            .scale_controller
902            .reschedule_fragment_inplace(fragment_policy)
903            .await?;
904
905        let _source_pause_guard = self.source_manager.pause_tick().await;
906
907        for (database_id, command) in commands {
908            self.barrier_scheduler
909                .run_command(database_id, command)
910                .await?;
911        }
912
913        Ok(())
914    }
915
916    // Don't need to add actor, just send a command
917    pub async fn create_subscription(
918        self: &Arc<Self>,
919        subscription: &Subscription,
920    ) -> MetaResult<()> {
921        let command = Command::CreateSubscription {
922            subscription_id: subscription.id,
923            upstream_mv_table_id: subscription.dependent_table_id,
924            retention_second: subscription.retention_seconds,
925        };
926
927        tracing::debug!("sending Command::CreateSubscription");
928        self.barrier_scheduler
929            .run_command(subscription.database_id, command)
930            .await?;
931        Ok(())
932    }
933
934    // Don't need to add actor, just send a command
935    pub async fn drop_subscription(
936        self: &Arc<Self>,
937        database_id: DatabaseId,
938        subscription_id: SubscriptionId,
939        table_id: TableId,
940    ) {
941        let command = Command::DropSubscription {
942            subscription_id,
943            upstream_mv_table_id: table_id,
944        };
945
946        tracing::debug!("sending Command::DropSubscriptions");
947        let _ = self
948            .barrier_scheduler
949            .run_command(database_id, command)
950            .await
951            .inspect_err(|err| {
952                tracing::error!(error = ?err.as_report(), "failed to run drop command");
953            });
954    }
955
956    pub async fn alter_subscription_retention(
957        self: &Arc<Self>,
958        database_id: DatabaseId,
959        subscription_id: SubscriptionId,
960        table_id: TableId,
961        retention_second: u64,
962    ) -> MetaResult<()> {
963        let command = Command::AlterSubscriptionRetention {
964            subscription_id,
965            upstream_mv_table_id: table_id,
966            retention_second,
967        };
968
969        tracing::debug!("sending Command::AlterSubscriptionRetention");
970        self.barrier_scheduler
971            .run_command(database_id, command)
972            .await?;
973        Ok(())
974    }
975}