risingwave_meta/stream/stream_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::sync::Arc;

use await_tree::{InstrumentAwait, span};
use futures::FutureExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, TableId};
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Mutex, oneshot};
use tracing::Instrument;

use super::{
    FragmentBackfillOrder, JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates,
    JobRescheduleTarget, JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    assign_cdc_table_snapshot_splits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Deliberately left empty; a placeholder for future options, if any.
}

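/// Info about the upstream sink of a sink-into-table job
/// (see [`CreateStreamingJobContext::new_upstream_sink`]).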
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: ObjectId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    // for backwards compatibility
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

/// [`CreateStreamingJobContext`] carries one-time information for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct CreateStreamingJobContext {
    /// New fragment relations to add, from upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// The locations of the actors to build in the streaming job.
    pub building_locations: Locations,

    /// DDL definition.
    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Used for sink-into-table.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,
}

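/// The state of a streaming job while it is being created, reported back to the foreground
/// [`GlobalStreamManager::create_streaming_job`] call through an mpsc channel.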
pub enum CreatingState {
    Failed { reason: MetaError },
    // `finish_tx` is used to notify the result of canceling.
    Canceling { finish_tx: oneshot::Sender<()> },
    Created { version: NotificationVersion },
}

struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<Sender<CreatingState>>,
}

impl StreamingJobExecution {
    fn new(id: TableId, shutdown_tx: Sender<CreatingState>) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
        }
    }
}

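/// Tracks the streaming jobs that are currently being created, so that they can be cancelled
/// before they finish.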
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut recovered_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx
                        .send(CreatingState::Canceling { finish_tx: tx })
                        .await
                        .is_ok()
                    {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id=?job_id, "failed to send canceling state");
                    }
                }
            } else {
                // If a job id does not exist in `streaming_jobs`, it is either:
                // 1. entirely non-existent, or
                // 2. a recovered streaming job that is managed by the barrier manager.
                recovered_job_ids.push(job_id);
            }
        }
        (receivers, recovered_job_ids)
    }

    async fn check_job_exists(&self, job_id: TableId) -> bool {
        let jobs = self.streaming_jobs.lock().await;
        jobs.contains_key(&job_id)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

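/// Context for replacing a sink fragment whose schema is auto-refreshed during a
/// replace-stream-job operation (see [`ReplaceStreamJobContext::auto_refresh_schema_sinks`]).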
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: ObjectId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id as _,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self
                .new_fragment
                .state_table_ids
                .iter()
                .map(|table| (*table).into())
                .collect(),
        }
    }
}

/// [`ReplaceStreamJobContext`] carries one-time information for replacing the plan of an existing stream job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
pub struct ReplaceStreamJobContext {
    /// The old job fragments to be replaced.
    pub old_fragments: StreamJobFragments,

    /// The updates to be applied to the downstream chain actors. Used for schema change.
    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// New fragment relations to add, from existing upstream fragments to downstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build in the new job to replace.
    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    /// Used for dropping an associated source: drops the source and its related internal tables.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

/// `GlobalStreamManager` manages all the streams in the system.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Broadcasts and collects barriers.
    pub barrier_scheduler: BarrierScheduler,

    /// Maintains streaming sources from external systems, e.g. Kafka.
    pub source_manager: SourceManagerRef,

    /// Info of the streaming jobs that are being created.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

    /// Creates a streaming job. It works as follows:
    ///
    /// 1. Broadcast the actor info based on the scheduling result in the context, and build the
    ///    hanging channels in upstream worker nodes.
    /// 2. (optional) Get the split information of the `StreamSource` via the source manager and
    ///    patch the actors.
    /// 3. Notify related worker nodes to update and build the actors.
    /// 4. Store related metadata.
    ///
    /// This function is a wrapper over [`Self::run_create_streaming_job_command`].
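    ///
    /// A minimal calling sketch (illustrative only, not a doctest; the fragments and context are
    /// assumed to be prepared by the DDL layer):
    ///
    /// ```ignore
    /// // `stream_job_fragments` and `ctx` are built elsewhere (sketch).
    /// let version = stream_manager
    ///     .create_streaming_job(stream_job_fragments, ctx, None)
    ///     .await?;
    /// ```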
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        run_command_notifier: Option<oneshot::Sender<MetaResult<()>>>,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();
        let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
        let execution = StreamingJobExecution::new(table_id, sender.clone());
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let res: MetaResult<_> = try {
                let (source_change, streaming_job) = stream_manager
                    .run_create_streaming_job_command(stream_job_fragments, ctx)
                    .inspect(move |result| {
                        if let Some(tx) = run_command_notifier {
                            let _ = tx.send(match result {
                                Ok(_) => Ok(()),
                                Err(err) => Err(err.clone()),
                            });
                        }
                    })
                    .await?;
                let version = stream_manager
                    .metadata_manager
                    .wait_streaming_job_finished(
                        streaming_job.database_id().into(),
                        streaming_job.id() as _,
                    )
                    .await?;
                stream_manager
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
                tracing::debug!(?streaming_job, "stream job finished");
                version
            };

            match res {
                Ok(version) => {
                    let _ = sender
                        .send(CreatingState::Created { version })
                        .await
                        .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}"));
                }
                Err(err) => {
                    let _ = sender
                        .send(CreatingState::Failed {
                            reason: err.clone(),
                        })
                        .await
                        .inspect_err(|_| {
                            tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}")
                        });
                }
            }
        }
        .in_current_span();

        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        tokio::spawn(fut);

        while let Some(state) = receiver
            .recv()
            .instrument_await("recv_creating_state")
            .await
        {
            match state {
                CreatingState::Failed { reason } => {
                    tracing::debug!(id=?table_id, "stream job failed");
                    // FIXME(kwannoel): For creating stream jobs,
                    // we need to clean up the resources in the stream manager.
                    self.creating_job_info.delete_job(table_id).await;
                    return Err(reason);
                }
                CreatingState::Canceling { finish_tx } => {
                    tracing::debug!(id=?table_id, "cancelling streaming job");
                    if let Ok(table_fragments) = self
                        .metadata_manager
                        .get_job_fragments_by_id(&table_id)
                        .await
                    {
                        // Try to cancel the creating command buffered in the scheduler.
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, table_id)
                        {
                            tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
                        } else if !table_fragments.is_created() {
                            tracing::debug!(
                                "cancelling streaming job {table_id} by issuing a cancel command."
                            );

                            let cancel_command = self
                                .metadata_manager
                                .catalog_controller
                                .build_cancel_command(&table_fragments)
                                .await?;

                            self.metadata_manager
                                .catalog_controller
                                .try_abort_creating_streaming_job(table_id.table_id as _, true)
                                .await?;

                            self.barrier_scheduler
                                .run_command(database_id, cancel_command)
                                .await?;
                        } else {
                            // The streaming job is already completed.
                            continue;
                        }
                        let _ = finish_tx.send(()).inspect_err(|_| {
                            tracing::warn!("failed to notify cancelled: {table_id}")
                        });
                        self.creating_job_info.delete_job(table_id).await;
                        return Err(MetaError::cancelled("create"));
                    }
                }
                CreatingState::Created { version } => {
                    self.creating_job_info.delete_job(table_id).await;
                    return Ok(version);
                }
            }
        }
        self.creating_job_info.delete_job(table_id).await;
        bail!("receiver failed to get notification version for finished stream job")
    }

    /// This function returns only after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<(SourceChange, StreamingJob)> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        // Here we need to consider:
        // - Shared source
        // - Table with connector
        // - MV on shared source
        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            stream_job_fragments.stream_job_id.table_id,
            &stream_job_fragments,
            self.env.meta_store_ref(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if !cdc_table_snapshot_split_assignment.is_empty()
        {
            self.env.cdc_table_backfill_tracker.track_new_job(
                stream_job_fragments.stream_job_id.table_id,
                cdc_table_snapshot_split_assignment
                    .values()
                    .map(|s| u64::try_from(s.len()).unwrap())
                    .sum(),
            );
            self.env
                .cdc_table_backfill_tracker
                .add_fragment_table_mapping(
                    stream_job_fragments
                        .fragments
                        .values()
                        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
                        .map(|f| f.fragment_id),
                    stream_job_fragments.stream_job_id.table_id,
                );
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(iter::once(stream_job_fragments.stream_job_id.table_id)),
            )
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        };

        let source_change = SourceChange::CreateJobFinished {
            finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_split_assignment,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok((source_change, streaming_job))
    }

    /// Sends a replace-job command to the barrier scheduler.
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocated splits: {:?}",
            init_split_assignment
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            old_fragments.stream_job_id.table_id,
            &new_fragments.inner,
            self.env.meta_store_ref(),
        )
        .await?;

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                    cdc_table_snapshot_split_assignment,
                }),
            )
            .await?;

        Ok(())
    }

    /// Drops streaming jobs via the barrier manager and cleans up all related resources. Errors
    /// are ignored because the recovery process will take over the cleanup. See
    /// [`Command::DropStreamingJobs`] for details.
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        // TODO(august): This is a workaround for canceling SITT via drop, remove it after refactoring SITT.
        for &job_id in &streaming_job_ids {
            if self
                .creating_job_info
                .check_job_exists(TableId::new(job_id as _))
                .await
            {
                tracing::info!(
                    ?job_id,
                    "streaming job is still being created, cancelling it directly via drop"
                );
                self.metadata_manager
                    .notify_cancelled(database_id, job_id)
                    .await;
            }
        }

        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

    /// Cancels streaming jobs and returns the canceled table ids.
    /// 1. Send a cancel message to creating stream jobs (via `cancel_jobs`).
    /// 2. Send a cancel message to recovered stream jobs (via `barrier_scheduler`).
    ///
    /// In both cases, their state is cleaned up by the barrier manager after the
    /// `CancelStreamJob` command succeeds.
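    ///
    /// A minimal calling sketch (illustrative only, not a doctest; `table_ids` is assumed to be
    /// resolved by the caller):
    ///
    /// ```ignore
    /// // Ids of the creating jobs to cancel (sketch).
    /// let cancelled: Vec<TableId> = stream_manager.cancel_streaming_jobs(table_ids).await;
    /// ```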
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if receiver.await.is_ok() {
                tracing::info!("canceled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        // NOTE(kwannoel): Recovered stream jobs can be cancelled directly by running the barrier command,
        // since they are managed by the barrier manager.
        let futures = recovered_job_ids.into_iter().map(|id| async move {
            tracing::debug!(?id, "cancelling recovered streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager
                    .get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let cancel_command = self
                    .metadata_manager
                    .catalog_controller
                    .build_cancel_command(&fragment)
                    .await?;

                let (_, database_id) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(DatabaseId::new(database_id as _), cancel_command)
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

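    /// Reschedules an existing streaming job to the given parallelism / resource group target.
    /// If `deferred` is true, only the new target is recorded via `post_apply_reschedule` and no
    /// actors are moved immediately.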
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        // Check if the provided parallelism is valid.
        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "available parallelism exceeds max parallelism {}; parallelism will be capped at the max",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, directly setting the target to parallelism {:?}, resource group {:?}",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: HashMap::from([(
                            job_id.table_id,
                            JobRescheduleTarget {
                                parallelism: parallelism_change,
                                resource_group: resource_group_change,
                            },
                        )]),
                    },
                    false,
                )
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, directly applying post updates {:?}",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

    /// This method is copied from `GlobalStreamManager::reschedule_streaming_job` and modified to
    /// handle rescheduling of the CDC table backfill fragment.
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;
        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);
        if let JobParallelismTarget::Update(parallelism) = &parallelism_change {
            match parallelism {
                TableParallelism::Fixed(_) => {}
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
                TableParallelism::Adaptive => {
                    bail_invalid_parameter!("should not alter parallelism to adaptive")
                }
            }
        } else {
            bail_invalid_parameter!("should not refresh")
        }
        match &resource_group_change {
            JobResourceGroupTarget::Update(_) => {
                bail_invalid_parameter!("should not update resource group")
            }
            JobResourceGroupTarget::Keep => {}
        };
        // Only generate a reschedule plan for the CDC table backfill fragment.
        let reschedule_plan = self
            .scale_controller
            .generate_job_reschedule_plan(
                JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                },
                true,
            )
            .await?;
        if reschedule_plan.reschedules.is_empty() {
            tracing::debug!(
                ?job_id,
                post_updates = ?reschedule_plan.post_updates,
                "Empty reschedule plan generated for job.",
            );
            self.scale_controller
                .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                .await?;
        } else {
            self.reschedule_actors(
                database_id,
                reschedule_plan,
                RescheduleOptions {
                    resolve_no_shuffle_upstream: false,
                    skip_create_new_actors: false,
                },
            )
            .await?;
        }

        Ok(())
    }

    // No need to add actors; just send a command.
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

    // No need to add actors; just send a command.
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}