risingwave_meta/stream/stream_manager.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::span;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, SinkId};
use risingwave_connector::source::CdcTableSnapshotSplitRaw;
use risingwave_meta_model::prelude::Fragment as FragmentModel;
use risingwave_meta_model::{StreamingParallelism, fragment};
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use thiserror_ext::AsReport;
use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
use tracing::Instrument;

use super::{FragmentBackfillOrder, Locations, ReschedulePolicy, ScaleControllerRef};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
    SubscriptionId,
};
use crate::stream::SourceManagerRef;
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // leave empty as a placeholder for future option if there is any
}

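/// Describes the sink side of a sink-into-table job: the sink fragment, its output schema,
/// the projection expressions applied to its output, and the new downstream relation that
/// connects the sink to the target table.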
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: SinkId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    // for backwards compatibility
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

/// [`CreateStreamingJobContext`] carries one-time infos for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relation to add from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// The locations of the actors to build in the streaming job.
    pub building_locations: Locations,

    /// DDL definition.
    pub definition: String,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Used for sink-into-table.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,

    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}

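/// A streaming job that is currently being created: holds the channel used to request
/// cancellation and keeps the concurrency permit alive for the duration of the creation.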
struct StreamingJobExecution {
    id: JobId,
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    _permit: OwnedSemaphorePermit,
}

impl StreamingJobExecution {
    fn new(
        id: JobId,
        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
            _permit: permit,
        }
    }
}

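/// Registry of streaming jobs that are currently being created, keyed by job id.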
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: JobId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

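    /// Sends a shutdown signal to each tracked job in `job_ids` and returns a receiver for each
    /// signalled job, plus the ids that are not tracked here and are assumed to be background
    /// creating jobs.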
    async fn cancel_jobs(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut background_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    match shutdown_tx.send(tx) {
                        Ok(()) => {
                            receivers.insert(job_id, rx);
                        }
                        Err(_) => {
                            return Err(anyhow::anyhow!(
                                "failed to send shutdown signal for streaming job {}: receiver dropped",
                                job_id
                            )
                            .into());
                        }
                    }
                }
            } else {
                // If the job id is not tracked in `streaming_jobs`, it should be a background creating job.
                background_job_ids.push(job_id);
            }
        }

        Ok((receivers, background_job_ids))
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

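/// Context for replacing a sink fragment whose schema is refreshed automatically: carries the
/// original sink and fragment, the new schema and fragment, and the actor placement of the new
/// fragment.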
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
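    /// Builds the in-flight fragment info for the new sink fragment, resolving each actor's
    /// worker id from `actor_status`.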
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
        }
    }
}

/// [`ReplaceStreamJobContext`] carries one-time infos for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// New fragment relation to add from existing upstream fragment to downstream fragment.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the new job to replace.
    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: JobId,

    /// Used when dropping an associated source: drops the source and its related internal tables.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Broadcasts and collects barriers.
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external systems like Kafka.
    pub source_manager: SourceManagerRef,

    /// Info of the streaming jobs that are currently being created.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

    /// Create a streaming job. It works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, and build the
    ///    hanging channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via the source manager and
    ///    patch the actors.
    /// 3. Notify the related worker nodes to update and build the actors.
    /// 4. Store the related metadata.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finish");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

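        // Race the creation future against a cancellation request; `biased` polls the creation
        // future first when both are ready.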
        let result = tokio::select! {
            biased;

            res = create_fut => res,
            notifier = cancel_rx => {
                let notifier = notifier.expect("sender should not be dropped");
                tracing::debug!(id=%job_id, "cancelling streaming job");

                if let Ok(job_fragments) = self.metadata_manager.get_job_fragments_by_id(job_id)
                    .await {
                    // try to cancel buffered creating command.
                    if self.barrier_scheduler.try_cancel_scheduled_create(database_id, job_id) {
                        tracing::debug!("cancelling streaming job {job_id} in buffer queue.");
                    } else if !job_fragments.is_created() {
                        tracing::debug!("cancelling streaming job {job_id} by issue cancel command.");

                        let cancel_command = self.metadata_manager.catalog_controller
                            .build_cancel_command(&job_fragments)
                            .await?;
                        self.metadata_manager.catalog_controller
                            .try_abort_creating_streaming_job(job_id, true)
                            .await?;

                        self.barrier_scheduler.run_command(database_id, cancel_command).await?;
                    } else {
                        // streaming job is already completed
                        let _ = notifier.send(false).inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));
                        return self.metadata_manager.wait_streaming_job_finished(database_id, job_id).await;
                    }
                }
                notifier.send(true).expect("receiver should not be dropped");
                Err(MetaError::cancelled("create"))
            }
        };

        tracing::debug!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }

    /// The function will return after the barrier is collected
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        // Here we need to consider:
        // - Shared source
        // - Table with connector
        // - MV on shared source
        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;

        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_splits,
            locality_fragment_state_table_mapping,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }

    /// Send replace job command to barrier scheduler.
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocate split: {:?}",
            init_split_assignment
        );

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                }),
            )
            .await?;

        Ok(())
    }

    /// Drop streaming jobs via the barrier manager, and clean up all related resources. Errors
    /// are ignored because the recovery process will take over the cleanup. Check
    /// [`Command::DropStreamingJobs`] for details.
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<JobId>,
        state_table_ids: Vec<TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

    /// Cancel streaming jobs and return the cancelled job ids.
    /// 1. Send a cancel message to creating stream jobs (via `cancel_jobs`).
    /// 2. Send a cancel message to recovered stream jobs (via `barrier_scheduler`).
    ///
    /// In both cases, their state is cleaned up by the barrier manager after the
    /// `CancelStreamJob` command succeeds.
    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
        if job_ids.is_empty() {
            return Ok(vec![]);
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if let Ok(cancelled) = receiver.await
                && cancelled
            {
                tracing::info!("canceled streaming job {id}");
                Ok(id)
            } else {
                Err(MetaError::from(anyhow::anyhow!(
                    "failed to cancel streaming job {id}"
                )))
            }
        });
        let mut cancelled_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        // NOTE(kwannoel): `background_job_ids` are stream jobs not tracked by the stream manager,
        // so we can cancel them directly by running the barrier command.
        let futures = background_job_ids.into_iter().map(|id| async move {
            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
            if fragment.is_created() {
                tracing::warn!(
                    "streaming job {} is already created, ignore cancel request",
                    id
                );
                return Ok(None);
            }

            let cancel_command = self
                .metadata_manager
                .catalog_controller
                .build_cancel_command(&fragment)
                .await?;

            let (_, database_id) = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_streaming_job(id, true)
                .await?;

            if let Some(database_id) = database_id {
                self.barrier_scheduler
                    .run_command(database_id, cancel_command)
                    .await?;
            }

            tracing::info!(?id, "cancelled background streaming job");
            Ok(Some(id))
        });
        let cancelled_recovered_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
        Ok(cancelled_ids)
    }

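    /// Reschedule a single streaming job according to `policy`. Bails out if the job is related
    /// to a background job that is still being created. When `deferred` is set, the computed
    /// reschedule commands are not run immediately.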
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        policy: ReschedulePolicy,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            if related_jobs.contains(&job_id) {
                bail!(
                    "Cannot alter the job {} because the related job {:?} is currently being created",
                    job_id,
                    background_jobs,
                );
            }
        }

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();
        let workers = worker_nodes.into_iter().map(|x| (x.id, x)).collect();

        let commands = self
            .scale_controller
            .reschedule_inplace(HashMap::from([(job_id, policy)]), workers)
            .await?;

        if !deferred {
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }

    /// This method is copied from `GlobalStreamManager::reschedule_streaming_job` and modified to handle rescheduling of the CDC table backfill fragment.
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let parallelism_policy = match target {
            ReschedulePolicy::Parallelism(policy)
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
            {
                policy
            }
            _ => bail_invalid_parameter!(
                "CDC backfill reschedule only supports fixed parallelism targets"
            ),
        };

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();
        let workers = worker_nodes.into_iter().map(|x| (x.id, x)).collect();

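        // Locate the job's single StreamCdcScan fragment by checking the fragment type mask;
        // CDC backfill rescheduling targets exactly one such fragment.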
        let cdc_fragment_id = {
            let inner = self.metadata_manager.catalog_controller.inner.read().await;
            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(job_id))
                .into_tuple()
                .all(&inner.db)
                .await?;

            let cdc_fragments = fragments
                .into_iter()
                .filter_map(|(fragment_id, mask)| {
                    FragmentTypeMask::from(mask)
                        .contains(FragmentTypeFlag::StreamCdcScan)
                        .then_some(fragment_id)
                })
                .collect_vec();

            match cdc_fragments.len() {
                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
                1 => cdc_fragments[0],
                _ => bail_invalid_parameter!(
                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
                    job_id
                ),
            }
        };

        let fragment_policy = HashMap::from([(
            cdc_fragment_id,
            Some(parallelism_policy.parallelism.clone()),
        )]);

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy, workers)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

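    /// Reschedule the given fragments in place to their parallelism targets and run the
    /// resulting commands for each affected database.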
    pub(crate) async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        if fragment_targets.is_empty() {
            return Ok(());
        }

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .map(|worker| (worker.id, worker))
            .collect();

        let fragment_policy = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
            .collect();

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy, workers)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

    // Don't need to add actor, just send a command
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: subscription.dependent_table_id,
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id, command)
            .await?;
        Ok(())
    }

    // Don't need to add actor, just send a command
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: SubscriptionId,
        table_id: TableId,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: table_id,
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}