1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use await_tree::span;
19use futures::future::join_all;
20use itertools::Itertools;
21use risingwave_common::bail;
22use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::{JobId, SinkId};
25use risingwave_connector::source::CdcTableSnapshotSplitRaw;
26use risingwave_meta_model::prelude::Fragment as FragmentModel;
27use risingwave_meta_model::{StreamingParallelism, fragment};
28use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
29use risingwave_pb::expr::PbExprNode;
30use risingwave_pb::meta::table_fragments::ActorStatus;
31use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
32use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
33use thiserror_ext::AsReport;
34use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
35use tracing::Instrument;
36
37use super::{
38 Locations, ReplaceJobSplitPlan, ReschedulePolicy, ScaleControllerRef, StreamFragmentGraph,
39 UserDefinedFragmentBackfillOrder,
40};
41use crate::barrier::{
42 BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
43 ReplaceStreamJobPlan, SnapshotBackfillInfo,
44};
45use crate::controller::catalog::DropTableConnectorContext;
46use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
47use crate::error::bail_invalid_parameter;
48use crate::manager::{
49 MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
50};
51use crate::model::{
52 ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
53 FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
54};
55use crate::stream::SourceManagerRef;
56use crate::{MetaError, MetaResult};
57
/// Shared, reference-counted handle to the [`GlobalStreamManager`].
pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;
59
/// Options controlling streaming-job creation.
///
/// Currently carries no fields; it exists as an extension point so the
/// creation API does not need to change when options are added.
#[derive(Default)]
pub struct CreateStreamingJobOption {
}
64
/// Information about an upstream sink that writes into the table being
/// created (the "sink into table" path of job creation).
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    /// Id of the upstream sink.
    pub sink_id: SinkId,
    /// Fragment id of the sink's executor fragment.
    pub sink_fragment_id: FragmentId,
    /// Output schema of the sink fragment.
    pub sink_output_fields: Vec<PbField>,
    /// Target-table columns as they were when the sink was originally created.
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    /// Projection expressions applied between sink output and the new table
    /// — presumably to adapt schemas; confirm against the planner.
    pub project_exprs: Vec<PbExprNode>,
    /// New downstream relation connecting the sink fragment to the table.
    pub new_sink_downstream: DownstreamFragmentRelation,
}
75
/// Everything the stream manager needs, besides the fragments themselves,
/// to create one streaming job. Consumed by
/// `GlobalStreamManager::run_create_streaming_job_command`.
pub struct CreateStreamingJobContext {
    /// New downstream edges to add to existing upstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// Where the new actors are to be placed.
    pub building_locations: Locations,

    /// SQL definition of the job (for catalog / observability).
    pub definition: String,

    /// Foreground vs. background creation; decides what the caller waits for.
    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Present when this job is a table receiving data from an upstream sink.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    /// Present when the job backfills from an upstream snapshot.
    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    /// Pre-computed CDC snapshot splits, if this is a CDC table job.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    /// User-specified ordering constraints between backfilling fragments.
    pub fragment_backfill_ordering: UserDefinedFragmentBackfillOrder,

    /// Maps locality-backfill fragments to the state tables they maintain.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,

