use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::span;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, SinkId};
use risingwave_connector::source::CdcTableSnapshotSplitRaw;
use risingwave_meta_model::prelude::Fragment as FragmentModel;
use risingwave_meta_model::{StreamingParallelism, fragment};
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use thiserror_ext::AsReport;
use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
use tracing::Instrument;

use super::{FragmentBackfillOrder, Locations, ReschedulePolicy, ScaleControllerRef};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
    SubscriptionId,
};
use crate::stream::SourceManagerRef;
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

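/// Options for creating a streaming job. Currently empty; kept as a `Default`-able placeholder
/// for future per-job creation options.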
#[derive(Default)]
pub struct CreateStreamingJobOption {}

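/// Information about an existing upstream sink that is wired into a newly created table
/// (see [`CreateStreamingJobType::SinkIntoTable`]): the sink's output schema, its original
/// target columns, projection expressions, and the new downstream relation to establish.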
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: SinkId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

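/// [`CreateStreamingJobContext`] carries the per-job information needed to build the
/// [`Command::CreateStreamingJob`] barrier command: upstream relations, actor locations,
/// backfill configuration, and the job definition itself.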
pub struct CreateStreamingJobContext {
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    pub building_locations: Locations,

    pub definition: String,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,

    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}

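/// A creating streaming job tracked by [`CreatingStreamingJobInfo`], holding the channel used to
/// request cancellation and the semaphore permit held for the duration of the creation.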
struct StreamingJobExecution {
    id: JobId,
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    _permit: OwnedSemaphorePermit,
}

impl StreamingJobExecution {
    fn new(
        id: JobId,
        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
            _permit: permit,
        }
    }
}

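/// Registry of streaming jobs that are currently being created, used to route cancellation
/// requests to the in-flight creation tasks.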
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: JobId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

    async fn cancel_jobs(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut background_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    match shutdown_tx.send(tx) {
                        Ok(()) => {
                            receivers.insert(job_id, rx);
                        }
                        Err(_) => {
                            return Err(anyhow::anyhow!(
                                "failed to send shutdown signal for streaming job {}: receiver dropped",
                                job_id
                            )
                            .into());
                        }
                    }
                }
            } else {
                background_job_ids.push(job_id);
            }
        }

        Ok((receivers, background_job_ids))
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

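/// Context for replacing a sink fragment when its schema is refreshed, carrying the original
/// sink and fragment, the rebuilt fragment with the new schema, and the placement of the new
/// actors.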
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
        }
    }
}

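/// [`ReplaceStreamJobContext`] carries the information needed to replace an existing streaming
/// job's fragments in place via [`Command::ReplaceStreamJob`].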
pub struct ReplaceStreamJobContext {
    pub old_fragments: StreamJobFragments,

    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: JobId,

    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

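/// `GlobalStreamManager` orchestrates the lifecycle of streaming jobs on the meta node:
/// creating, replacing, dropping, cancelling, and rescheduling them by issuing commands to the
/// barrier scheduler.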
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    pub barrier_scheduler: BarrierScheduler,

    pub source_manager: SourceManagerRef,

    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

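    /// Creates a streaming job and waits for it according to its [`CreateType`]: a foreground
    /// job blocks until the job is fully created, while a background job returns once the
    /// creation command has been injected. The job is registered in `creating_job_info` while it
    /// is being created so that it can be cancelled via [`Self::cancel_streaming_jobs`]; on
    /// cancellation, the scheduled command is revoked or a cancel command is issued, depending on
    /// how far the creation has progressed.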
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "streaming job finished");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        let result = tokio::select! {
            biased;

            res = create_fut => res,
            notifier = cancel_rx => {
                let notifier = notifier.expect("sender should not be dropped");
                tracing::debug!(id=%job_id, "cancelling streaming job");

                if let Ok(job_fragments) = self.metadata_manager.get_job_fragments_by_id(job_id)
                    .await {
                    if self.barrier_scheduler.try_cancel_scheduled_create(database_id, job_id) {
                        tracing::debug!("cancelling streaming job {job_id} in buffer queue.");
                    } else if !job_fragments.is_created() {
                        tracing::debug!("cancelling streaming job {job_id} by issuing a cancel command.");

                        let cancel_command = self.metadata_manager.catalog_controller
                            .build_cancel_command(&job_fragments)
                            .await?;
                        self.metadata_manager.catalog_controller
                            .try_abort_creating_streaming_job(job_id, true)
                            .await?;

                        self.barrier_scheduler.run_command(database_id, cancel_command).await?;
                    } else {
                        // The job has already been created and can no longer be cancelled,
                        // so notify the caller and wait for it to finish instead.
                        let _ = notifier.send(false).inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));
                        return self.metadata_manager.wait_streaming_job_finished(database_id, job_id).await;
                    }
                }
                notifier.send(true).expect("receiver should not be dropped");
                Err(MetaError::cancelled("create"))
            }
        };

        tracing::debug!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }

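    /// Builds the [`Command::CreateStreamingJob`] for the given fragments and context (allocating
    /// source splits and choosing the create job type) and sends it to the barrier scheduler.
    /// Returns once the command's first barrier has been collected.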
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "actors built"
        );

        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;

        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_splits,
            locality_fragment_state_table_mapping,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }

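    /// Replaces the fragments of an existing streaming job by sending a
    /// [`Command::ReplaceStreamJob`] to the barrier scheduler, reallocating source splits for the
    /// new fragments (or for the replaced source itself when the job is a source).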
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocate split: {:?}",
            init_split_assignment
        );

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                }),
            )
            .await?;

        Ok(())
    }

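    /// Drops the given streaming jobs, actors, and state tables by sending a
    /// [`Command::DropStreamingJobs`] barrier command. Errors from the command are logged and
    /// otherwise ignored.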
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<JobId>,
        state_table_ids: Vec<TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

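    /// Cancels the given creating streaming jobs and returns the ids that were actually
    /// cancelled. Jobs still tracked in `creating_job_info` are cancelled by signalling their
    /// creation task; jobs not tracked there are treated as background jobs and aborted through
    /// the catalog controller together with a cancel command. Jobs that are already created are
    /// skipped.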
    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
        if job_ids.is_empty() {
            return Ok(vec![]);
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if let Ok(cancelled) = receiver.await
                && cancelled
            {
                tracing::info!("cancelled streaming job {id}");
                Ok(id)
            } else {
                Err(MetaError::from(anyhow::anyhow!(
                    "failed to cancel streaming job {id}"
                )))
            }
        });
        let mut cancelled_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        let futures = background_job_ids.into_iter().map(|id| async move {
            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
            if fragment.is_created() {
                tracing::warn!(
                    "streaming job {} is already created, ignore cancel request",
                    id
                );
                return Ok(None);
            }

            let cancel_command = self
                .metadata_manager
                .catalog_controller
                .build_cancel_command(&fragment)
                .await?;

            let (_, database_id) = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_streaming_job(id, true)
                .await?;

            if let Some(database_id) = database_id {
                self.barrier_scheduler
                    .run_command(database_id, cancel_command)
                    .await?;
            }

            tracing::info!(?id, "cancelled background streaming job");
            Ok(Some(id))
        });
        let cancelled_recovered_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
        Ok(cancelled_ids)
    }

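    /// Reschedules a single streaming job according to the given [`ReschedulePolicy`]. Fails if
    /// the job is related (via no-shuffle dependencies) to a background job that is still being
    /// created. When `deferred` is true, the generated reschedule commands are not run
    /// immediately.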
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        policy: ReschedulePolicy,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            if related_jobs.contains(&job_id) {
                bail!(
                    "Cannot alter job {} because related background jobs {:?} are still being created",
                    job_id,
                    background_jobs,
                );
            }
        }

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();
        let workers = worker_nodes.into_iter().map(|x| (x.id, x)).collect();

        let commands = self
            .scale_controller
            .reschedule_inplace(HashMap::from([(job_id, policy)]), workers)
            .await?;

        if !deferred {
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }

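    /// Reschedules the parallelism of the `StreamCdcScan` (CDC backfill) fragment of the given
    /// job. Only a fixed parallelism target is accepted, and the job must contain exactly one
    /// such fragment.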
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let parallelism_policy = match target {
            ReschedulePolicy::Parallelism(policy)
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
            {
                policy
            }
            _ => bail_invalid_parameter!(
                "CDC backfill reschedule only supports fixed parallelism targets"
            ),
        };

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();
        let workers = worker_nodes.into_iter().map(|x| (x.id, x)).collect();

        let cdc_fragment_id = {
            let inner = self.metadata_manager.catalog_controller.inner.read().await;
            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(job_id))
                .into_tuple()
                .all(&inner.db)
                .await?;

            let cdc_fragments = fragments
                .into_iter()
                .filter_map(|(fragment_id, mask)| {
                    FragmentTypeMask::from(mask)
                        .contains(FragmentTypeFlag::StreamCdcScan)
                        .then_some(fragment_id)
                })
                .collect_vec();

            match cdc_fragments.len() {
                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
                1 => cdc_fragments[0],
                _ => bail_invalid_parameter!(
                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
                    job_id
                ),
            }
        };

        let fragment_policy = HashMap::from([(
            cdc_fragment_id,
            Some(parallelism_policy.parallelism.clone()),
        )]);

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy, workers)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

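    /// Reschedules the given fragments to their target parallelism in place and runs the
    /// resulting reschedule commands immediately.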
    pub(crate) async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        if fragment_targets.is_empty() {
            return Ok(());
        }

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .map(|worker| (worker.id, worker))
            .collect();

        let fragment_policy = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
            .collect();

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy, workers)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

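    /// Creates a subscription on its dependent upstream table by sending a
    /// [`Command::CreateSubscription`] barrier command.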
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: subscription.dependent_table_id,
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id, command)
            .await?;
        Ok(())
    }

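    /// Drops a subscription by sending a [`Command::DropSubscription`] barrier command. Errors
    /// from the command are logged and otherwise ignored.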
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: SubscriptionId,
        table_id: TableId,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: table_id,
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}