risingwave_meta/stream/
stream_manager.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use await_tree::span;
19use futures::future::join_all;
20use itertools::Itertools;
21use risingwave_common::bail;
22use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::{JobId, SinkId};
25use risingwave_connector::source::CdcTableSnapshotSplitRaw;
26use risingwave_meta_model::prelude::Fragment as FragmentModel;
27use risingwave_meta_model::{StreamingParallelism, WorkerId, fragment, streaming_job};
28use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
29use risingwave_pb::expr::PbExprNode;
30use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
31use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
32use thiserror_ext::AsReport;
33use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
34use tracing::Instrument;
35
36use super::{
37    GlobalRefreshManagerRef, ParallelismPolicy, ReschedulePolicy, ScaleControllerRef,
38    StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
39};
40use crate::barrier::{
41    BarrierScheduler, BatchRefreshInfo, Command, CreateStreamingJobCommandInfo,
42    CreateStreamingJobType, ReplaceStreamJobPlan, SnapshotBackfillInfo,
43};
44use crate::controller::catalog::DropTableConnectorContext;
45use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
46use crate::error::bail_invalid_parameter;
47use crate::hummock::HummockManagerRef;
48use crate::manager::{
49    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
50};
51use crate::model::{
52    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
53    FragmentReplaceUpstream, StreamActor, StreamContext, StreamJobFragments,
54    StreamJobFragmentsToCreate, SubscriptionId,
55};
56use crate::stream::{ReplaceJobSplitPlan, SourceManagerRef};
57use crate::{MetaError, MetaResult};
58
59pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;
60
61pub(crate) async fn cleanup_dropped_streaming_jobs(
62    refresh_manager: &GlobalRefreshManagerRef,
63    hummock_manager: &HummockManagerRef,
64    metadata_manager: &MetadataManager,
65    streaming_job_ids: impl IntoIterator<Item = JobId>,
66    state_table_ids: Vec<TableId>,
67    progress_status: &str,
68) -> MetaResult<()> {
69    for job_id in streaming_job_ids {
70        refresh_manager.remove_progress_tracker(job_id.as_mv_table_id(), progress_status);
71    }
72
73    if state_table_ids.is_empty() {
74        return Ok(());
75    }
76
77    hummock_manager
78        .unregister_table_ids(state_table_ids.clone())
79        .await?;
80    metadata_manager
81        .catalog_controller
82        .complete_dropped_tables(state_table_ids)
83        .await;
84    Ok(())
85}
86
/// Options attached to a [`CreateStreamingJobContext`]. Currently has no fields.
#[derive(Default)]
pub struct CreateStreamingJobOption {
    // leave empty as a placeholder for future option if there is any
}
91
/// Describes a sink that is added as a new upstream of a table (sink-into-table).
///
/// Carried in [`CreateStreamingJobContext::new_upstream_sink`] and forwarded as the payload of
/// `CreateStreamingJobType::SinkIntoTable`.
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    /// Id of the sink.
    pub sink_id: SinkId,
    /// Id of the sink's fragment.
    /// NOTE(review): presumably the fragment that feeds the target table; confirm.
    pub sink_fragment_id: FragmentId,
    /// Fields output by the sink.
    pub sink_output_fields: Vec<PbField>,
    // for backwards compatibility
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    /// Projection expressions.
    /// NOTE(review): presumably map the sink output onto the target table's columns; confirm.
    pub project_exprs: Vec<PbExprNode>,
    /// The downstream relation to create for the sink.
    pub new_sink_downstream: DownstreamFragmentRelation,
}
102
/// [`CreateStreamingJobContext`] carries one-time infos for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relation to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,

    /// DDL definition.
    pub definition: String,

    /// Whether the job is created in the foreground or the background. For `Foreground`,
    /// `create_streaming_job` additionally waits for the job to finish before returning.
    pub create_type: CreateType,

    /// Kind of streaming job; forwarded unchanged into the create command info.
    pub job_type: StreamingJobType,

    /// Used for sink-into-table.
    ///
    /// Only consulted when neither `refresh_interval_sec` nor `snapshot_backfill_info` is set.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    /// When set (and `refresh_interval_sec` is not), the job is created as a snapshot-backfill
    /// job. Required when `refresh_interval_sec` is set.
    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    /// Forwarded as-is to `Command::CreateStreamingJob`.
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    /// Forwarded as-is into the create command info.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,

    /// Placeholder options; see [`CreateStreamingJobOption`].
    pub option: CreateStreamingJobOption,

    /// The job being created. Its database id selects where the create command is run.
    pub streaming_job: StreamingJob,

    /// User-defined backfill ordering between fragments. It is extended with locality-backfill
    /// ordering before the create command is built.
    pub fragment_backfill_ordering: UserDefinedFragmentBackfillOrder,

    /// Forwarded as-is into the create command info.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,

    /// Forwarded into the create command info as `is_serverless`.
    pub is_serverless_backfill: bool,

    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,

    /// Batch refresh interval in seconds. If set, the MV uses batch refresh semantics.
    pub refresh_interval_sec: Option<u64>,
}
144
/// Bookkeeping entry for a streaming job whose creation is in flight.
struct StreamingJobExecution {
    /// Id of the job; also the key under which the entry is stored.
    id: JobId,
    /// One-shot channel used to request cancellation. The canceller takes it (leaving `None`)
    /// and sends a reply sender through it; the `bool` sent back on that reply sender tells
    /// whether the job was actually cancelled.
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    /// Never read; kept only so the permit lives as long as this entry.
    _permit: OwnedSemaphorePermit,
}
150
151impl StreamingJobExecution {
152    fn new(
153        id: JobId,
154        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
155        permit: OwnedSemaphorePermit,
156    ) -> Self {
157        Self {
158            id,
159            shutdown_tx: Some(shutdown_tx),
160            _permit: permit,
161        }
162    }
163}
164
/// Registry of the streaming jobs that are currently being created by this manager.
#[derive(Default)]
struct CreatingStreamingJobInfo {
    /// In-flight jobs keyed by job id, behind an async mutex.
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}
169
170impl CreatingStreamingJobInfo {
171    async fn add_job(&self, job: StreamingJobExecution) {
172        let mut jobs = self.streaming_jobs.lock().await;
173        jobs.insert(job.id, job);
174    }
175
176    async fn delete_job(&self, job_id: JobId) {
177        let mut jobs = self.streaming_jobs.lock().await;
178        jobs.remove(&job_id);
179    }
180
181    async fn cancel_jobs(
182        &self,
183        job_ids: Vec<JobId>,
184    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
185        let mut jobs = self.streaming_jobs.lock().await;
186        let mut receivers = HashMap::new();
187        let mut background_job_ids = vec![];
188        for job_id in job_ids {
189            if let Some(job) = jobs.get_mut(&job_id) {
190                if let Some(shutdown_tx) = job.shutdown_tx.take() {
191                    let (tx, rx) = oneshot::channel();
192                    match shutdown_tx.send(tx) {
193                        Ok(()) => {
194                            receivers.insert(job_id, rx);
195                        }
196                        Err(_) => {
197                            return Err(anyhow::anyhow!(
198                                "failed to send shutdown signal for streaming job {}: receiver dropped",
199                                job_id
200                            )
201                            .into());
202                        }
203                    }
204                }
205            } else {
206                // If these job ids do not exist in streaming_jobs, they should be background creating jobs.
207                background_job_ids.push(job_id);
208            }
209        }
210
211        Ok((receivers, background_job_ids))
212    }
213}
214
215type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;
216
/// Per-sink context carried in `ReplaceStreamJobContext::auto_refresh_schema_sinks`.
///
/// NOTE(review): presumably describes a sink whose schema is refreshed together with the job
/// being replaced; confirm against the code that builds it.
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    /// NOTE(review): presumably a temporary id used for the sink while it is replaced; confirm.
    pub tmp_sink_id: SinkId,
    /// The sink catalog before the change.
    pub original_sink: PbSink,
    /// The sink fragment before the change.
    pub original_fragment: Fragment,
    /// The column catalogs of the new schema.
    pub new_schema: Vec<PbColumnCatalog>,
    /// Fields added by the new schema.
    pub newly_add_fields: Vec<Field>,
    /// Names of the columns removed by the new schema.
    pub removed_column_names: Vec<String>,
    /// The replacement fragment; `new_fragment_info` is derived from it.
    pub new_fragment: Fragment,
    /// NOTE(review): presumably the new log store table of the sink, if it has one; confirm.
    pub new_log_store_table: Option<Box<PbTable>>,
    /// The sink's own stream context (timezone, `config_override`).
    pub ctx: StreamContext,
}
230
231impl AutoRefreshSchemaSinkContext {
232    pub fn new_fragment_info(
233        &self,
234        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
235        actor_location: &HashMap<ActorId, WorkerId>,
236    ) -> InflightFragmentInfo {
237        InflightFragmentInfo {
238            fragment_id: self.new_fragment.fragment_id,
239            distribution_type: self.new_fragment.distribution_type.into(),
240            fragment_type_mask: self.new_fragment.fragment_type_mask,
241            vnode_count: self.new_fragment.vnode_count(),
242            nodes: self.new_fragment.nodes.clone(),
243            actors: stream_actors
244                .get(&self.new_fragment.fragment_id)
245                .into_iter()
246                .flatten()
247                .map(|actor| {
248                    (
249                        actor.actor_id,
250                        InflightActorInfo {
251                            worker_id: actor_location[&actor.actor_id],
252                            vnode_bitmap: actor.vnode_bitmap.clone(),
253                            splits: vec![],
254                        },
255                    )
256                })
257                .collect(),
258            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
259        }
260    }
261}
262
/// [`ReplaceStreamJobContext`] carries one-time infos for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,

    /// New fragment relation to add from existing upstream fragment to downstream fragment.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The job whose plan is replaced. Its database id selects where the replace command is
    /// run, and whether it is a source decides how splits are planned.
    pub streaming_job: StreamingJob,

    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,

    /// NOTE(review): presumably the temporary job id under which the new fragments were built;
    /// confirm.
    pub tmp_id: JobId,

    /// Used for dropping an associated source. Dropping source and related internal tables.
    ///
    /// When set, its `to_remove_state_table_id` becomes the only entry of the replace command's
    /// `to_drop_state_table_ids`.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    /// Per-sink contexts forwarded as-is into the replace command; see
    /// [`AutoRefreshSchemaSinkContext`].
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,

    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
}
291
/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    /// Meta service environment; used here for the notification manager and the await-tree
    /// registry.
    pub env: MetaSrvEnv,

    /// Gives access to job fragments and to the catalog controller.
    pub metadata_manager: MetadataManager,

    /// Broadcasts and collects barriers.
    pub barrier_scheduler: BarrierScheduler,

    /// Used to unregister the state tables of dropped or cancelled jobs.
    pub hummock_manager: HummockManagerRef,

    /// Maintains streaming sources from external systems such as Kafka.
    pub source_manager: SourceManagerRef,

    /// Its progress trackers are removed when jobs are dropped or cancelled.
    pub refresh_manager: GlobalRefreshManagerRef,

    /// Creating streaming job info.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}
