risingwave_meta/stream/
stream_manager.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use await_tree::span;
19use futures::future::join_all;
20use itertools::Itertools;
21use risingwave_common::bail;
22use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::{JobId, SinkId};
25use risingwave_connector::source::CdcTableSnapshotSplitRaw;
26use risingwave_meta_model::prelude::Fragment as FragmentModel;
27use risingwave_meta_model::{StreamingParallelism, fragment};
28use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
29use risingwave_pb::expr::PbExprNode;
30use risingwave_pb::meta::table_fragments::ActorStatus;
31use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
32use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
33use thiserror_ext::AsReport;
34use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
35use tracing::Instrument;
36
37use super::{
38    Locations, ReplaceJobSplitPlan, ReschedulePolicy, ScaleControllerRef, StreamFragmentGraph,
39    UserDefinedFragmentBackfillOrder,
40};
41use crate::barrier::{
42    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
43    ReplaceStreamJobPlan, SnapshotBackfillInfo,
44};
45use crate::controller::catalog::DropTableConnectorContext;
46use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
47use crate::error::bail_invalid_parameter;
48use crate::manager::{
49    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
50};
51use crate::model::{
52    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
53    FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
54};
55use crate::stream::SourceManagerRef;
56use crate::{MetaError, MetaResult};
57
58pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;
59
/// Options for creating a streaming job. Currently carries no fields; kept as an
/// extension point so the signature of creation APIs stays stable.
#[derive(Default)]
pub struct CreateStreamingJobOption {
    // leave empty as a placeholder for future option if there is any
}
64
/// Information about an existing upstream sink, used when creating a
/// sink-into-table streaming job (see `CreateStreamingJobType::SinkIntoTable`).
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    /// Id of the upstream sink.
    pub sink_id: SinkId,
    /// Fragment that hosts the sink.
    pub sink_fragment_id: FragmentId,
    /// Output schema of the sink.
    pub sink_output_fields: Vec<PbField>,
    // for backwards compatibility
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    /// Expressions mapping sink output to the target table — assumption from
    /// the field name; confirm against the consumer of this struct.
    pub project_exprs: Vec<PbExprNode>,
    /// New downstream relation to attach from the sink fragment to the target.
    pub new_sink_downstream: DownstreamFragmentRelation,
}
75
/// [`CreateStreamingJobContext`] carries one-time infos for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relation to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the streaming job.
    pub building_locations: Locations,

    /// DDL definition.
    pub definition: String,

    /// Foreground or background creation; decides whether the creating caller
    /// waits for the job to finish (see `create_streaming_job`).
    pub create_type: CreateType,

    /// Kind of streaming job (table/MV/sink/...).
    pub job_type: StreamingJobType,

    /// Used for sink-into-table.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    /// When `Some`, the job is created via snapshot backfill.
    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    /// Snapshot-backfill info for cross-database upstreams.
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    /// Pre-computed CDC table snapshot splits, if any — presumably only set for
    /// CDC table jobs; verify against the caller.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,

    /// Currently empty placeholder options.
    pub option: CreateStreamingJobOption,

    /// The job being created.
    pub streaming_job: StreamingJob,

    /// User-specified backfill ordering between fragments.
    pub fragment_backfill_ordering: UserDefinedFragmentBackfillOrder,

    /// State tables per fragment used for locality backfill.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,

    /// Whether the backfill runs in serverless mode.
    pub is_serverless_backfill: bool,
}
111
/// Tracks one in-flight streaming-job creation so it can be cancelled from outside.
struct StreamingJobExecution {
    /// Id of the job being created.
    id: JobId,
    /// Taken (set to `None`) when a cancel is issued. The inner sender reports
    /// back whether the job was actually cancelled (`true`) or not.
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    /// Concurrency permit held for the whole lifetime of the creation.
    _permit: OwnedSemaphorePermit,
}
117
118impl StreamingJobExecution {
119    fn new(
120        id: JobId,
121        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
122        permit: OwnedSemaphorePermit,
123    ) -> Self {
124        Self {
125            id,
126            shutdown_tx: Some(shutdown_tx),
127            _permit: permit,
128        }
129    }
130}
131
/// Registry of streaming jobs that are currently being created.
#[derive(Default)]
struct CreatingStreamingJobInfo {
    /// In-flight creating jobs, keyed by job id.
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}
136
137impl CreatingStreamingJobInfo {
138    async fn add_job(&self, job: StreamingJobExecution) {
139        let mut jobs = self.streaming_jobs.lock().await;
140        jobs.insert(job.id, job);
141    }
142
143    async fn delete_job(&self, job_id: JobId) {
144        let mut jobs = self.streaming_jobs.lock().await;
145        jobs.remove(&job_id);
146    }
147
148    async fn cancel_jobs(
149        &self,
150        job_ids: Vec<JobId>,
151    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
152        let mut jobs = self.streaming_jobs.lock().await;
153        let mut receivers = HashMap::new();
154        let mut background_job_ids = vec![];
155        for job_id in job_ids {
156            if let Some(job) = jobs.get_mut(&job_id) {
157                if let Some(shutdown_tx) = job.shutdown_tx.take() {
158                    let (tx, rx) = oneshot::channel();
159                    match shutdown_tx.send(tx) {
160                        Ok(()) => {
161                            receivers.insert(job_id, rx);
162                        }
163                        Err(_) => {
164                            return Err(anyhow::anyhow!(
165                                "failed to send shutdown signal for streaming job {}: receiver dropped",
166                                job_id
167                            )
168                            .into());
169                        }
170                    }
171                }
172            } else {
173                // If these job ids do not exist in streaming_jobs, they should be background creating jobs.
174                background_job_ids.push(job_id);
175            }
176        }
177
178        Ok((receivers, background_job_ids))
179    }
180}
181
182type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;
183
/// Context for swapping a sink fragment during an automatic schema refresh,
/// carried inside [`ReplaceStreamJobContext`].
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    /// Temporary id for the refreshed sink.
    pub tmp_sink_id: SinkId,
    /// Catalog entry of the sink being refreshed.
    pub original_sink: PbSink,
    /// The sink fragment as it exists before the refresh.
    pub original_fragment: Fragment,
    /// Full column list after the schema change.
    pub new_schema: Vec<PbColumnCatalog>,
    /// Fields added by the schema change.
    pub newly_add_fields: Vec<Field>,
    /// Replacement sink fragment built against the new schema.
    pub new_fragment: Fragment,
    /// New log-store table, if the sink uses one.
    pub new_log_store_table: Option<PbTable>,
    /// Status (incl. location) for every actor of the new fragment; keys must
    /// cover all actor ids in `new_fragment` (see `new_fragment_info`).
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}
195
impl AutoRefreshSchemaSinkContext {
    /// Builds the in-flight fragment info for the new (schema-refreshed) sink
    /// fragment, resolving each actor's worker id from `actor_status`.
    ///
    /// Panics if an actor of `new_fragment` is missing from `actor_status` or
    /// has no location — both are treated as invariants of this context.
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            // Worker placement comes from the recorded actor status.
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            // No source splits are assigned to the new sink actors here.
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
        }
    }
}
227
/// [`ReplaceStreamJobContext`] carries one-time infos for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,

    /// New fragment relation to add from existing upstream fragment to downstream fragment.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the new job to replace.
    pub building_locations: Locations,

    /// The streaming job whose plan is being replaced.
    pub streaming_job: StreamingJob,

    /// Temporary id assigned to the replacement job — presumably swapped for the
    /// real id when the plan is committed; verify in the barrier/catalog code.
    pub tmp_id: JobId,

    /// Used for dropping an associated source. Dropping source and related internal tables.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    /// Sink fragments to be swapped for automatic schema refresh, if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
