use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::span;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, SinkId};
use risingwave_connector::source::CdcTableSnapshotSplitRaw;
use risingwave_meta_model::prelude::Fragment as FragmentModel;
use risingwave_meta_model::{StreamingParallelism, fragment};
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use thiserror_ext::AsReport;
use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
use tracing::Instrument;

use super::{
    Locations, ReschedulePolicy, ScaleControllerRef, StreamFragmentGraph,
    UserDefinedFragmentBackfillOrder,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
    SubscriptionId,
};
use crate::stream::SourceManagerRef;
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

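/// Options supplied when creating a streaming job. The struct is currently empty and, given
/// `#[derive(Default)]`, appears to exist only as an extension point for future options.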
#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Currently empty; reserved for future creation options.
}

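/// Describes an existing upstream sink that the job being created has to be wired up to
/// (the sink-into-table path, see `CreateStreamingJobType::SinkIntoTable`): the sink and its
/// fragment, the sink's output schema, the target table columns it was originally created
/// against, the projection from sink output to the table, and the new downstream relation to
/// install for the sink fragment.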
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: SinkId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

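/// One-shot context consumed when creating a streaming job: upstream/downstream fragment
/// relations, actor locations, backfill ordering and snapshot-backfill information, and the
/// catalog-level [`StreamingJob`] itself. Destructured by
/// [`GlobalStreamManager::run_create_streaming_job_command`].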
pub struct CreateStreamingJobContext {
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    pub building_locations: Locations,

    pub definition: String,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: UserDefinedFragmentBackfillOrder,

    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,

    pub is_serverless_backfill: bool,
}

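/// A streaming job that is currently being created: its id, a shutdown channel used to ask
/// the creation task to stop (the inner sender reports back whether the job was actually
/// cancelled), and the concurrency permit held for the duration of the creation.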
struct StreamingJobExecution {
    id: JobId,
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    _permit: OwnedSemaphorePermit,
}

impl StreamingJobExecution {
    fn new(
        id: JobId,
        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
            _permit: permit,
        }
    }
}

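/// Registry of streaming jobs that are currently being created, keyed by job id.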
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: JobId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

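    /// Requests cancellation of the given jobs. For jobs tracked here, a shutdown signal is
    /// sent and a receiver for the cancellation outcome is returned; ids that are not tracked
    /// (typically background jobs whose creation is not driven by this manager, e.g. after a
    /// recovery) are returned separately so the caller can abort them via the catalog.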
    async fn cancel_jobs(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut background_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    match shutdown_tx.send(tx) {
                        Ok(()) => {
                            receivers.insert(job_id, rx);
                        }
                        Err(_) => {
                            return Err(anyhow::anyhow!(
                                "failed to send shutdown signal for streaming job {}: receiver dropped",
                                job_id
                            )
                            .into());
                        }
                    }
                }
            } else {
                background_job_ids.push(job_id);
            }
        }

        Ok((receivers, background_job_ids))
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

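/// Context used when a sink's schema is refreshed automatically (e.g. to follow an upstream
/// schema change): the original sink and fragment, the new column schema and the newly added
/// fields, the rebuilt fragment (plus an optional new log-store table), and the statuses of
/// the new actors.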
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
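    /// Builds the [`InflightFragmentInfo`] for the rebuilt sink fragment, deriving each
    /// actor's worker id from `actor_status` and leaving its split assignment empty.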
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
        }
    }
}

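/// Context for replacing an existing streaming job in place: the old fragments to retire,
/// the upstream edges to rewrite, the relations of the new fragments, and the temporary id
/// under which the new fragments are tracked while the replacement is in flight.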
pub struct ReplaceStreamJobContext {
    pub old_fragments: StreamJobFragments,

    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: JobId,

    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

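/// [`GlobalStreamManager`] drives the lifecycle of streaming jobs on the meta node: it
/// creates, replaces, drops, cancels and reschedules them by issuing commands to the
/// [`BarrierScheduler`], consulting the [`MetadataManager`] and coordinating split
/// assignment with the source manager.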
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    pub barrier_scheduler: BarrierScheduler,

    pub source_manager: SourceManagerRef,

    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

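    /// Creates a streaming job and waits according to its create type: for background jobs it
    /// returns once the create command has been injected and the first barrier collected,
    /// while foreground jobs additionally wait until the job is fully created. The job is
    /// registered in `creating_job_info` for the whole duration so it can be cancelled
    /// through [`Self::cancel_streaming_jobs`].
    ///
    /// A minimal call-site sketch (hypothetical caller, for illustration only; the real
    /// caller builds `stream_job_fragments`, `ctx` and `permit` beforehand):
    ///
    /// ```ignore
    /// let version = stream_manager
    ///     .create_streaming_job(stream_job_fragments, ctx, permit)
    ///     .await?;
    /// ```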
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finish");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        let result = async {
            tokio::select! {
                biased;

                res = create_fut => res,
                notifier = cancel_rx => {
                    let notifier = notifier.expect("sender should not be dropped");
                    tracing::debug!(id=%job_id, "cancelling streaming job");

                    enum CancelResult {
                        Completed(MetaResult<NotificationVersion>),
                        Failed(MetaError),
                        Cancelled,
                    }

                    let cancel_res = if let Ok(job_fragments) =
                        self.metadata_manager.get_job_fragments_by_id(job_id).await
                    {
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, job_id)
                        {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job in buffer queue."
                            );
                            CancelResult::Cancelled
                        } else if !job_fragments.is_created() {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job by issue cancel command."
                            );

                            let cancel_result: MetaResult<()> = async {
                                let cancel_command = self.metadata_manager.catalog_controller
                                    .build_cancel_command(&job_fragments)
                                    .await?;
                                self.metadata_manager.catalog_controller
                                    .try_abort_creating_streaming_job(job_id, true)
                                    .await?;

                                self.barrier_scheduler
                                    .run_command(database_id, cancel_command)
                                    .await?;
                                Ok(())
                            }
                            .await;

                            match cancel_result {
                                Ok(()) => CancelResult::Cancelled,
                                Err(err) => {
                                    tracing::warn!(
                                        error = ?err.as_report(),
                                        id = %job_id,
                                        "failed to run cancel command for creating streaming job"
                                    );
                                    CancelResult::Failed(err)
                                }
                            }
                        } else {
                            CancelResult::Completed(
                                self.metadata_manager
                                    .wait_streaming_job_finished(database_id, job_id)
                                    .await,
                            )
                        }
                    } else {
                        CancelResult::Cancelled
                    };

                    let (cancelled, result) = match cancel_res {
                        CancelResult::Completed(result) => (false, result),
                        CancelResult::Failed(err) => (false, Err(err)),
                        CancelResult::Cancelled => (true, Err(MetaError::cancelled("create"))),
                    };

                    let _ = notifier
                        .send(cancelled)
                        .inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));

                    result
                }
            }
        }
        .await;

        tracing::debug!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }

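    /// Builds and runs the `Command::CreateStreamingJob` barrier command: allocates source
    /// splits (including splits for source backfill), extends the backfill ordering with
    /// locality backfill, and returns the job once the first barrier has been collected.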
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;

        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let fragment_backfill_ordering =
            StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                fragment_backfill_ordering,
                &stream_job_fragments.downstreams,
                || {
                    stream_job_fragments
                        .fragments
                        .iter()
                        .map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                },
            );

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_splits,
            locality_fragment_state_table_mapping,
            is_serverless: is_serverless_backfill,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }

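    /// Replaces an existing streaming job with newly built fragments by running a
    /// `Command::ReplaceStreamJob` barrier command, after reallocating source splits for the
    /// new fragments (with a dedicated path when the job itself is a source).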
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocate split: {:?}",
            init_split_assignment
        );

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                }),
            )
            .await?;

        Ok(())
    }

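    /// Drops the given streaming jobs by running a `Command::DropStreamingJobs` barrier
    /// command. Errors from the barrier scheduler are logged rather than propagated.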
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<JobId>,
        state_table_ids: Vec<TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

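    /// Cancels the given creating streaming jobs and returns the ids that were actually
    /// cancelled. Jobs tracked in `creating_job_info` are cancelled through their shutdown
    /// channel; the remaining ids are treated as background jobs and are aborted directly
    /// via the catalog controller and a cancel barrier command.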
    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
        if job_ids.is_empty() {
            return Ok(vec![]);
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if let Ok(cancelled) = receiver.await
                && cancelled
            {
                tracing::info!("canceled streaming job {id}");
                Ok(id)
            } else {
                Err(MetaError::from(anyhow::anyhow!(
                    "failed to cancel streaming job {id}"
                )))
            }
        });
        let mut cancelled_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        let futures = background_job_ids.into_iter().map(|id| async move {
            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
            if fragment.is_created() {
                tracing::warn!(
                    "streaming job {} is already created, ignore cancel request",
                    id
                );
                return Ok(None);
            }

            let cancel_command = self
                .metadata_manager
                .catalog_controller
                .build_cancel_command(&fragment)
                .await?;

            let (_, database_id) = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_streaming_job(id, true)
                .await?;

            if let Some(database_id) = database_id {
                self.barrier_scheduler
                    .run_command(database_id, cancel_command)
                    .await?;
            }

            tracing::info!(?id, "cancelled background streaming job");
            Ok(Some(id))
        });
        let cancelled_recovered_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
        Ok(cancelled_ids)
    }

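    /// Reschedules a single streaming job with the given policy, rejecting non-reschedulable
    /// background backfill jobs. When `deferred` is false the generated commands are run
    /// immediately (with source ticks paused); otherwise no barrier command is issued here.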
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        policy: ReschedulePolicy,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let unreschedulable = self
                .metadata_manager
                .collect_unreschedulable_backfill_jobs(&background_jobs, !deferred)
                .await?;

            if unreschedulable.contains(&job_id) {
                bail!(
                    "Cannot alter the job {} because it is a non-reschedulable background backfill job",
                    job_id,
                );
            }
        }

        let commands = self
            .scale_controller
            .reschedule_inplace(HashMap::from([(job_id, policy)]))
            .await?;

        if !deferred {
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }

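    /// Reschedules the parallelism of a CDC table's backfill. Only a fixed parallelism target
    /// is accepted, and the job must contain exactly one `StreamCdcScan` fragment.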
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let parallelism_policy = match target {
            ReschedulePolicy::Parallelism(policy)
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
            {
                policy
            }
            _ => bail_invalid_parameter!(
                "CDC backfill reschedule only supports fixed parallelism targets"
            ),
        };

        let cdc_fragment_id = {
            let inner = self.metadata_manager.catalog_controller.inner.read().await;
            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(job_id))
                .into_tuple()
                .all(&inner.db)
                .await?;

            let cdc_fragments = fragments
                .into_iter()
                .filter_map(|(fragment_id, mask)| {
                    FragmentTypeMask::from(mask)
                        .contains(FragmentTypeFlag::StreamCdcScan)
                        .then_some(fragment_id)
                })
                .collect_vec();

            match cdc_fragments.len() {
                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
                1 => cdc_fragments[0],
                _ => bail_invalid_parameter!(
                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
                    job_id
                ),
            }
        };

        let fragment_policy = HashMap::from([(
            cdc_fragment_id,
            Some(parallelism_policy.parallelism.clone()),
        )]);

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

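    /// Applies per-fragment parallelism overrides and immediately runs the resulting
    /// reschedule commands, with source ticks paused while they are applied.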
    pub(crate) async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        if fragment_targets.is_empty() {
            return Ok(());
        }

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let fragment_policy = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
            .collect();

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

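    /// Creates a subscription on its upstream table by running a
    /// `Command::CreateSubscription` barrier command.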
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: subscription.dependent_table_id,
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id, command)
            .await?;
        Ok(())
    }

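    /// Drops a subscription by running a `Command::DropSubscription` barrier command;
    /// failures are logged rather than propagated.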
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: SubscriptionId,
        table_id: TableId,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: table_id,
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}