313
314impl GlobalStreamManager {
315    pub fn new(
316        env: MetaSrvEnv,
317        metadata_manager: MetadataManager,
318        barrier_scheduler: BarrierScheduler,
319        hummock_manager: HummockManagerRef,
320        source_manager: SourceManagerRef,
321        refresh_manager: GlobalRefreshManagerRef,
322        scale_controller: ScaleControllerRef,
323    ) -> MetaResult<Self> {
324        Ok(Self {
325            env,
326            metadata_manager,
327            barrier_scheduler,
328            hummock_manager,
329            source_manager,
330            refresh_manager,
331            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
332            scale_controller,
333        })
334    }
335
    /// Create streaming job, it works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, build the hanging
    ///    channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via source manager and patch
    ///    actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related meta data.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
    ///
    /// On top of running the command, this wrapper:
    /// - registers the job in `creating_job_info` (together with `permit`) for the duration of
    ///   the call, so that a cancel request can reach it;
    /// - for `CreateType::Foreground`, waits until the job is finished; for
    ///   `CreateType::Background`, returns the current notification version right after the
    ///   command has run;
    /// - races the creation against a cancel request and reports the outcome of the
    ///   cancellation back to the requester.
    ///
    /// # Errors
    ///
    /// Returns the error of the creation itself, `MetaError::cancelled("create")` when the job
    /// was cancelled, or the error that made the cancel command fail.
    ///
    /// # Panics
    ///
    /// Panics if `ctx.create_type` is `CreateType::Unspecified`, or if the cancel sender is
    /// dropped without a cancel request having been sent.
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        // Register the job so that `CreatingStreamingJobInfo::cancel_jobs` can deliver a cancel
        // request through `cancel_tx`. The entry (and `permit` with it) is removed by
        // `delete_job` at the end of this function.
        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            // `ctx` is moved into the command below, so remember the create type first.
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                // Background jobs do not wait for completion.
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                // Foreground jobs block until the job is reported as finished.
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finish");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        let result = async {
            tokio::select! {
                // Branches are polled in declaration order: a creation result that is ready
                // takes precedence over a cancel request.
                biased;

                res = create_fut => res,
                notifier = cancel_rx => {
                    let notifier = notifier.expect("sender should not be dropped");
                    tracing::debug!(id=%job_id, "cancelling streaming job");

                    // Outcome of handling the cancel request:
                    // - `Completed`: the job was already created; carries the result of waiting
                    //   for it to finish.
                    // - `Failed`: the cancel command could not be carried out.
                    // - `Cancelled`: the job is considered cancelled.
                    enum CancelResult {
                        Completed(MetaResult<NotificationVersion>),
                        Failed(MetaError),
                        Cancelled,
                    }

                    let cancel_res = if let Ok(job_fragments) =
                        self.metadata_manager.get_job_fragments_by_id(job_id).await
                    {
                        // try to cancel buffered creating command.
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, job_id)
                        {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job in buffer queue."
                            );
                            CancelResult::Cancelled
                        } else if !job_fragments.is_created() {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job by issue cancel command."
                            );

                            // Build the cancel command and collect the state table ids first,
                            // then abort the job in the catalog, run the command, and finally
                            // clean up the job's trackers and state tables.
                            let cancel_result: MetaResult<()> = async {
                                let cancel_command = self.metadata_manager.catalog_controller
                                    .build_cancel_command(&job_fragments)
                                    .await?;
                                let cleanup_state_table_ids =
                                    job_fragments.all_table_ids().collect_vec();
                                self.metadata_manager.catalog_controller
                                    .try_abort_creating_streaming_job(job_id, true)
                                    .await?;

                                self.barrier_scheduler
                                    .run_command(database_id, cancel_command)
                                    .await?;
                                cleanup_dropped_streaming_jobs(
                                    &self.refresh_manager,
                                    &self.hummock_manager,
                                    &self.metadata_manager,
                                    [job_id],
                                    cleanup_state_table_ids,
                                    "cancel_streaming_job",
                                )
                                .await?;
                                Ok(())
                            }
                            .await;

                            match cancel_result {
                                Ok(()) => CancelResult::Cancelled,
                                Err(err) => {
                                    tracing::warn!(
                                        error = ?err.as_report(),
                                        id = %job_id,
                                        "failed to run cancel command for creating streaming job"
                                    );
                                    CancelResult::Failed(err)
                                }
                            }
                        } else {
                            // streaming job is already completed
                            CancelResult::Completed(
                                self.metadata_manager
                                    .wait_streaming_job_finished(database_id, job_id)
                                    .await,
                            )
                        }
                    } else {
                        // The job fragments could not be loaded; the lookup error is discarded
                        // and the job is reported as cancelled.
                        CancelResult::Cancelled
                    };

                    // `cancelled` is what the requester of the cancellation is told; `result`
                    // is what the caller of `create_streaming_job` gets back.
                    let (cancelled, result) = match cancel_res {
                        CancelResult::Completed(result) => (false, result),
                        CancelResult::Failed(err) => (false, Err(err)),
                        CancelResult::Cancelled => (true, Err(MetaError::cancelled("create"))),
                    };

                    // The requester may have gone away; that is only worth a warning.
                    let _ = notifier
                        .send(cancelled)
                        .inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));

                    result
                }
            }
        }
        .await;

        // Unregister the job whatever the outcome was.
        tracing::debug!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }
