use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::sync::Arc;

use await_tree::span;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, SinkId};
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_meta_model::prelude::Fragment as FragmentModel;
use risingwave_meta_model::{StreamingParallelism, fragment};
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use thiserror_ext::AsReport;
use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
use tracing::Instrument;

use super::{FragmentBackfillOrder, Locations, ReschedulePolicy, ScaleControllerRef};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
    SubscriptionId,
};
use crate::stream::SourceManagerRef;
use crate::stream::cdc::{
    assign_cdc_table_snapshot_splits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

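/// Options for creating a streaming job. Currently empty; kept as an extension point for
/// future creation-time options.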
#[derive(Default)]
pub struct CreateStreamingJobOption {}

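/// Information about a sink that feeds into a table (`CREATE SINK ... INTO TABLE`): the sink's
/// id and fragment, its output schema, the target table's original columns, the projection
/// expressions mapping sink output to the table, and the new downstream fragment relation.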
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: SinkId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

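/// [`CreateStreamingJobContext`] carries the one-time information needed to create a streaming
/// job: upstream relations and actors, building locations, backfill configuration, and the job
/// definition itself.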
pub struct CreateStreamingJobContext {
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    pub building_locations: Locations,

    pub definition: String,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,

    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}

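/// A handle to a streaming job that is currently being created: the shutdown channel used to
/// cancel it, and the concurrency permit held for the duration of the creation.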
struct StreamingJobExecution {
    id: JobId,
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    _permit: OwnedSemaphorePermit,
}

impl StreamingJobExecution {
    fn new(
        id: JobId,
        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
            _permit: permit,
        }
    }
}

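/// Tracks the streaming jobs that are currently being created, keyed by job id, so that they
/// can be cancelled while creation is still in progress.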
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: JobId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

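    /// Sends a shutdown signal to every tracked job in `job_ids` and returns the receivers for
    /// awaiting their cancellation results, together with the ids that are not tracked here and
    /// therefore have to be cancelled directly via a cancel command.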
    async fn cancel_jobs(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut background_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    match shutdown_tx.send(tx) {
                        Ok(()) => {
                            receivers.insert(job_id, rx);
                        }
                        Err(_) => {
                            return Err(anyhow::anyhow!(
                                "failed to send shutdown signal for streaming job {}: receiver dropped",
                                job_id
                            )
                            .into());
                        }
                    }
                }
            } else {
                background_job_ids.push(job_id);
            }
        }

        Ok((receivers, background_job_ids))
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

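/// Context for a sink whose schema is refreshed automatically alongside its upstream table's
/// schema change: it pairs the original sink and fragment with the newly built fragment, the
/// new column catalog, and the actor statuses used to build the inflight fragment info.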
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
        }
    }
}

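/// [`ReplaceStreamJobContext`] carries the one-time information needed to replace the plan of
/// an existing streaming job in place, e.g. for `ALTER TABLE` or schema-refreshing sinks.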
pub struct ReplaceStreamJobContext {
    pub old_fragments: StreamJobFragments,

    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: JobId,

    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

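/// `GlobalStreamManager` drives the lifecycle of streaming jobs on the meta node: creating,
/// replacing, dropping, cancelling and rescheduling them by issuing commands through the
/// barrier scheduler.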
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    pub barrier_scheduler: BarrierScheduler,

    pub source_manager: SourceManagerRef,

    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

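    /// Creates a new streaming job and registers it in `creating_job_info` so that it can be
    /// cancelled while still in progress. For `Foreground` jobs this waits until the job is
    /// fully created; for `Background` jobs it returns once the creation command has been run
    /// and its first barrier collected.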
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finish");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

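        // Drive the creation future, but allow it to be interrupted by a cancellation request
        // arriving through `cancel_rx`.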
        let result = tokio::select! {
            biased;

            res = create_fut => res,
            notifier = cancel_rx => {
                let notifier = notifier.expect("sender should not be dropped");
                tracing::debug!(id=%job_id, "cancelling streaming job");

                if let Ok(job_fragments) = self.metadata_manager.get_job_fragments_by_id(job_id)
                    .await
                {
                    if self.barrier_scheduler.try_cancel_scheduled_create(database_id, job_id) {
                        tracing::debug!("cancelling streaming job {job_id} in buffer queue.");
                    } else if !job_fragments.is_created() {
                        tracing::debug!("cancelling streaming job {job_id} by issuing a cancel command.");

                        let cancel_command = self.metadata_manager.catalog_controller
                            .build_cancel_command(&job_fragments)
                            .await?;
                        self.metadata_manager.catalog_controller
                            .try_abort_creating_streaming_job(job_id, true)
                            .await?;

                        self.barrier_scheduler.run_command(database_id, cancel_command).await?;
                    } else {
                        let _ = notifier
                            .send(false)
                            .inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));
                        return self.metadata_manager.wait_streaming_job_finished(database_id, job_id).await;
                    }
                }
                notifier.send(true).expect("receiver should not be dropped");
                Err(MetaError::cancelled("create"))
            }
        };

        tracing::info!("cleaning up creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }

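    /// Builds and runs the `CreateStreamingJob` barrier command for a job: allocates source and
    /// backfill splits, assigns CDC table snapshot splits when parallelized CDC backfill is
    /// enabled, and submits the command to the barrier scheduler, returning once its first
    /// barrier has been collected.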
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;

        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

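        // Assign CDC table snapshot splits for parallelized CDC backfill. If any splits are
        // assigned, register the job with the CDC backfill tracker and attach a generation to
        // the assignment.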
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            stream_job_fragments.stream_job_id,
            &stream_job_fragments,
            self.env.meta_store_ref(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if !cdc_table_snapshot_split_assignment.is_empty()
        {
            self.env.cdc_table_backfill_tracker.track_new_job(
                stream_job_fragments.stream_job_id,
                cdc_table_snapshot_split_assignment
                    .values()
                    .map(|s| u64::try_from(s.len()).unwrap())
                    .sum(),
            );
            self.env
                .cdc_table_backfill_tracker
                .add_fragment_table_mapping(
                    stream_job_fragments
                        .fragments
                        .values()
                        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
                        .map(|f| f.fragment_id),
                    stream_job_fragments.stream_job_id,
                );
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(iter::once(stream_job_fragments.stream_job_id)),
            )
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_split_assignment,
            locality_fragment_state_table_mapping,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }

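    /// Replaces an existing streaming job with newly built fragments by issuing a
    /// `ReplaceStreamJob` command, re-allocating source splits and CDC table snapshot splits
    /// for the new fragments.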
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocated splits: {:?}",
            init_split_assignment
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            old_fragments.stream_job_id,
            &new_fragments.inner,
            self.env.meta_store_ref(),
        )
        .await?;

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                    cdc_table_snapshot_split_assignment,
                }),
            )
            .await?;

        Ok(())
    }

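    /// Drops the given streaming jobs by issuing a `DropStreamingJobs` command that removes
    /// their actors, fragments and state tables. Errors are logged rather than returned.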
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<JobId>,
        state_table_ids: Vec<TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

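    /// Cancels the given creating streaming jobs and returns the ids that were actually
    /// cancelled. Jobs still tracked in `creating_job_info` are cancelled through their shutdown
    /// channels; the remaining background jobs are aborted in the catalog and cancelled via a
    /// direct cancel command.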
    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
        if job_ids.is_empty() {
            return Ok(vec![]);
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if let Ok(cancelled) = receiver.await
                && cancelled
            {
                tracing::info!("canceled streaming job {id}");
                Ok(id)
            } else {
                Err(MetaError::from(anyhow::anyhow!(
                    "failed to cancel streaming job {id}"
                )))
            }
        });
        let mut cancelled_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

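        // Cancel the jobs that are not tracked in `creating_job_info` (e.g. background jobs) by
        // aborting them in the catalog and issuing a cancel command directly.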
        let futures = background_job_ids.into_iter().map(|id| async move {
            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
            if fragment.is_created() {
                tracing::warn!(
                    "streaming job {} is already created, ignore cancel request",
                    id
                );
                return Ok(None);
            }

            let cancel_command = self
                .metadata_manager
                .catalog_controller
                .build_cancel_command(&fragment)
                .await?;

            let (_, database_id) = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_streaming_job(id, true)
                .await?;

            if let Some(database_id) = database_id {
                self.barrier_scheduler
                    .run_command(database_id, cancel_command)
                    .await?;
            }

            tracing::info!(?id, "cancelled background streaming job");
            Ok(Some(id))
        });
        let cancelled_recovered_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
        Ok(cancelled_ids)
    }

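    /// Reschedules a single streaming job according to the given policy. The request is rejected
    /// if the job is related (via no-shuffle dependencies) to a job that is still being created
    /// in the background; when `deferred` is false, the resulting reschedule commands are applied
    /// immediately through the barrier scheduler.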
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        policy: ReschedulePolicy,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            if related_jobs.contains(&job_id) {
                bail!(
                    "Cannot alter the job {} because the related job {:?} is currently being created",
                    job_id,
                    background_jobs,
                );
            }
        }

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();
        let workers = worker_nodes.into_iter().map(|x| (x.id, x)).collect();

        let commands = self
            .scale_controller
            .reschedule_inplace(HashMap::from([(job_id, policy)]), workers)
            .await?;

        if !deferred {
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }

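    /// Reschedules the parallelism of the `StreamCdcScan` (CDC backfill) fragment of a job.
    /// Only fixed parallelism targets are accepted, and the job must contain exactly one such
    /// fragment.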
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let parallelism_policy = match target {
            ReschedulePolicy::Parallelism(policy)
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
            {
                policy
            }
            _ => bail_invalid_parameter!(
                "CDC backfill reschedule only supports fixed parallelism targets"
            ),
        };

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();
        let workers = worker_nodes.into_iter().map(|x| (x.id, x)).collect();

        let cdc_fragment_id = {
            let inner = self.metadata_manager.catalog_controller.inner.read().await;
            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(job_id))
                .into_tuple()
                .all(&inner.db)
                .await?;

            let cdc_fragments = fragments
                .into_iter()
                .filter_map(|(fragment_id, mask)| {
                    FragmentTypeMask::from(mask)
                        .contains(FragmentTypeFlag::StreamCdcScan)
                        .then_some(fragment_id)
                })
                .collect_vec();

            match cdc_fragments.len() {
                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
                1 => cdc_fragments[0],
                _ => bail_invalid_parameter!(
                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
                    job_id
                ),
            }
        };

        let fragment_policy = HashMap::from([(
            cdc_fragment_id,
            Some(parallelism_policy.parallelism.clone()),
        )]);

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy, workers)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

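    /// Reschedules the given fragments according to the requested parallelism targets on the
    /// currently schedulable streaming compute nodes.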
    pub(crate) async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        if fragment_targets.is_empty() {
            return Ok(());
        }

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .map(|worker| (worker.id, worker))
            .collect();

        let fragment_policy = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
            .collect();

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy, workers)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

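    /// Creates a subscription by sending a `CreateSubscription` command through the barrier
    /// scheduler.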
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: subscription.dependent_table_id,
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id, command)
            .await?;
        Ok(())
    }

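    /// Drops a subscription by sending a `DropSubscription` command through the barrier
    /// scheduler. Errors are logged rather than returned.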
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: SubscriptionId,
        table_id: TableId,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: table_id,
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}