1use std::collections::HashMap;
16use std::sync::Arc;
17
18use await_tree::span;
19use futures::future::join_all;
20use itertools::Itertools;
21use risingwave_common::bail;
22use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::{JobId, SinkId};
25use risingwave_connector::source::CdcTableSnapshotSplitRaw;
26use risingwave_meta_model::prelude::Fragment as FragmentModel;
27use risingwave_meta_model::{StreamingParallelism, WorkerId, fragment, streaming_job};
28use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
29use risingwave_pb::expr::PbExprNode;
30use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
31use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
32use thiserror_ext::AsReport;
33use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
34use tracing::Instrument;
35
36use super::{
37 GlobalRefreshManagerRef, ParallelismPolicy, ReschedulePolicy, ScaleControllerRef,
38 StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
39};
40use crate::barrier::{
41 BarrierScheduler, BatchRefreshInfo, Command, CreateStreamingJobCommandInfo,
42 CreateStreamingJobType, ReplaceStreamJobPlan, SnapshotBackfillInfo,
43};
44use crate::controller::catalog::DropTableConnectorContext;
45use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
46use crate::error::bail_invalid_parameter;
47use crate::hummock::HummockManagerRef;
48use crate::manager::{
49 MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
50};
51use crate::model::{
52 ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
53 FragmentReplaceUpstream, StreamActor, StreamContext, StreamJobFragments,
54 StreamJobFragmentsToCreate, SubscriptionId,
55};
56use crate::stream::{ReplaceJobSplitPlan, SourceManagerRef};
57use crate::{MetaError, MetaResult};
58
59pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;
60
61pub(crate) async fn cleanup_dropped_streaming_jobs(
62 refresh_manager: &GlobalRefreshManagerRef,
63 hummock_manager: &HummockManagerRef,
64 metadata_manager: &MetadataManager,
65 streaming_job_ids: impl IntoIterator<Item = JobId>,
66 state_table_ids: Vec<TableId>,
67 progress_status: &str,
68) -> MetaResult<()> {
69 for job_id in streaming_job_ids {
70 refresh_manager.remove_progress_tracker(job_id.as_mv_table_id(), progress_status);
71 }
72
73 if state_table_ids.is_empty() {
74 return Ok(());
75 }
76
77 hummock_manager
78 .unregister_table_ids(state_table_ids.clone())
79 .await?;
80 metadata_manager
81 .catalog_controller
82 .complete_dropped_tables(state_table_ids)
83 .await;
84 Ok(())
85}
86
/// Extra options for creating a streaming job. The struct currently has no fields.
#[derive(Default)]
pub struct CreateStreamingJobOption {}
91
92#[derive(Debug, Clone)]
93pub struct UpstreamSinkInfo {
94 pub sink_id: SinkId,
95 pub sink_fragment_id: FragmentId,
96 pub sink_output_fields: Vec<PbField>,
97 pub sink_original_target_columns: Vec<PbColumnCatalog>,
99 pub project_exprs: Vec<PbExprNode>,
100 pub new_sink_downstream: DownstreamFragmentRelation,
101}
102
103pub struct CreateStreamingJobContext {
107 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
109
110 pub database_resource_group: String,
112
113 pub definition: String,
115
116 pub create_type: CreateType,
117
118 pub job_type: StreamingJobType,
119
120 pub new_upstream_sink: Option<UpstreamSinkInfo>,
122
123 pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
124 pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
125
126 pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
127
128 pub option: CreateStreamingJobOption,
129
130 pub streaming_job: StreamingJob,
131
132 pub fragment_backfill_ordering: UserDefinedFragmentBackfillOrder,
133
134 pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
135
136 pub is_serverless_backfill: bool,
137
138 pub streaming_job_model: streaming_job::Model,
140
141 pub refresh_interval_sec: Option<u64>,
143}
144
145struct StreamingJobExecution {
146 id: JobId,
147 shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
148 _permit: OwnedSemaphorePermit,
149}
150
151impl StreamingJobExecution {
152 fn new(
153 id: JobId,
154 shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
155 permit: OwnedSemaphorePermit,
156 ) -> Self {
157 Self {
158 id,
159 shutdown_tx: Some(shutdown_tx),
160 _permit: permit,
161 }
162 }
163}
164
165#[derive(Default)]
166struct CreatingStreamingJobInfo {
167 streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
168}
169
170impl CreatingStreamingJobInfo {
171 async fn add_job(&self, job: StreamingJobExecution) {
172 let mut jobs = self.streaming_jobs.lock().await;
173 jobs.insert(job.id, job);
174 }
175
176 async fn delete_job(&self, job_id: JobId) {
177 let mut jobs = self.streaming_jobs.lock().await;
178 jobs.remove(&job_id);
179 }
180
181 async fn cancel_jobs(
182 &self,
183 job_ids: Vec<JobId>,
184 ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
185 let mut jobs = self.streaming_jobs.lock().await;
186 let mut receivers = HashMap::new();
187 let mut background_job_ids = vec![];
188 for job_id in job_ids {
189 if let Some(job) = jobs.get_mut(&job_id) {
190 if let Some(shutdown_tx) = job.shutdown_tx.take() {
191 let (tx, rx) = oneshot::channel();
192 match shutdown_tx.send(tx) {
193 Ok(()) => {
194 receivers.insert(job_id, rx);
195 }
196 Err(_) => {
197 return Err(anyhow::anyhow!(
198 "failed to send shutdown signal for streaming job {}: receiver dropped",
199 job_id
200 )
201 .into());
202 }
203 }
204 }
205 } else {
206 background_job_ids.push(job_id);
208 }
209 }
210
211 Ok((receivers, background_job_ids))
212 }
213}
214
215type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;
216
217#[derive(Debug, Clone)]
218pub struct AutoRefreshSchemaSinkContext {
219 pub tmp_sink_id: SinkId,
220 pub original_sink: PbSink,
221 pub original_fragment: Fragment,
222 pub new_schema: Vec<PbColumnCatalog>,
223 pub newly_add_fields: Vec<Field>,
224 pub removed_column_names: Vec<String>,
225 pub new_fragment: Fragment,
226 pub new_log_store_table: Option<Box<PbTable>>,
227 pub ctx: StreamContext,
229}
230
231impl AutoRefreshSchemaSinkContext {
232 pub fn new_fragment_info(
233 &self,
234 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
235 actor_location: &HashMap<ActorId, WorkerId>,
236 ) -> InflightFragmentInfo {
237 InflightFragmentInfo {
238 fragment_id: self.new_fragment.fragment_id,
239 distribution_type: self.new_fragment.distribution_type.into(),
240 fragment_type_mask: self.new_fragment.fragment_type_mask,
241 vnode_count: self.new_fragment.vnode_count(),
242 nodes: self.new_fragment.nodes.clone(),
243 actors: stream_actors
244 .get(&self.new_fragment.fragment_id)
245 .into_iter()
246 .flatten()
247 .map(|actor| {
248 (
249 actor.actor_id,
250 InflightActorInfo {
251 worker_id: actor_location[&actor.actor_id],
252 vnode_bitmap: actor.vnode_bitmap.clone(),
253 splits: vec![],
254 },
255 )
256 })
257 .collect(),
258 state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
259 }
260 }
261}
262
263pub struct ReplaceStreamJobContext {
267 pub old_fragments: StreamJobFragments,
269
270 pub replace_upstream: FragmentReplaceUpstream,
272
273 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
275
276 pub streaming_job: StreamingJob,
277
278 pub database_resource_group: String,
280
281 pub tmp_id: JobId,
282
283 pub drop_table_connector_ctx: Option<DropTableConnectorContext>,
285
286 pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
287
288 pub streaming_job_model: streaming_job::Model,
290}
291
292pub struct GlobalStreamManager {
294 pub env: MetaSrvEnv,
295
296 pub metadata_manager: MetadataManager,
297
298 pub barrier_scheduler: BarrierScheduler,
300
301 pub hummock_manager: HummockManagerRef,
302
303 pub source_manager: SourceManagerRef,
305
306 pub refresh_manager: GlobalRefreshManagerRef,
307
308 creating_job_info: CreatingStreamingJobInfoRef,
310
311 pub scale_controller: ScaleControllerRef,
312}
313
314impl GlobalStreamManager {
315 pub fn new(
316 env: MetaSrvEnv,
317 metadata_manager: MetadataManager,
318 barrier_scheduler: BarrierScheduler,
319 hummock_manager: HummockManagerRef,
320 source_manager: SourceManagerRef,
321 refresh_manager: GlobalRefreshManagerRef,
322 scale_controller: ScaleControllerRef,
323 ) -> MetaResult<Self> {
324 Ok(Self {
325 env,
326 metadata_manager,
327 barrier_scheduler,
328 hummock_manager,
329 source_manager,
330 refresh_manager,
331 creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
332 scale_controller,
333 })
334 }
335
336 #[await_tree::instrument]
347 pub async fn create_streaming_job(
348 self: &Arc<Self>,
349 stream_job_fragments: StreamJobFragmentsToCreate,
350 ctx: CreateStreamingJobContext,
351 permit: OwnedSemaphorePermit,
352 ) -> MetaResult<NotificationVersion> {
353 let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
354 let await_tree_span = span!(
355 "{:?}({})",
356 ctx.streaming_job.job_type(),
357 ctx.streaming_job.name()
358 );
359
360 let job_id = stream_job_fragments.stream_job_id();
361 let database_id = ctx.streaming_job.database_id();
362
363 let (cancel_tx, cancel_rx) = oneshot::channel();
364 let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
365 self.creating_job_info.add_job(execution).await;
366
367 let stream_manager = self.clone();
368 let fut = async move {
369 let create_type = ctx.create_type;
370 let streaming_job = stream_manager
371 .run_create_streaming_job_command(stream_job_fragments, ctx)
372 .await?;
373 let version = match create_type {
374 CreateType::Background => {
375 stream_manager
376 .env
377 .notification_manager_ref()
378 .current_version()
379 .await
380 }
381 CreateType::Foreground => {
382 stream_manager
383 .metadata_manager
384 .wait_streaming_job_finished(database_id, streaming_job.id() as _)
385 .await?
386 }
387 CreateType::Unspecified => unreachable!(),
388 };
389
390 tracing::debug!(?streaming_job, "stream job finish");
391 Ok(version)
392 }
393 .in_current_span();
394
395 let create_fut = (self.env.await_tree_reg())
396 .register(await_tree_key, await_tree_span)
397 .instrument(Box::pin(fut));
398
399 let result = async {
400 tokio::select! {
401 biased;
402
403 res = create_fut => res,
404 notifier = cancel_rx => {
405 let notifier = notifier.expect("sender should not be dropped");
406 tracing::debug!(id=%job_id, "cancelling streaming job");
407
408 enum CancelResult {
409 Completed(MetaResult<NotificationVersion>),
410 Failed(MetaError),
411 Cancelled,
412 }
413
414 let cancel_res = if let Ok(job_fragments) =
415 self.metadata_manager.get_job_fragments_by_id(job_id).await
416 {
417 if self
419 .barrier_scheduler
420 .try_cancel_scheduled_create(database_id, job_id)
421 {
422 tracing::debug!(
423 id=%job_id,
424 "cancelling streaming job in buffer queue."
425 );
426 CancelResult::Cancelled
427 } else if !job_fragments.is_created() {
428 tracing::debug!(
429 id=%job_id,
430 "cancelling streaming job by issue cancel command."
431 );
432
433 let cancel_result: MetaResult<()> = async {
434 let cancel_command = self.metadata_manager.catalog_controller
435 .build_cancel_command(&job_fragments)
436 .await?;
437 let cleanup_state_table_ids =
438 job_fragments.all_table_ids().collect_vec();
439 self.metadata_manager.catalog_controller
440 .try_abort_creating_streaming_job(job_id, true)
441 .await?;
442
443 self.barrier_scheduler
444 .run_command(database_id, cancel_command)
445 .await?;
446 cleanup_dropped_streaming_jobs(
447 &self.refresh_manager,
448 &self.hummock_manager,
449 &self.metadata_manager,
450 [job_id],
451 cleanup_state_table_ids,
452 "cancel_streaming_job",
453 )
454 .await?;
455 Ok(())
456 }
457 .await;
458
459 match cancel_result {
460 Ok(()) => CancelResult::Cancelled,
461 Err(err) => {
462 tracing::warn!(
463 error = ?err.as_report(),
464 id = %job_id,
465 "failed to run cancel command for creating streaming job"
466 );
467 CancelResult::Failed(err)
468 }
469 }
470 } else {
471 CancelResult::Completed(
473 self.metadata_manager
474 .wait_streaming_job_finished(database_id, job_id)
475 .await,
476 )
477 }
478 } else {
479 CancelResult::Cancelled
480 };
481
482 let (cancelled, result) = match cancel_res {
483 CancelResult::Completed(result) => (false, result),
484 CancelResult::Failed(err) => (false, Err(err)),
485 CancelResult::Cancelled => (true, Err(MetaError::cancelled("create"))),
486 };
487
488 let _ = notifier
489 .send(cancelled)
490 .inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));
491
492 result
493 }
494 }
495 }
496 .await;
497
498 tracing::debug!("cleaning creating job info: {}", job_id);
499 self.creating_job_info.delete_job(job_id).await;
500 result
501 }
502
503 #[await_tree::instrument]
506 async fn run_create_streaming_job_command(
507 &self,
508 stream_job_fragments: StreamJobFragmentsToCreate,
509 CreateStreamingJobContext {
510 streaming_job,
511 upstream_fragment_downstreams,
512 database_resource_group,
513 definition,
514 create_type,
515 job_type,
516 new_upstream_sink,
517 snapshot_backfill_info,
518 cross_db_snapshot_backfill_info,
519 fragment_backfill_ordering,
520 locality_fragment_state_table_mapping,
521 cdc_table_snapshot_splits,
522 is_serverless_backfill,
523 streaming_job_model,
524 refresh_interval_sec,
525 ..
526 }: CreateStreamingJobContext,
527 ) -> MetaResult<StreamingJob> {
528 tracing::debug!(
529 table_id = %stream_job_fragments.stream_job_id(),
530 "built actors finished"
531 );
532
533 let init_split_assignment = self
538 .source_manager
539 .discover_splits(&stream_job_fragments)
540 .await?;
541
542 let fragment_backfill_ordering =
543 StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
544 fragment_backfill_ordering,
545 &stream_job_fragments.downstreams,
546 || {
547 stream_job_fragments
548 .fragments
549 .iter()
550 .map(|(fragment_id, fragment)| {
551 (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
552 })
553 },
554 );
555
556 let info = CreateStreamingJobCommandInfo {
557 stream_job_fragments,
558 upstream_fragment_downstreams,
559 init_split_assignment,
560 definition: definition.clone(),
561 streaming_job: streaming_job.clone(),
562 job_type,
563 create_type,
564 database_resource_group,
565 fragment_backfill_ordering,
566 cdc_table_snapshot_splits,
567 locality_fragment_state_table_mapping,
568 is_serverless: is_serverless_backfill,
569 streaming_job_model,
570 refresh_interval_sec,
571 };
572
573 let job_type = if let Some(refresh_interval_sec) = refresh_interval_sec {
574 let snapshot_backfill_info = snapshot_backfill_info.ok_or_else(|| {
575 anyhow::anyhow!(
576 "batch refresh materialized view must have snapshot backfill upstream"
577 )
578 })?;
579 for fragment in info.stream_job_fragments.inner.fragments.values() {
582 let mask = fragment.fragment_type_mask;
583 if mask.contains(FragmentTypeFlag::Source)
584 || mask.contains(FragmentTypeFlag::SourceScan)
585 {
586 bail!(
587 "batch refresh materialized views must not depend on sources directly; \
588 fragment {} has source/source-backfill nodes",
589 fragment.fragment_id
590 );
591 }
592 }
593 tracing::debug!(
594 ?snapshot_backfill_info,
595 refresh_interval_sec,
596 "sending Command::CreateBatchRefreshStreamingJob"
597 );
598 CreateStreamingJobType::BatchRefresh(BatchRefreshInfo {
599 snapshot_backfill_info,
600 refresh_interval_sec,
601 })
602 } else if let Some(snapshot_backfill_info) = snapshot_backfill_info {
603 tracing::debug!(
604 ?snapshot_backfill_info,
605 "sending Command::CreateSnapshotBackfillStreamingJob"
606 );
607 CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
608 } else {
609 tracing::debug!("sending Command::CreateStreamingJob");
610 if let Some(new_upstream_sink) = new_upstream_sink {
611 CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
612 } else {
613 CreateStreamingJobType::Normal
614 }
615 };
616
617 let command = Command::CreateStreamingJob {
618 info,
619 job_type,
620 cross_db_snapshot_backfill_info,
621 };
622
623 self.barrier_scheduler
624 .run_command(streaming_job.database_id(), command)
625 .await?;
626
627 tracing::debug!(?streaming_job, "first barrier collected for stream job");
628
629 Ok(streaming_job)
630 }
631
632 pub async fn replace_stream_job(
634 &self,
635 new_fragments: StreamJobFragmentsToCreate,
636 ReplaceStreamJobContext {
637 old_fragments,
638 replace_upstream,
639 upstream_fragment_downstreams,
640 tmp_id,
641 streaming_job,
642 drop_table_connector_ctx,
643 auto_refresh_schema_sinks,
644 streaming_job_model,
645 database_resource_group,
646 }: ReplaceStreamJobContext,
647 ) -> MetaResult<()> {
648 let split_plan = if streaming_job.is_source() {
653 match self
654 .source_manager
655 .discover_splits_for_replace_source(&new_fragments, &replace_upstream)
656 .await?
657 {
658 Some(discovered) => ReplaceJobSplitPlan::Discovered(discovered),
659 None => ReplaceJobSplitPlan::AlignFromPrevious,
660 }
661 } else {
662 let discovered = self.source_manager.discover_splits(&new_fragments).await?;
663 ReplaceJobSplitPlan::Discovered(discovered)
664 };
665 tracing::info!("replace_stream_job - split plan: {:?}", split_plan);
666
667 self.barrier_scheduler
668 .run_command(
669 streaming_job.database_id(),
670 Command::ReplaceStreamJob(ReplaceStreamJobPlan {
671 old_fragments,
672 new_fragments,
673 database_resource_group,
674 replace_upstream,
675 upstream_fragment_downstreams,
676 split_plan,
677 streaming_job,
678 streaming_job_model,
679 tmp_id,
680 to_drop_state_table_ids: {
681 if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
682 vec![drop_table_connector_ctx.to_remove_state_table_id]
683 } else {
684 Vec::new()
685 }
686 },
687 auto_refresh_schema_sinks,
688 }),
689 )
690 .await?;
691
692 Ok(())
693 }
694
695 pub async fn drop_streaming_jobs(
699 &self,
700 database_id: DatabaseId,
701 streaming_job_ids: Vec<JobId>,
702 state_table_ids: Vec<TableId>,
703 dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
704 ) {
705 if !streaming_job_ids.is_empty() || !state_table_ids.is_empty() {
706 let cleanup_streaming_job_ids = streaming_job_ids.clone();
707 let cleanup_state_table_ids = state_table_ids.clone();
708 let run_result = self
709 .barrier_scheduler
710 .run_command(
711 database_id,
712 Command::DropStreamingJobs {
713 streaming_job_ids: streaming_job_ids.into_iter().collect(),
714 unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
715 dropped_sink_fragment_by_targets,
716 },
717 )
718 .await;
719 let result = match run_result {
720 Ok(()) => {
721 cleanup_dropped_streaming_jobs(
722 &self.refresh_manager,
723 &self.hummock_manager,
724 &self.metadata_manager,
725 cleanup_streaming_job_ids,
726 cleanup_state_table_ids,
727 "drop_streaming_jobs",
728 )
729 .await
730 }
731 Err(err) => Err(err),
732 };
733 let _ = result.inspect_err(|err| {
734 tracing::error!(error = ?err.as_report(), "failed to run drop command");
735 });
736 }
737 }
738
739 pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
745 if job_ids.is_empty() {
746 return Ok(vec![]);
747 }
748
749 let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
750 let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;
751
752 let futures = receivers.into_iter().map(|(id, receiver)| async move {
753 if let Ok(cancelled) = receiver.await
754 && cancelled
755 {
756 tracing::info!("canceled streaming job {id}");
757 Ok(id)
758 } else {
759 Err(MetaError::from(anyhow::anyhow!(
760 "failed to cancel streaming job {id}"
761 )))
762 }
763 });
764 let mut cancelled_ids = join_all(futures)
765 .await
766 .into_iter()
767 .collect::<MetaResult<Vec<_>>>()?;
768
769 let futures = background_job_ids.into_iter().map(|id| async move {
772 let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
773 if fragment.is_created() {
774 tracing::warn!(
775 "streaming job {} is already created, ignore cancel request",
776 id
777 );
778 return Ok(None);
779 }
780 if fragment.is_created() {
781 Err(MetaError::invalid_parameter(format!(
782 "streaming job {} is already created",
783 id
784 )))?;
785 }
786
787 let cancel_command = self
788 .metadata_manager
789 .catalog_controller
790 .build_cancel_command(&fragment)
791 .await?;
792 let cleanup_state_table_ids = fragment.all_table_ids().collect_vec();
793
794 let (_, database_id) = self
795 .metadata_manager
796 .catalog_controller
797 .try_abort_creating_streaming_job(id, true)
798 .await?;
799
800 if let Some(database_id) = database_id {
801 self.barrier_scheduler
802 .run_command(database_id, cancel_command)
803 .await?;
804 cleanup_dropped_streaming_jobs(
805 &self.refresh_manager,
806 &self.hummock_manager,
807 &self.metadata_manager,
808 [id],
809 cleanup_state_table_ids,
810 "cancel_streaming_job",
811 )
812 .await?;
813 }
814
815 tracing::info!(?id, "cancelled background streaming job");
816 Ok(Some(id))
817 });
818 let cancelled_recovered_ids = join_all(futures)
819 .await
820 .into_iter()
821 .collect::<MetaResult<Vec<_>>>()?;
822
823 cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
824 Ok(cancelled_ids)
825 }
826
827 pub(crate) async fn reschedule_streaming_job(
828 &self,
829 job_id: JobId,
830 policy: ReschedulePolicy,
831 deferred: bool,
832 ) -> MetaResult<()> {
833 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
834
835 let background_jobs = self
836 .metadata_manager
837 .list_background_creating_jobs()
838 .await?;
839
840 if !background_jobs.is_empty() {
841 let blocked_jobs = self
842 .metadata_manager
843 .collect_reschedule_blocked_jobs_for_creating_jobs(&background_jobs, !deferred)
844 .await?;
845
846 if blocked_jobs.contains(&job_id) {
847 bail!(
848 "Cannot alter the job {} because it is blocked by creating unreschedulable backfill jobs",
849 job_id,
850 );
851 }
852 }
853
854 let commands = self
855 .scale_controller
856 .reschedule_inplace(HashMap::from([(job_id, policy)]))
857 .await?;
858
859 if !deferred {
860 let _source_pause_guard = self.source_manager.pause_tick().await;
861
862 for (database_id, command) in commands {
863 self.barrier_scheduler
864 .run_command(database_id, command)
865 .await?;
866 }
867 }
868
869 Ok(())
870 }
871
872 pub(crate) async fn reschedule_streaming_job_backfill_parallelism(
873 &self,
874 job_id: JobId,
875 parallelism: Option<ParallelismPolicy>,
876 deferred: bool,
877 ) -> MetaResult<()> {
878 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
879
880 let background_jobs = self
881 .metadata_manager
882 .list_background_creating_jobs()
883 .await?;
884
885 if !background_jobs.is_empty() {
886 let unreschedulable = self
887 .metadata_manager
888 .collect_unreschedulable_backfill_jobs(&background_jobs, !deferred)
889 .await?;
890
891 if unreschedulable.contains(&job_id) {
892 bail!(
893 "Cannot alter the job {} because it is a non-reschedulable background backfill job",
894 job_id,
895 );
896 }
897 }
898
899 let commands = self
900 .scale_controller
901 .reschedule_backfill_parallelism_inplace(HashMap::from([(job_id, parallelism)]))
902 .await?;
903
904 if !deferred {
905 let _source_pause_guard = self.source_manager.pause_tick().await;
906
907 for (database_id, command) in commands {
908 self.barrier_scheduler
909 .run_command(database_id, command)
910 .await?;
911 }
912 }
913
914 Ok(())
915 }
916
917 pub(crate) async fn reschedule_cdc_table_backfill(
919 &self,
920 job_id: JobId,
921 target: ReschedulePolicy,
922 ) -> MetaResult<()> {
923 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
924
925 let parallelism_policy = match target {
926 ReschedulePolicy::Parallelism(policy)
927 if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
928 {
929 policy
930 }
931 _ => bail_invalid_parameter!(
932 "CDC backfill reschedule only supports fixed parallelism targets"
933 ),
934 };
935
936 let cdc_fragment_id = {
937 let inner = self.metadata_manager.catalog_controller.inner.read().await;
938 let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
939 .select_only()
940 .columns([
941 fragment::Column::FragmentId,
942 fragment::Column::FragmentTypeMask,
943 ])
944 .filter(fragment::Column::JobId.eq(job_id))
945 .into_tuple()
946 .all(&inner.db)
947 .await?;
948
949 let cdc_fragments = fragments
950 .into_iter()
951 .filter_map(|(fragment_id, mask)| {
952 FragmentTypeMask::from(mask)
953 .contains(FragmentTypeFlag::StreamCdcScan)
954 .then_some(fragment_id)
955 })
956 .collect_vec();
957
958 match cdc_fragments.len() {
959 0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
960 1 => cdc_fragments[0],
961 _ => bail_invalid_parameter!(
962 "multiple StreamCdcScan fragments found for job {}; expected exactly one",
963 job_id
964 ),
965 }
966 };
967
968 let fragment_policy = HashMap::from([(
969 cdc_fragment_id,
970 Some(parallelism_policy.parallelism.clone()),
971 )]);
972
973 let commands = self
974 .scale_controller
975 .reschedule_fragment_inplace(fragment_policy)
976 .await?;
977
978 let _source_pause_guard = self.source_manager.pause_tick().await;
979
980 for (database_id, command) in commands {
981 self.barrier_scheduler
982 .run_command(database_id, command)
983 .await?;
984 }
985
986 Ok(())
987 }
988
989 pub(crate) async fn reschedule_fragments(
990 &self,
991 fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
992 ) -> MetaResult<()> {
993 if fragment_targets.is_empty() {
994 return Ok(());
995 }
996
997 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
998
999 let fragment_policy = fragment_targets
1000 .into_iter()
1001 .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
1002 .collect();
1003
1004 let commands = self
1005 .scale_controller
1006 .reschedule_fragment_inplace(fragment_policy)
1007 .await?;
1008
1009 let _source_pause_guard = self.source_manager.pause_tick().await;
1010
1011 for (database_id, command) in commands {
1012 self.barrier_scheduler
1013 .run_command(database_id, command)
1014 .await?;
1015 }
1016
1017 Ok(())
1018 }
1019
1020 pub async fn create_subscription(
1022 self: &Arc<Self>,
1023 subscription: &Subscription,
1024 ) -> MetaResult<()> {
1025 let command = Command::CreateSubscription {
1026 subscription_id: subscription.id,
1027 upstream_mv_table_id: subscription.dependent_table_id,
1028 retention_second: subscription.retention_seconds,
1029 };
1030
1031 tracing::debug!("sending Command::CreateSubscription");
1032 self.barrier_scheduler
1033 .run_command(subscription.database_id, command)
1034 .await?;
1035 Ok(())
1036 }
1037
1038 pub async fn drop_subscription(
1040 self: &Arc<Self>,
1041 database_id: DatabaseId,
1042 subscription_id: SubscriptionId,
1043 table_id: TableId,
1044 ) {
1045 let command = Command::DropSubscription {
1046 subscription_id,
1047 upstream_mv_table_id: table_id,
1048 };
1049
1050 tracing::debug!("sending Command::DropSubscriptions");
1051 let _ = self
1052 .barrier_scheduler
1053 .run_command(database_id, command)
1054 .await
1055 .inspect_err(|err| {
1056 tracing::error!(error = ?err.as_report(), "failed to run drop command");
1057 });
1058 }
1059
1060 pub async fn alter_subscription_retention(
1061 self: &Arc<Self>,
1062 database_id: DatabaseId,
1063 subscription_id: SubscriptionId,
1064 table_id: TableId,
1065 retention_second: u64,
1066 ) -> MetaResult<()> {
1067 let command = Command::AlterSubscriptionRetention {
1068 subscription_id,
1069 upstream_mv_table_id: table_id,
1070 retention_second,
1071 };
1072
1073 tracing::debug!("sending Command::AlterSubscriptionRetention");
1074 self.barrier_scheduler
1075 .run_command(database_id, command)
1076 .await?;
1077 Ok(())
1078 }
1079}