253
/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    /// Meta service environment; provides e.g. the notification manager and the
    /// await-tree registry used in this module.
    pub env: MetaSrvEnv,

    /// Access to catalog and fragment metadata.
    pub metadata_manager: MetadataManager,

    /// Broadcasts and collect barriers
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external system like kafka
    pub source_manager: SourceManagerRef,

    /// Creating streaming job info.
    creating_job_info: CreatingStreamingJobInfoRef,

    /// Computes reschedule plans (see the `reschedule_*` methods below).
    pub scale_controller: ScaleControllerRef,
}
271
272impl GlobalStreamManager {
273    pub fn new(
274        env: MetaSrvEnv,
275        metadata_manager: MetadataManager,
276        barrier_scheduler: BarrierScheduler,
277        source_manager: SourceManagerRef,
278        scale_controller: ScaleControllerRef,
279    ) -> MetaResult<Self> {
280        Ok(Self {
281            env,
282            metadata_manager,
283            barrier_scheduler,
284            source_manager,
285            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
286            scale_controller,
287        })
288    }
289
    /// Create streaming job, it works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, build the hanging
    ///    channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via source manager and patch
    ///    actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related meta data.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        // Register the job in `creating_job_info` so that `cancel_streaming_jobs`
        // can reach it through `cancel_tx` while creation is in flight.
        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            // Background creation returns as soon as the command is collected;
            // foreground creation additionally waits for the job to finish.
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finish");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        // Race the creation future against a cancellation request. `biased` makes
        // the select poll the creation future first.
        let result = async {
            tokio::select! {
                biased;

                res = create_fut => res,
                notifier = cancel_rx => {
                    let notifier = notifier.expect("sender should not be dropped");
                    tracing::debug!(id=%job_id, "cancelling streaming job");

                    // Outcome of the cancellation attempt, reported back via `notifier`.
                    enum CancelResult {
                        Completed(MetaResult<NotificationVersion>),
                        Failed(MetaError),
                        Cancelled,
                    }

                    let cancel_res = if let Ok(job_fragments) =
                        self.metadata_manager.get_job_fragments_by_id(job_id).await
                    {
                        // try to cancel buffered creating command.
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, job_id)
                        {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job in buffer queue."
                            );
                            CancelResult::Cancelled
                        } else if !job_fragments.is_created() {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job by issue cancel command."
                            );

                            // Abort the creating job in the catalog and run the cancel
                            // command through the barrier scheduler.
                            let cancel_result: MetaResult<()> = async {
                                let cancel_command = self.metadata_manager.catalog_controller
                                    .build_cancel_command(&job_fragments)
                                    .await?;
                                self.metadata_manager.catalog_controller
                                    .try_abort_creating_streaming_job(job_id, true)
                                    .await?;

                                self.barrier_scheduler
                                    .run_command(database_id, cancel_command)
                                    .await?;
                                Ok(())
                            }
                            .await;

                            match cancel_result {
                                Ok(()) => CancelResult::Cancelled,
                                Err(err) => {
                                    tracing::warn!(
                                        error = ?err.as_report(),
                                        id = %job_id,
                                        "failed to run cancel command for creating streaming job"
                                    );
                                    CancelResult::Failed(err)
                                }
                            }
                        } else {
                            // streaming job is already completed
                            CancelResult::Completed(
                                self.metadata_manager
                                    .wait_streaming_job_finished(database_id, job_id)
                                    .await,
                            )
                        }
                    } else {
                        // No fragments found for the job; treat it as cancelled.
                        CancelResult::Cancelled
                    };

                    let (cancelled, result) = match cancel_res {
                        CancelResult::Completed(result) => (false, result),
                        CancelResult::Failed(err) => (false, Err(err)),
                        CancelResult::Cancelled => (true, Err(MetaError::cancelled("create"))),
                    };

                    // Tell the canceller whether the job was actually cancelled.
                    let _ = notifier
                        .send(cancelled)
                        .inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));

                    result
                }
            }
        }
        .await;

        // Always deregister the job, whether it finished, failed, or was cancelled.
        tracing::debug!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }
445
    /// The function will return after barrier collected
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        // Phase 1: Gather fragment-level split information.
        // - For source fragments: discover splits from the external source.
        // - For backfill fragments: splits will be aligned in Phase 2 inside the barrier worker
        //   using the new_no_shuffle mapping and in-flight actor info.
        let init_split_assignment = self
            .source_manager
            .discover_splits(&stream_job_fragments)
            .await?;

        // Extend the user-defined backfill order with locality-backfill
        // dependencies derived from the fragment graph.
        let fragment_backfill_ordering =
            StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                fragment_backfill_ordering,
                &stream_job_fragments.downstreams,
                || {
                    stream_job_fragments
                        .fragments
                        .iter()
                        .map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                },
            );

        // Assemble the command payload for the barrier scheduler.
        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_splits,
            locality_fragment_state_table_mapping,
            is_serverless: is_serverless_backfill,
        };

        // Creation mode: snapshot backfill takes precedence, then sink-into-table,
        // otherwise a normal creation.
        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        // Returns once the command's barrier has been collected.
        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }
539
    /// Send replace job command to barrier scheduler.
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        // Phase 1: Gather fragment-level split information.
        // For replace source with existing downstream, splits will be aligned
        // in Phase 2 inside the barrier worker using new_no_shuffle and SharedActorInfos.
        // For replace source with no downstream (or non-source), discover splits fresh.
        let split_plan = if streaming_job.is_source() {
            match self
                .source_manager
                .discover_splits_for_replace_source(&new_fragments, &replace_upstream)
                .await?
            {
                Some(discovered) => ReplaceJobSplitPlan::Discovered(discovered),
                None => ReplaceJobSplitPlan::AlignFromPrevious,
            }
        } else {
            let discovered = self.source_manager.discover_splits(&new_fragments).await?;
            ReplaceJobSplitPlan::Discovered(discovered)
        };
        tracing::info!("replace_stream_job - split plan: {:?}", split_plan);

        // Hand the full replacement plan to the barrier scheduler.
        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    split_plan,
                    streaming_job,
                    tmp_id,
                    // When an associated source connector is dropped, its state
                    // table must be unregistered along with the replacement.
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                }),
            )
            .await?;

        Ok(())
    }