    pub is_serverless_backfill: bool,
}
111
/// A creation task currently in flight, tracked so it can be cancelled.
struct StreamingJobExecution {
    id: JobId,
    /// Send half of the cancel handshake: sending a `Sender<bool>` requests
    /// shutdown, and the task reports whether it was cancelled through it.
    /// `None` once a cancel request has been issued.
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    // Held (not read) for the job's lifetime to bound creation concurrency.
    _permit: OwnedSemaphorePermit,
}
117
118impl StreamingJobExecution {
119 fn new(
120 id: JobId,
121 shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
122 permit: OwnedSemaphorePermit,
123 ) -> Self {
124 Self {
125 id,
126 shutdown_tx: Some(shutdown_tx),
127 _permit: permit,
128 }
129 }
130}
131
/// Registry of creation tasks currently in flight, keyed by job id.
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}
136
137impl CreatingStreamingJobInfo {
138 async fn add_job(&self, job: StreamingJobExecution) {
139 let mut jobs = self.streaming_jobs.lock().await;
140 jobs.insert(job.id, job);
141 }
142
143 async fn delete_job(&self, job_id: JobId) {
144 let mut jobs = self.streaming_jobs.lock().await;
145 jobs.remove(&job_id);
146 }
147
148 async fn cancel_jobs(
149 &self,
150 job_ids: Vec<JobId>,
151 ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
152 let mut jobs = self.streaming_jobs.lock().await;
153 let mut receivers = HashMap::new();
154 let mut background_job_ids = vec![];
155 for job_id in job_ids {
156 if let Some(job) = jobs.get_mut(&job_id) {
157 if let Some(shutdown_tx) = job.shutdown_tx.take() {
158 let (tx, rx) = oneshot::channel();
159 match shutdown_tx.send(tx) {
160 Ok(()) => {
161 receivers.insert(job_id, rx);
162 }
163 Err(_) => {
164 return Err(anyhow::anyhow!(
165 "failed to send shutdown signal for streaming job {}: receiver dropped",
166 job_id
167 )
168 .into());
169 }
170 }
171 }
172 } else {
173 background_job_ids.push(job_id);
175 }
176 }
177
178 Ok((receivers, background_job_ids))
179 }
180}
181
/// Shared handle to the registry of in-flight creation tasks.
type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;
183
/// Context for replacing a sink whose schema is auto-refreshed alongside a
/// stream-job replacement: the old fragment, its rebuilt counterpart, and
/// the schema delta between them.
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    /// Temporary id for the replacement sink during the swap.
    pub tmp_sink_id: SinkId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    /// Full column list after the refresh.
    pub new_schema: Vec<PbColumnCatalog>,
    /// Fields added by the refresh (not present in the original schema).
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    /// Log-store state table for the new sink, if it uses one.
    pub new_log_store_table: Option<PbTable>,
    /// Placement status of every actor in `new_fragment`.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}
195
196impl AutoRefreshSchemaSinkContext {
197 pub fn new_fragment_info(&self) -> InflightFragmentInfo {
198 InflightFragmentInfo {
199 fragment_id: self.new_fragment.fragment_id,
200 distribution_type: self.new_fragment.distribution_type.into(),
201 fragment_type_mask: self.new_fragment.fragment_type_mask,
202 vnode_count: self.new_fragment.vnode_count(),
203 nodes: self.new_fragment.nodes.clone(),
204 actors: self
205 .new_fragment
206 .actors
207 .iter()
208 .map(|actor| {
209 (
210 actor.actor_id as _,
211 InflightActorInfo {
212 worker_id: self.actor_status[&actor.actor_id]
213 .location
214 .as_ref()
215 .unwrap()
216 .worker_node_id,
217 vnode_bitmap: actor.vnode_bitmap.clone(),
218 splits: vec![],
219 },
220 )
221 })
222 .collect(),
223 state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
224 }
225 }
226}
227
/// Everything needed to replace the fragments of an existing streaming job
/// (e.g. for `ALTER`-style schema changes). Consumed by
/// `GlobalStreamManager::replace_stream_job`.
pub struct ReplaceStreamJobContext {
    /// Fragments being replaced (to be dropped after the swap).
    pub old_fragments: StreamJobFragments,

    /// Upstream edges of downstream fragments that must be rewired to the
    /// new fragments.
    pub replace_upstream: FragmentReplaceUpstream,

    /// New downstream edges to add to existing upstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// Where the replacement actors are to be placed.
    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    /// Temporary job id used while the replacement is in flight.
    pub tmp_id: JobId,

    /// Present when the replacement drops the table's connector, carrying
    /// the connector state table to remove.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    /// Sinks whose schema is refreshed together with this replacement.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
253
/// Meta-side coordinator for streaming-job lifecycle: creation, replacement,
/// dropping, cancellation, subscriptions and rescheduling. All cluster-wide
/// changes are driven through barrier commands.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Schedules barrier commands that carry the actual job mutations.
    pub barrier_scheduler: BarrierScheduler,

    /// Manages source split discovery and assignment.
    pub source_manager: SourceManagerRef,

    /// Registry of creation tasks currently in flight (for cancellation).
    creating_job_info: CreatingStreamingJobInfoRef,