502
503    /// The function will return after barrier collected
504    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
505    #[await_tree::instrument]
506    async fn run_create_streaming_job_command(
507        &self,
508        stream_job_fragments: StreamJobFragmentsToCreate,
509        CreateStreamingJobContext {
510            streaming_job,
511            upstream_fragment_downstreams,
512            database_resource_group,
513            definition,
514            create_type,
515            job_type,
516            new_upstream_sink,
517            snapshot_backfill_info,
518            cross_db_snapshot_backfill_info,
519            fragment_backfill_ordering,
520            locality_fragment_state_table_mapping,
521            cdc_table_snapshot_splits,
522            is_serverless_backfill,
523            streaming_job_model,
524            refresh_interval_sec,
525            ..
526        }: CreateStreamingJobContext,
527    ) -> MetaResult<StreamingJob> {
528        tracing::debug!(
529            table_id = %stream_job_fragments.stream_job_id(),
530            "built actors finished"
531        );
532
533        // Phase 1: Gather fragment-level split information.
534        // - For source fragments: discover splits from the external source.
535        // - For backfill fragments: splits will be aligned in Phase 2 inside the barrier worker
536        //   using the actor-level no-shuffle mapping produced by render_actors.
537        let init_split_assignment = self
538            .source_manager
539            .discover_splits(&stream_job_fragments)
540            .await?;
541
542        let fragment_backfill_ordering =
543            StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
544                fragment_backfill_ordering,
545                &stream_job_fragments.downstreams,
546                || {
547                    stream_job_fragments
548                        .fragments
549                        .iter()
550                        .map(|(fragment_id, fragment)| {
551                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
552                        })
553                },
554            );
555
556        let info = CreateStreamingJobCommandInfo {
557            stream_job_fragments,
558            upstream_fragment_downstreams,
559            init_split_assignment,
560            definition: definition.clone(),
561            streaming_job: streaming_job.clone(),
562            job_type,
563            create_type,
564            database_resource_group,
565            fragment_backfill_ordering,
566            cdc_table_snapshot_splits,
567            locality_fragment_state_table_mapping,
568            is_serverless: is_serverless_backfill,
569            streaming_job_model,
570            refresh_interval_sec,
571        };
572
573        let job_type = if let Some(refresh_interval_sec) = refresh_interval_sec {
574            let snapshot_backfill_info = snapshot_backfill_info.ok_or_else(|| {
575                anyhow::anyhow!(
576                    "batch refresh materialized view must have snapshot backfill upstream"
577                )
578            })?;
579            // Batch refresh jobs must not contain source or source-backfill nodes,
580            // because we skip split assignment resolution for them.
581            for fragment in info.stream_job_fragments.inner.fragments.values() {
582                let mask = fragment.fragment_type_mask;
583                if mask.contains(FragmentTypeFlag::Source)
584                    || mask.contains(FragmentTypeFlag::SourceScan)
585                {
586                    bail!(
587                        "batch refresh materialized views must not depend on sources directly; \
588                         fragment {} has source/source-backfill nodes",
589                        fragment.fragment_id
590                    );
591                }
592            }
593            tracing::debug!(
594                ?snapshot_backfill_info,
595                refresh_interval_sec,
596                "sending Command::CreateBatchRefreshStreamingJob"
597            );
598            CreateStreamingJobType::BatchRefresh(BatchRefreshInfo {
599                snapshot_backfill_info,
600                refresh_interval_sec,
601            })
602        } else if let Some(snapshot_backfill_info) = snapshot_backfill_info {
603            tracing::debug!(
604                ?snapshot_backfill_info,
605                "sending Command::CreateSnapshotBackfillStreamingJob"
606            );
607            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
608        } else {
609            tracing::debug!("sending Command::CreateStreamingJob");
610            if let Some(new_upstream_sink) = new_upstream_sink {
611                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
612            } else {
613                CreateStreamingJobType::Normal
614            }
615        };
616
617        let command = Command::CreateStreamingJob {
618            info,
619            job_type,
620            cross_db_snapshot_backfill_info,
621        };
622
623        self.barrier_scheduler
624            .run_command(streaming_job.database_id(), command)
625            .await?;
626
627        tracing::debug!(?streaming_job, "first barrier collected for stream job");
628
629        Ok(streaming_job)
630    }
631
632    /// Send replace job command to barrier scheduler.
633    pub async fn replace_stream_job(
634        &self,
635        new_fragments: StreamJobFragmentsToCreate,
636        ReplaceStreamJobContext {
637            old_fragments,
638            replace_upstream,
639            upstream_fragment_downstreams,
640            tmp_id,
641            streaming_job,
642            drop_table_connector_ctx,
643            auto_refresh_schema_sinks,
644            streaming_job_model,
645            database_resource_group,
646        }: ReplaceStreamJobContext,
647    ) -> MetaResult<()> {
648        // Phase 1: Gather fragment-level split information.
649        // For replace source with existing downstream, splits will be aligned
650        // in Phase 2 inside the barrier worker using actor-level no-shuffle produced by render_actors.
651        // For replace source with no downstream (or non-source), discover splits fresh.
652        let split_plan = if streaming_job.is_source() {
653            match self
654                .source_manager
655                .discover_splits_for_replace_source(&new_fragments, &replace_upstream)
656                .await?
657            {
658                Some(discovered) => ReplaceJobSplitPlan::Discovered(discovered),
659                None => ReplaceJobSplitPlan::AlignFromPrevious,
660            }
661        } else {
662            let discovered = self.source_manager.discover_splits(&new_fragments).await?;
663            ReplaceJobSplitPlan::Discovered(discovered)
664        };
665        tracing::info!("replace_stream_job - split plan: {:?}", split_plan);
666
667        self.barrier_scheduler
668            .run_command(
669                streaming_job.database_id(),
670                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
671                    old_fragments,
672                    new_fragments,
673                    database_resource_group,
674                    replace_upstream,
675                    upstream_fragment_downstreams,
676                    split_plan,
677                    streaming_job,
678                    streaming_job_model,
679                    tmp_id,
680                    to_drop_state_table_ids: {
681                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
682                            vec![drop_table_connector_ctx.to_remove_state_table_id]
683                        } else {
684                            Vec::new()
685                        }
686                    },
687                    auto_refresh_schema_sinks,
688                }),
689            )
690            .await?;
691
692        Ok(())
693    }
694
695    /// Drop streaming jobs by barrier manager, and clean up all related resources. The error will
696    /// be ignored because the recovery process will take over it in cleaning part. Check
697    /// [`Command::DropStreamingJobs`] for details.
698    pub async fn drop_streaming_jobs(
699        &self,
700        database_id: DatabaseId,
701        streaming_job_ids: Vec<JobId>,
702        state_table_ids: Vec<TableId>,
703        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
704    ) {
705        if !streaming_job_ids.is_empty() || !state_table_ids.is_empty() {
706            let cleanup_streaming_job_ids = streaming_job_ids.clone();
707            let cleanup_state_table_ids = state_table_ids.clone();
708            let run_result = self
709                .barrier_scheduler
710                .run_command(
711                    database_id,
712                    Command::DropStreamingJobs {
713                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
714                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
715                        dropped_sink_fragment_by_targets,
716                    },
717                )
718                .await;
719            let result = match run_result {
720                Ok(()) => {
721                    cleanup_dropped_streaming_jobs(
722                        &self.refresh_manager,
723                        &self.hummock_manager,
724                        &self.metadata_manager,
725                        cleanup_streaming_job_ids,
726                        cleanup_state_table_ids,
727                        "drop_streaming_jobs",
728                    )
729                    .await
730                }
731                Err(err) => Err(err),
732            };
733            let _ = result.inspect_err(|err| {
734                tracing::error!(error = ?err.as_report(), "failed to run drop command");
735            });
736        }
737    }
738
739    /// Cancel streaming jobs and return the canceled table ids.
740    /// 1. Send cancel message to stream jobs (via `cancel_jobs`).
741    /// 2. Send cancel message to recovered stream jobs (via `barrier_scheduler`).
742    ///
743    /// Cleanup of their state is handled by the caller after the drop command is collected.
744    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
745        if job_ids.is_empty() {
746            return Ok(vec![]);
747        }
748
749        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
750        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;
751
752        let futures = receivers.into_iter().map(|(id, receiver)| async move {
753            if let Ok(cancelled) = receiver.await
754                && cancelled
755            {
756                tracing::info!("canceled streaming job {id}");
757                Ok(id)
758            } else {
759                Err(MetaError::from(anyhow::anyhow!(
760                    "failed to cancel streaming job {id}"
761                )))
762            }
763        });
764        let mut cancelled_ids = join_all(futures)
765            .await
766            .into_iter()
767            .collect::<MetaResult<Vec<_>>>()?;
768
769        // NOTE(kwannoel): For background_job_ids stream jobs that not tracked in streaming manager,
770        // we can directly cancel them by running the barrier command.
771        let futures = background_job_ids.into_iter().map(|id| async move {
772            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
773            if fragment.is_created() {
774                tracing::warn!(
775                    "streaming job {} is already created, ignore cancel request",
776                    id
777                );
778                return Ok(None);
779            }
780            if fragment.is_created() {
781                Err(MetaError::invalid_parameter(format!(
782                    "streaming job {} is already created",
783                    id
784                )))?;
785            }
786
787            let cancel_command = self
788                .metadata_manager
789                .catalog_controller
790                .build_cancel_command(&fragment)
791                .await?;
792            let cleanup_state_table_ids = fragment.all_table_ids().collect_vec();
793
794            let (_, database_id) = self
795                .metadata_manager
796                .catalog_controller
797                .try_abort_creating_streaming_job(id, true)
798                .await?;
799
800            if let Some(database_id) = database_id {
801                self.barrier_scheduler
802                    .run_command(database_id, cancel_command)
803                    .await?;
804                cleanup_dropped_streaming_jobs(
805                    &self.refresh_manager,
806                    &self.hummock_manager,
807                    &self.metadata_manager,
808                    [id],
809                    cleanup_state_table_ids,
810                    "cancel_streaming_job",
811                )
812                .await?;
813            }
814
815            tracing::info!(?id, "cancelled background streaming job");
816            Ok(Some(id))
817        });
818        let cancelled_recovered_ids = join_all(futures)
819            .await
820            .into_iter()
821            .collect::<MetaResult<Vec<_>>>()?;
822
823        cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
824        Ok(cancelled_ids)
825    }
826
827    pub(crate) async fn reschedule_streaming_job(
828        &self,
829        job_id: JobId,
830        policy: ReschedulePolicy,
831        deferred: bool,
832    ) -> MetaResult<()> {
833        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
834
835        let background_jobs = self
836            .metadata_manager
837            .list_background_creating_jobs()
838            .await?;
839
840        if !background_jobs.is_empty() {
841            let blocked_jobs = self
842                .metadata_manager
843                .collect_reschedule_blocked_jobs_for_creating_jobs(&background_jobs, !deferred)
844                .await?;
845
846            if blocked_jobs.contains(&job_id) {
847                bail!(
848                    "Cannot alter the job {} because it is blocked by creating unreschedulable backfill jobs",
849                    job_id,
850                );
851            }
852        }
853
854        let commands = self
855            .scale_controller
856            .reschedule_inplace(HashMap::from([(job_id, policy)]))
857            .await?;
858
859        if !deferred {
860            let _source_pause_guard = self.source_manager.pause_tick().await;
861
862            for (database_id, command) in commands {
863                self.barrier_scheduler
864                    .run_command(database_id, command)
865                    .await?;
866            }
867        }
868
869        Ok(())
870    }
871
872    pub(crate) async fn reschedule_streaming_job_backfill_parallelism(
873        &self,
874        job_id: JobId,
875        parallelism: Option<ParallelismPolicy>,
876        deferred: bool,
877    ) -> MetaResult<()> {
878        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
879
880        let background_jobs = self
881            .metadata_manager
882            .list_background_creating_jobs()
883            .await?;
884
885        if !background_jobs.is_empty() {
886            let unreschedulable = self
887                .metadata_manager
888                .collect_unreschedulable_backfill_jobs(&background_jobs, !deferred)
889                .await?;
890
891            if unreschedulable.contains(&job_id) {
892                bail!(
893                    "Cannot alter the job {} because it is a non-reschedulable background backfill job",
894                    job_id,
895                );
896            }
897        }
898
899        let commands = self
900            .scale_controller
901            .reschedule_backfill_parallelism_inplace(HashMap::from([(job_id, parallelism)]))
902            .await?;
903
904        if !deferred {
905            let _source_pause_guard = self.source_manager.pause_tick().await;
906
907            for (database_id, command) in commands {
908                self.barrier_scheduler
909                    .run_command(database_id, command)
910                    .await?;
911            }
912        }
913
914        Ok(())
915    }
916
917    /// This method is copied from `GlobalStreamManager::reschedule_streaming_job` and modified to handle reschedule CDC table backfill.
918    pub(crate) async fn reschedule_cdc_table_backfill(
919        &self,
920        job_id: JobId,
921        target: ReschedulePolicy,
922    ) -> MetaResult<()> {
923        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
924
925        let parallelism_policy = match target {
926            ReschedulePolicy::Parallelism(policy)
927                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
928            {
929                policy
930            }
931            _ => bail_invalid_parameter!(
932                "CDC backfill reschedule only supports fixed parallelism targets"
933            ),
934        };
935
936        let cdc_fragment_id = {
937            let inner = self.metadata_manager.catalog_controller.inner.read().await;
938            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
939                .select_only()
940                .columns([
941                    fragment::Column::FragmentId,
942                    fragment::Column::FragmentTypeMask,
943                ])
944                .filter(fragment::Column::JobId.eq(job_id))
945                .into_tuple()
946                .all(&inner.db)
947                .await?;
948
949            let cdc_fragments = fragments
950                .into_iter()
951                .filter_map(|(fragment_id, mask)| {
952                    FragmentTypeMask::from(mask)
953                        .contains(FragmentTypeFlag::StreamCdcScan)
954                        .then_some(fragment_id)
955                })
956                .collect_vec();
957
958            match cdc_fragments.len() {
959                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
960                1 => cdc_fragments[0],
961                _ => bail_invalid_parameter!(
962                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
963                    job_id
964                ),
965            }
966        };
967
968        let fragment_policy = HashMap::from([(
969            cdc_fragment_id,
970            Some(parallelism_policy.parallelism.clone()),
971        )]);
972
973        let commands = self
974            .scale_controller
975            .reschedule_fragment_inplace(fragment_policy)
976            .await?;
977
978        let _source_pause_guard = self.source_manager.pause_tick().await;
979
980        for (database_id, command) in commands {
981            self.barrier_scheduler
982                .run_command(database_id, command)
983                .await?;
984        }
985
986        Ok(())
987    }
988
989    pub(crate) async fn reschedule_fragments(
990        &self,
991        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
992    ) -> MetaResult<()> {
993        if fragment_targets.is_empty() {
994            return Ok(());
995        }
996
997        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
998
999        let fragment_policy = fragment_targets
1000            .into_iter()
1001            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
1002            .collect();
1003
1004        let commands = self
1005            .scale_controller
1006            .reschedule_fragment_inplace(fragment_policy)
1007            .await?;
1008
1009        let _source_pause_guard = self.source_manager.pause_tick().await;
1010
1011        for (database_id, command) in commands {
1012            self.barrier_scheduler
1013                .run_command(database_id, command)
1014                .await?;
1015        }
1016
1017        Ok(())
1018    }
1019
1020    // Don't need to add actor, just send a command
1021    pub async fn create_subscription(
1022        self: &Arc<Self>,
1023        subscription: &Subscription,
1024    ) -> MetaResult<()> {
1025        let command = Command::CreateSubscription {
1026            subscription_id: subscription.id,
1027            upstream_mv_table_id: subscription.dependent_table_id,
1028            retention_second: subscription.retention_seconds,
1029        };
1030
1031        tracing::debug!("sending Command::CreateSubscription");
1032        self.barrier_scheduler
1033            .run_command(subscription.database_id, command)
1034            .await?;
1035        Ok(())
1036    }
1037
1038    // Don't need to add actor, just send a command
1039    pub async fn drop_subscription(
1040        self: &Arc<Self>,
1041        database_id: DatabaseId,
1042        subscription_id: SubscriptionId,
1043        table_id: TableId,
1044    ) {
1045        let command = Command::DropSubscription {
1046            subscription_id,
1047            upstream_mv_table_id: table_id,
1048        };
1049
1050        tracing::debug!("sending Command::DropSubscriptions");
1051        let _ = self
1052            .barrier_scheduler
1053            .run_command(database_id, command)
1054            .await
1055            .inspect_err(|err| {
1056                tracing::error!(error = ?err.as_report(), "failed to run drop command");
1057            });
1058    }
1059
1060    pub async fn alter_subscription_retention(
1061        self: &Arc<Self>,
1062        database_id: DatabaseId,
1063        subscription_id: SubscriptionId,
1064        table_id: TableId,
1065        retention_second: u64,
1066    ) -> MetaResult<()> {
1067        let command = Command::AlterSubscriptionRetention {
1068            subscription_id,
1069            upstream_mv_table_id: table_id,
1070            retention_second,
1071        };
1072
1073        tracing::debug!("sending Command::AlterSubscriptionRetention");
1074        self.barrier_scheduler
1075            .run_command(database_id, command)
1076            .await?;
1077        Ok(())
1078    }
1079}