599
600    /// Drop streaming jobs by barrier manager, and clean up all related resources. The error will
601    /// be ignored because the recovery process will take over it in cleaning part. Check
602    /// [`Command::DropStreamingJobs`] for details.
603    pub async fn drop_streaming_jobs(
604        &self,
605        database_id: DatabaseId,
606        removed_actors: Vec<ActorId>,
607        streaming_job_ids: Vec<JobId>,
608        state_table_ids: Vec<TableId>,
609        fragment_ids: HashSet<FragmentId>,
610        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
611    ) {
612        if !removed_actors.is_empty()
613            || !streaming_job_ids.is_empty()
614            || !state_table_ids.is_empty()
615        {
616            let _ = self
617                .barrier_scheduler
618                .run_command(
619                    database_id,
620                    Command::DropStreamingJobs {
621                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
622                        actors: removed_actors,
623                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
624                        unregistered_fragment_ids: fragment_ids,
625                        dropped_sink_fragment_by_targets,
626                    },
627                )
628                .await
629                .inspect_err(|err| {
630                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
631                });
632        }
633    }
634
635    /// Cancel streaming jobs and return the canceled table ids.
636    /// 1. Send cancel message to stream jobs (via `cancel_jobs`).
637    /// 2. Send cancel message to recovered stream jobs (via `barrier_scheduler`).
638    ///
639    /// Cleanup of their state will be cleaned up after the `CancelStreamJob` command succeeds,
640    /// by the barrier manager for both of them.
641    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
642        if job_ids.is_empty() {
643            return Ok(vec![]);
644        }
645
646        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
647        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;
648
649        let futures = receivers.into_iter().map(|(id, receiver)| async move {
650            if let Ok(cancelled) = receiver.await
651                && cancelled
652            {
653                tracing::info!("canceled streaming job {id}");
654                Ok(id)
655            } else {
656                Err(MetaError::from(anyhow::anyhow!(
657                    "failed to cancel streaming job {id}"
658                )))
659            }
660        });
661        let mut cancelled_ids = join_all(futures)
662            .await
663            .into_iter()
664            .collect::<MetaResult<Vec<_>>>()?;
665
666        // NOTE(kwannoel): For background_job_ids stream jobs that not tracked in streaming manager,
667        // we can directly cancel them by running the barrier command.
668        let futures = background_job_ids.into_iter().map(|id| async move {
669            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
670            if fragment.is_created() {
671                tracing::warn!(
672                    "streaming job {} is already created, ignore cancel request",
673                    id
674                );
675                return Ok(None);
676            }
677            if fragment.is_created() {
678                Err(MetaError::invalid_parameter(format!(
679                    "streaming job {} is already created",
680                    id
681                )))?;
682            }
683
684            let cancel_command = self
685                .metadata_manager
686                .catalog_controller
687                .build_cancel_command(&fragment)
688                .await?;
689
690            let (_, database_id) = self
691                .metadata_manager
692                .catalog_controller
693                .try_abort_creating_streaming_job(id, true)
694                .await?;
695
696            if let Some(database_id) = database_id {
697                self.barrier_scheduler
698                    .run_command(database_id, cancel_command)
699                    .await?;
700            }
701
702            tracing::info!(?id, "cancelled background streaming job");
703            Ok(Some(id))
704        });
705        let cancelled_recovered_ids = join_all(futures)
706            .await
707            .into_iter()
708            .collect::<MetaResult<Vec<_>>>()?;
709
710        cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
711        Ok(cancelled_ids)
712    }
713
714    pub(crate) async fn reschedule_streaming_job(
715        &self,
716        job_id: JobId,
717        policy: ReschedulePolicy,
718        deferred: bool,
719    ) -> MetaResult<()> {
720        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
721
722        let background_jobs = self
723            .metadata_manager
724            .list_background_creating_jobs()
725            .await?;
726
727        if !background_jobs.is_empty() {
728            let blocked_jobs = self
729                .metadata_manager
730                .collect_reschedule_blocked_jobs_for_creating_jobs(&background_jobs, !deferred)
731                .await?;
732
733            if blocked_jobs.contains(&job_id) {
734                bail!(
735                    "Cannot alter the job {} because it is blocked by creating unreschedulable backfill jobs",
736                    job_id,
737                );
738            }
739        }
740
741        let commands = self
742            .scale_controller
743            .reschedule_inplace(HashMap::from([(job_id, policy)]))
744            .await?;
745
746        if !deferred {
747            let _source_pause_guard = self.source_manager.pause_tick().await;
748
749            for (database_id, command) in commands {
750                self.barrier_scheduler
751                    .run_command(database_id, command)
752                    .await?;
753            }
754        }
755
756        Ok(())
757    }
758
759    pub(crate) async fn reschedule_streaming_job_backfill_parallelism(
760        &self,
761        job_id: JobId,
762        parallelism: Option<StreamingParallelism>,
763        deferred: bool,
764    ) -> MetaResult<()> {
765        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
766
767        let background_jobs = self
768            .metadata_manager
769            .list_background_creating_jobs()
770            .await?;
771
772        if !background_jobs.is_empty() {
773            let unreschedulable = self
774                .metadata_manager
775                .collect_unreschedulable_backfill_jobs(&background_jobs, !deferred)
776                .await?;
777
778            if unreschedulable.contains(&job_id) {
779                bail!(
780                    "Cannot alter the job {} because it is a non-reschedulable background backfill job",
781                    job_id,
782                );
783            }
784        }
785
786        let commands = self
787            .scale_controller
788            .reschedule_backfill_parallelism_inplace(HashMap::from([(job_id, parallelism)]))
789            .await?;
790
791        if !deferred {
792            let _source_pause_guard = self.source_manager.pause_tick().await;
793
794            for (database_id, command) in commands {
795                self.barrier_scheduler
796                    .run_command(database_id, command)
797                    .await?;
798            }
799        }
800
801        Ok(())
802    }
803
804    /// This method is copied from `GlobalStreamManager::reschedule_streaming_job` and modified to handle reschedule CDC table backfill.
805    pub(crate) async fn reschedule_cdc_table_backfill(
806        &self,
807        job_id: JobId,
808        target: ReschedulePolicy,
809    ) -> MetaResult<()> {
810        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
811
812        let parallelism_policy = match target {
813            ReschedulePolicy::Parallelism(policy)
814                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
815            {
816                policy
817            }
818            _ => bail_invalid_parameter!(
819                "CDC backfill reschedule only supports fixed parallelism targets"
820            ),
821        };
822
823        let cdc_fragment_id = {
824            let inner = self.metadata_manager.catalog_controller.inner.read().await;
825            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
826                .select_only()
827                .columns([
828                    fragment::Column::FragmentId,
829                    fragment::Column::FragmentTypeMask,
830                ])
831                .filter(fragment::Column::JobId.eq(job_id))
832                .into_tuple()
833                .all(&inner.db)
834                .await?;
835
836            let cdc_fragments = fragments
837                .into_iter()
838                .filter_map(|(fragment_id, mask)| {
839                    FragmentTypeMask::from(mask)
840                        .contains(FragmentTypeFlag::StreamCdcScan)
841                        .then_some(fragment_id)
842                })
843                .collect_vec();
844
845            match cdc_fragments.len() {
846                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
847                1 => cdc_fragments[0],
848                _ => bail_invalid_parameter!(
849                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
850                    job_id
851                ),
852            }
853        };
854
855        let fragment_policy = HashMap::from([(
856            cdc_fragment_id,
857            Some(parallelism_policy.parallelism.clone()),
858        )]);
859
860        let commands = self
861            .scale_controller
862            .reschedule_fragment_inplace(fragment_policy)
863            .await?;
864
865        let _source_pause_guard = self.source_manager.pause_tick().await;
866
867        for (database_id, command) in commands {
868            self.barrier_scheduler
869                .run_command(database_id, command)
870                .await?;
871        }
872
873        Ok(())
874    }
875
876    pub(crate) async fn reschedule_fragments(
877        &self,
878        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
879    ) -> MetaResult<()> {
880        if fragment_targets.is_empty() {
881            return Ok(());
882        }
883
884        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
885
886        let fragment_policy = fragment_targets
887            .into_iter()
888            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
889            .collect();
890
891        let commands = self
892            .scale_controller
893            .reschedule_fragment_inplace(fragment_policy)
894            .await?;
895
896        let _source_pause_guard = self.source_manager.pause_tick().await;
897
898        for (database_id, command) in commands {
899            self.barrier_scheduler
900                .run_command(database_id, command)
901                .await?;
902        }
903
904        Ok(())
905    }
906
907    // Don't need to add actor, just send a command
908    pub async fn create_subscription(
909        self: &Arc<Self>,
910        subscription: &Subscription,
911    ) -> MetaResult<()> {
912        let command = Command::CreateSubscription {
913            subscription_id: subscription.id,
914            upstream_mv_table_id: subscription.dependent_table_id,
915            retention_second: subscription.retention_seconds,
916        };
917
918        tracing::debug!("sending Command::CreateSubscription");
919        self.barrier_scheduler
920            .run_command(subscription.database_id, command)
921            .await?;
922        Ok(())
923    }
924
925    // Don't need to add actor, just send a command
926    pub async fn drop_subscription(
927        self: &Arc<Self>,
928        database_id: DatabaseId,
929        subscription_id: SubscriptionId,
930        table_id: TableId,
931    ) {
932        let command = Command::DropSubscription {
933            subscription_id,
934            upstream_mv_table_id: table_id,
935        };
936
937        tracing::debug!("sending Command::DropSubscriptions");
938        let _ = self
939            .barrier_scheduler
940            .run_command(database_id, command)
941            .await
942            .inspect_err(|err| {
943                tracing::error!(error = ?err.as_report(), "failed to run drop command");
944            });
945    }
946
947    pub async fn alter_subscription_retention(
948        self: &Arc<Self>,
949        database_id: DatabaseId,
950        subscription_id: SubscriptionId,
951        table_id: TableId,
952        retention_second: u64,
953    ) -> MetaResult<()> {
954        let command = Command::AlterSubscriptionRetention {
955            subscription_id,
956            upstream_mv_table_id: table_id,
957            retention_second,
958        };
959
960        tracing::debug!("sending Command::AlterSubscriptionRetention");
961        self.barrier_scheduler
962            .run_command(database_id, command)
963            .await?;
964        Ok(())
965    }
966}