    /// Computes reschedule plans for parallelism changes.
    pub scale_controller: ScaleControllerRef,
}
271
272impl GlobalStreamManager {
273 pub fn new(
274 env: MetaSrvEnv,
275 metadata_manager: MetadataManager,
276 barrier_scheduler: BarrierScheduler,
277 source_manager: SourceManagerRef,
278 scale_controller: ScaleControllerRef,
279 ) -> MetaResult<Self> {
280 Ok(Self {
281 env,
282 metadata_manager,
283 barrier_scheduler,
284 source_manager,
285 creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
286 scale_controller,
287 })
288 }
289
    /// Create a streaming job and wait for the outcome.
    ///
    /// Registers the job in `creating_job_info` so it can be cancelled, then
    /// races the instrumented creation future against the cancellation
    /// signal. Foreground jobs block until fully created; background jobs
    /// return once the creation command has gone through. On cancellation
    /// the job is aborted or awaited depending on how far creation got.
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        // Register the job so `cancel_streaming_jobs` can signal it. The
        // permit rides along for the job's whole lifetime to bound
        // creation concurrency.
        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                // Background: report the current catalog version immediately.
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                // Foreground: block until the job has finished creating.
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finish");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        let result = async {
            tokio::select! {
                // `biased` so a completed creation wins over a simultaneous
                // cancel request.
                biased;

                res = create_fut => res,
                notifier = cancel_rx => {
                    let notifier = notifier.expect("sender should not be dropped");
                    tracing::debug!(id=%job_id, "cancelling streaming job");

                    // Outcome of the cancel attempt, reported back to the
                    // canceller through `notifier` (true = actually cancelled).
                    enum CancelResult {
                        Completed(MetaResult<NotificationVersion>),
                        Failed(MetaError),
                        Cancelled,
                    }

                    let cancel_res = if let Ok(job_fragments) =
                        self.metadata_manager.get_job_fragments_by_id(job_id).await
                    {
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, job_id)
                        {
                            // Still queued: removed before any barrier ran.
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job in buffer queue."
                            );
                            CancelResult::Cancelled
                        } else if !job_fragments.is_created() {
                            // Creation in flight: abort it with a cancel command.
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job by issue cancel command."
                            );

                            let cancel_result: MetaResult<()> = async {
                                let cancel_command = self.metadata_manager.catalog_controller
                                    .build_cancel_command(&job_fragments)
                                    .await?;
                                self.metadata_manager.catalog_controller
                                    .try_abort_creating_streaming_job(job_id, true)
                                    .await?;

                                self.barrier_scheduler
                                    .run_command(database_id, cancel_command)
                                    .await?;
                                Ok(())
                            }
                            .await;

                            match cancel_result {
                                Ok(()) => CancelResult::Cancelled,
                                Err(err) => {
                                    tracing::warn!(
                                        error = ?err.as_report(),
                                        id = %job_id,
                                        "failed to run cancel command for creating streaming job"
                                    );
                                    CancelResult::Failed(err)
                                }
                            }
                        } else {
                            // Already created: too late to cancel, just wait
                            // for it to finish and report completion.
                            CancelResult::Completed(
                                self.metadata_manager
                                    .wait_streaming_job_finished(database_id, job_id)
                                    .await,
                            )
                        }
                    } else {
                        CancelResult::Cancelled
                    };

                    let (cancelled, result) = match cancel_res {
                        CancelResult::Completed(result) => (false, result),
                        CancelResult::Failed(err) => (false, Err(err)),
                        CancelResult::Cancelled => (true, Err(MetaError::cancelled("create"))),
                    };

                    let _ = notifier
                        .send(cancelled)
                        .inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));

                    result
                }
            }
        }
        .await;

        // Always deregister, whether creation finished, failed, or was cancelled.
        tracing::debug!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }
445
    /// Assemble and submit the `Command::CreateStreamingJob` barrier command.
    ///
    /// Discovers the initial source split assignment, finalizes the backfill
    /// ordering, picks the job type (snapshot backfill > sink-into-table >
    /// normal) and runs the command; returns the job once the first barrier
    /// has been collected.
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        // Initial source split assignment for the new fragments.
        let init_split_assignment = self
            .source_manager
            .discover_splits(&stream_job_fragments)
            .await?;

        // Extend the user-defined backfill order with locality-backfill
        // constraints derived from the fragment graph.
        let fragment_backfill_ordering =
            StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                fragment_backfill_ordering,
                &stream_job_fragments.downstreams,
                || {
                    stream_job_fragments
                        .fragments
                        .iter()
                        .map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                },
            );

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_splits,
            locality_fragment_state_table_mapping,
            is_serverless: is_serverless_backfill,
        };

        // Snapshot backfill takes precedence; otherwise sink-into-table if an
        // upstream sink is attached; otherwise a normal creation.
        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }
539
    /// Replace the fragments of an existing streaming job by submitting a
    /// `Command::ReplaceStreamJob` barrier command that swaps
    /// `old_fragments` for `new_fragments`.
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        // Split plan for the new fragments: when replacing a source, fall
        // back to aligning with the previous assignment if discovery returns
        // nothing; otherwise use freshly discovered splits.
        let split_plan = if streaming_job.is_source() {
            match self
                .source_manager
                .discover_splits_for_replace_source(&new_fragments, &replace_upstream)
                .await?
            {
                Some(discovered) => ReplaceJobSplitPlan::Discovered(discovered),
                None => ReplaceJobSplitPlan::AlignFromPrevious,
            }
        } else {
            let discovered = self.source_manager.discover_splits(&new_fragments).await?;
            ReplaceJobSplitPlan::Discovered(discovered)
        };
        tracing::info!("replace_stream_job - split plan: {:?}", split_plan);

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    split_plan,
                    streaming_job,
                    tmp_id,
                    // If the replacement drops the table's connector, its
                    // connector state table is unregistered as well.
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                }),
            )
            .await?;

        Ok(())
    }
599
    /// Drop streaming jobs via a `Command::DropStreamingJobs` barrier
    /// command. Best-effort: a failed command is logged and otherwise
    /// ignored.
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<JobId>,
        state_table_ids: Vec<TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        // NOTE(review): `fragment_ids` and `dropped_sink_fragment_by_targets`
        // are not part of this emptiness guard — presumably they can only be
        // non-empty alongside one of the other three; confirm with callers.
        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }
634
635 pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
642 if job_ids.is_empty() {
643 return Ok(vec![]);
644 }
645
646 let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
647 let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;
648
649 let futures = receivers.into_iter().map(|(id, receiver)| async move {
650 if let Ok(cancelled) = receiver.await
651 && cancelled
652 {
653 tracing::info!("canceled streaming job {id}");
654 Ok(id)
655 } else {
656 Err(MetaError::from(anyhow::anyhow!(
657 "failed to cancel streaming job {id}"
658 )))
659 }
660 });
661 let mut cancelled_ids = join_all(futures)
662 .await
663 .into_iter()
664 .collect::<MetaResult<Vec<_>>>()?;
665
666 let futures = background_job_ids.into_iter().map(|id| async move {
669 let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
670 if fragment.is_created() {
671 tracing::warn!(
672 "streaming job {} is already created, ignore cancel request",
673 id
674 );
675 return Ok(None);
676 }
677 if fragment.is_created() {
678 Err(MetaError::invalid_parameter(format!(
679 "streaming job {} is already created",
680 id
681 )))?;
682 }
683
684 let cancel_command = self
685 .metadata_manager
686 .catalog_controller
687 .build_cancel_command(&fragment)
688 .await?;
689
690 let (_, database_id) = self
691 .metadata_manager
692 .catalog_controller
693 .try_abort_creating_streaming_job(id, true)
694 .await?;
695
696 if let Some(database_id) = database_id {
697 self.barrier_scheduler
698 .run_command(database_id, cancel_command)
699 .await?;
700 }
701
702 tracing::info!(?id, "cancelled background streaming job");
703 Ok(Some(id))
704 });
705 let cancelled_recovered_ids = join_all(futures)
706 .await
707 .into_iter()
708 .collect::<MetaResult<Vec<_>>>()?;
709
710 cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
711 Ok(cancelled_ids)
712 }
713
    /// Reschedule (alter the parallelism of) a single streaming job.
    ///
    /// Fails if the job is blocked by in-flight background creating jobs
    /// that cannot be rescheduled. The plan is applied in place; when
    /// `deferred` is false the resulting barrier commands are also run
    /// immediately.
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        policy: ReschedulePolicy,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        // Refuse if a creating background job blocks this reschedule.
        if !background_jobs.is_empty() {
            let blocked_jobs = self
                .metadata_manager
                .collect_reschedule_blocked_jobs_for_creating_jobs(&background_jobs, !deferred)
                .await?;

            if blocked_jobs.contains(&job_id) {
                bail!(
                    "Cannot alter the job {} because it is blocked by creating unreschedulable backfill jobs",
                    job_id,
                );
            }
        }

        let commands = self
            .scale_controller
            .reschedule_inplace(HashMap::from([(job_id, policy)]))
            .await?;

        if !deferred {
            // Pause source split ticks while the reschedule commands run.
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }
758
    /// Alter the backfill parallelism of a streaming job.
    ///
    /// Mirrors [`Self::reschedule_streaming_job`] but targets backfill
    /// parallelism, and refuses only when the job itself is a
    /// non-reschedulable background backfill job.
    pub(crate) async fn reschedule_streaming_job_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<StreamingParallelism>,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        // Refuse if this job is a non-reschedulable background backfill job.
        if !background_jobs.is_empty() {
            let unreschedulable = self
                .metadata_manager
                .collect_unreschedulable_backfill_jobs(&background_jobs, !deferred)
                .await?;

            if unreschedulable.contains(&job_id) {
                bail!(
                    "Cannot alter the job {} because it is a non-reschedulable background backfill job",
                    job_id,
                );
            }
        }

        let commands = self
            .scale_controller
            .reschedule_backfill_parallelism_inplace(HashMap::from([(job_id, parallelism)]))
            .await?;

        if !deferred {
            // Pause source split ticks while the reschedule commands run.
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }
803
    /// Reschedule the backfill of a CDC table to a fixed parallelism.
    ///
    /// Only `ReschedulePolicy::Parallelism` with a fixed target is accepted.
    /// Locates the job's single `StreamCdcScan` fragment in the metadata
    /// store and reschedules just that fragment, running the resulting
    /// barrier commands immediately.
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        // Only a fixed-parallelism target is meaningful here.
        let parallelism_policy = match target {
            ReschedulePolicy::Parallelism(policy)
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
            {
                policy
            }
            _ => bail_invalid_parameter!(
                "CDC backfill reschedule only supports fixed parallelism targets"
            ),
        };

        // Find the unique StreamCdcScan fragment of this job.
        let cdc_fragment_id = {
            let inner = self.metadata_manager.catalog_controller.inner.read().await;
            // Fetch (fragment_id, type_mask) pairs for the job, then filter
            // by the StreamCdcScan flag locally.
            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(job_id))
                .into_tuple()
                .all(&inner.db)
                .await?;

            let cdc_fragments = fragments
                .into_iter()
                .filter_map(|(fragment_id, mask)| {
                    FragmentTypeMask::from(mask)
                        .contains(FragmentTypeFlag::StreamCdcScan)
                        .then_some(fragment_id)
                })
                .collect_vec();

            // Exactly one CDC scan fragment is expected per job.
            match cdc_fragments.len() {
                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
                1 => cdc_fragments[0],
                _ => bail_invalid_parameter!(
                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
                    job_id
                ),
            }
        };

        let fragment_policy = HashMap::from([(
            cdc_fragment_id,
            Some(parallelism_policy.parallelism.clone()),
        )]);

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy)
            .await?;

        // Pause source split ticks while the reschedule commands run.
        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }
875
    /// Reschedule individual fragments to the given parallelism targets
    /// (`None` meaning the target is cleared/reset — confirm semantics with
    /// `reschedule_fragment_inplace`), running the resulting barrier
    /// commands immediately. No-op for an empty map.
    pub(crate) async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        if fragment_targets.is_empty() {
            return Ok(());
        }

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        // Convert model fragment ids to the scale controller's key type.
        let fragment_policy = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
            .collect();

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy)
            .await?;

        // Pause source split ticks while the reschedule commands run.
        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }
906
907 pub async fn create_subscription(
909 self: &Arc<Self>,
910 subscription: &Subscription,
911 ) -> MetaResult<()> {
912 let command = Command::CreateSubscription {
913 subscription_id: subscription.id,
914 upstream_mv_table_id: subscription.dependent_table_id,
915 retention_second: subscription.retention_seconds,
916 };
917
918 tracing::debug!("sending Command::CreateSubscription");
919 self.barrier_scheduler
920 .run_command(subscription.database_id, command)
921 .await?;
922 Ok(())
923 }
924
925 pub async fn drop_subscription(
927 self: &Arc<Self>,
928 database_id: DatabaseId,
929 subscription_id: SubscriptionId,
930 table_id: TableId,
931 ) {
932 let command = Command::DropSubscription {
933 subscription_id,
934 upstream_mv_table_id: table_id,
935 };
936
937 tracing::debug!("sending Command::DropSubscriptions");
938 let _ = self
939 .barrier_scheduler
940 .run_command(database_id, command)
941 .await
942 .inspect_err(|err| {
943 tracing::error!(error = ?err.as_report(), "failed to run drop command");
944 });
945 }
946
947 pub async fn alter_subscription_retention(
948 self: &Arc<Self>,
949 database_id: DatabaseId,
950 subscription_id: SubscriptionId,
951 table_id: TableId,
952 retention_second: u64,
953 ) -> MetaResult<()> {
954 let command = Command::AlterSubscriptionRetention {
955 subscription_id,
956 upstream_mv_table_id: table_id,
957 retention_second,
958 };
959
960 tracing::debug!("sending Command::AlterSubscriptionRetention");
961 self.barrier_scheduler
962 .run_command(database_id, command)
963 .await?;
964 Ok(())
965 }
966}