use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::sync::Arc;

use await_tree::span;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_meta_model::prelude::Fragment as FragmentModel;
use risingwave_meta_model::{ObjectId, StreamingParallelism, fragment};
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use thiserror_ext::AsReport;
use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
use tracing::Instrument;

use super::{FragmentBackfillOrder, Locations, ReschedulePolicy, ScaleControllerRef};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
};
use crate::stream::SourceManagerRef;
use crate::stream::cdc::{
    assign_cdc_table_snapshot_splits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {}

#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: ObjectId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}
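/// One-time context passed to [`GlobalStreamManager::create_streaming_job`], carrying the
/// information needed to build and start a single streaming job.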
pub struct CreateStreamingJobContext {
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    pub building_locations: Locations,

    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,

    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}

struct StreamingJobExecution {
    id: JobId,
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    _permit: OwnedSemaphorePermit,
}

impl StreamingJobExecution {
    fn new(
        id: JobId,
        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
            _permit: permit,
        }
    }
}
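/// Tracks streaming jobs that are currently being created, so that an in-progress creation can
/// be signalled to shut down when the job is cancelled or dropped.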
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: JobId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

    async fn cancel_jobs(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut background_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    match shutdown_tx.send(tx) {
                        Ok(()) => {
                            receivers.insert(job_id, rx);
                        }
                        Err(_) => {
                            return Err(anyhow::anyhow!(
                                "failed to send shutdown signal for streaming job {}: receiver dropped",
                                job_id
                            )
                            .into());
                        }
                    }
                }
            } else {
                background_job_ids.push(job_id);
            }
        }

        Ok((receivers, background_job_ids))
    }

    async fn check_job_exists(&self, job_id: JobId) -> bool {
        let jobs = self.streaming_jobs.lock().await;
        jobs.contains_key(&job_id)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: JobId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id as _,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
        }
    }
}
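/// One-time context for replacing the fragments of an existing streaming job, used by
/// [`GlobalStreamManager::replace_stream_job`].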
pub struct ReplaceStreamJobContext {
    pub old_fragments: StreamJobFragments,

    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: JobId,

    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
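/// `GlobalStreamManager` drives the lifecycle of streaming jobs on the meta node: it creates,
/// drops, cancels, replaces and reschedules them by issuing commands to the barrier scheduler.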
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    pub barrier_scheduler: BarrierScheduler,

    pub source_manager: SourceManagerRef,

    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }
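    /// Creates a streaming job and waits for it according to its [`CreateType`]: foreground
    /// jobs block until the job is finished, while background jobs return once the first
    /// barrier of the creation command has been collected. The job is registered in
    /// `creating_job_info` so that it can be cancelled while still in progress, and the entry
    /// is removed before returning.
    ///
    /// A hypothetical caller sketch (how `fragments`, `ctx` and `permit` are prepared is an
    /// assumption for illustration, not part of this method):
    /// ```ignore
    /// let version = stream_manager
    ///     .create_streaming_job(fragments, ctx, permit)
    ///     .await?;
    /// ```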
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finished");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        let result = tokio::select! {
            biased;

            res = create_fut => res,
            notifier = cancel_rx => {
                let notifier = notifier.expect("sender should not be dropped");
                tracing::debug!(id = %job_id, "cancelling streaming job");

                if let Ok(job_fragments) = self.metadata_manager.get_job_fragments_by_id(job_id)
                    .await {
                    if self.barrier_scheduler.try_cancel_scheduled_create(database_id, job_id) {
                        // The creation command was still queued and has been removed, so nothing
                        // has been materialized yet.
                        tracing::debug!("cancelling streaming job {job_id} in buffer queue.");
                    } else if !job_fragments.is_created() {
                        // The job is partially created: abort it in the catalog and issue an
                        // explicit cancel command.
                        tracing::debug!("cancelling streaming job {job_id} by issuing a cancel command.");

                        let cancel_command = self.metadata_manager.catalog_controller
                            .build_cancel_command(&job_fragments)
                            .await?;
                        self.metadata_manager.catalog_controller
                            .try_abort_creating_streaming_job(job_id, true)
                            .await?;

                        self.barrier_scheduler.run_command(database_id, cancel_command).await?;
                    } else {
                        // The job is already fully created and can no longer be cancelled, so
                        // report failure to the canceller and wait for the job to finish.
                        let _ = notifier.send(false).inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));
                        return self.metadata_manager.wait_streaming_job_finished(database_id, job_id).await;
                    }
                }
                notifier.send(true).expect("receiver should not be dropped");
                Err(MetaError::cancelled("create"))
            }
        };

        tracing::info!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }
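    /// Allocates source and backfill splits, assigns CDC table snapshot splits if needed, and
    /// then sends [`Command::CreateStreamingJob`] to the barrier scheduler. Returns after the
    /// first barrier of the command has been collected.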
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "actors built"
        );

        // Allocate splits for regular source fragments, then extend the assignment with splits
        // for source backfill fragments.
        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;

        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            stream_job_fragments.stream_job_id,
            &stream_job_fragments,
            self.env.meta_store_ref(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if !cdc_table_snapshot_split_assignment.is_empty()
        {
            // Track the new CDC backfill job and its parallelized backfill fragments, and attach
            // a generation to the split assignment.
            self.env.cdc_table_backfill_tracker.track_new_job(
                stream_job_fragments.stream_job_id,
                cdc_table_snapshot_split_assignment
                    .values()
                    .map(|s| u64::try_from(s.len()).unwrap())
                    .sum(),
            );
            self.env
                .cdc_table_backfill_tracker
                .add_fragment_table_mapping(
                    stream_job_fragments
                        .fragments
                        .values()
                        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
                        .map(|f| f.fragment_id),
                    stream_job_fragments.stream_job_id,
                );
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(iter::once(stream_job_fragments.stream_job_id)),
            )
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_split_assignment,
            locality_fragment_state_table_mapping,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }
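    /// Replaces the fragments of an existing streaming job: reallocates source splits for the
    /// new fragments, reassigns CDC table snapshot splits, and sends
    /// [`Command::ReplaceStreamJob`] to the barrier scheduler.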
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocated splits: {:?}",
            init_split_assignment
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            old_fragments.stream_job_id,
            &new_fragments.inner,
            self.env.meta_store_ref(),
        )
        .await?;

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                    cdc_table_snapshot_split_assignment,
                }),
            )
            .await?;

        Ok(())
    }
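    /// Drops the given streaming jobs and unregisters their actors, fragments and state tables
    /// via [`Command::DropStreamingJobs`]. Jobs that are still being created are notified as
    /// cancelled first. Errors from the barrier command are logged and otherwise ignored.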
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<JobId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        for &job_id in &streaming_job_ids {
            if self.creating_job_info.check_job_exists(job_id).await {
                tracing::info!(
                    ?job_id,
                    "streaming job is still being created, cancelling it as part of the drop"
                );
                self.metadata_manager
                    .notify_cancelled(database_id, job_id)
                    .await;
            }
        }

        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }
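    /// Cancels the given streaming jobs and returns the ids that were actually cancelled.
    /// Jobs still tracked as creating are cancelled by signalling their creation tasks, while
    /// recovered background jobs are aborted in the catalog and cancelled through an explicit
    /// barrier command. Jobs that are already created cannot be cancelled and yield an error.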
    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
        if job_ids.is_empty() {
            return Ok(vec![]);
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;

        // Jobs still tracked as creating: wait for their creation tasks to acknowledge the
        // cancellation signal.
        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if let Ok(cancelled) = receiver.await
                && cancelled
            {
                tracing::info!("cancelled streaming job {id}");
                Ok(id)
            } else {
                Err(MetaError::from(anyhow::anyhow!(
                    "failed to cancel streaming job {id}"
                )))
            }
        });
        let mut cancelled_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        // Recovered background jobs that are no longer tracked: abort them in the catalog and
        // cancel them with an explicit barrier command.
        let futures = background_job_ids.into_iter().map(|id| async move {
            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
            if fragment.is_created() {
                Err(MetaError::invalid_parameter(format!(
                    "streaming job {} is already created",
                    id
                )))?;
            }

            let cancel_command = self
                .metadata_manager
                .catalog_controller
                .build_cancel_command(&fragment)
                .await?;

            let (_, database_id) = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_streaming_job(id, true)
                .await?;

            if let Some(database_id) = database_id {
                self.barrier_scheduler
                    .run_command(database_id, cancel_command)
                    .await?;
            }

            tracing::info!(?id, "cancelled recovered streaming job");
            Ok(id)
        });
        let cancelled_recovered_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        cancelled_ids.extend(cancelled_recovered_ids);
        Ok(cancelled_ids)
    }
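    /// Reschedules a single streaming job according to the given [`ReschedulePolicy`]. The
    /// request is rejected while a related no-shuffle job is still being created in the
    /// background. When `deferred` is true, the computed reschedule commands are not run
    /// through the barrier scheduler immediately.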
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        policy: ReschedulePolicy,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            if related_jobs.contains(&job_id) {
                bail!(
                    "Cannot alter job {} because related background jobs {:?} are still being created",
                    job_id,
                    background_jobs,
                );
            }
        }

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();
        let workers = worker_nodes.into_iter().map(|x| (x.id as i32, x)).collect();

        let commands = self
            .scale_controller
            .reschedule_inplace(HashMap::from([(job_id, policy)]), workers)
            .await?;

        if !deferred {
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }
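    /// Reschedules the parallelism of the single `StreamCdcScan` fragment of a CDC table
    /// backfill job. Only fixed-parallelism targets are accepted, and the job must contain
    /// exactly one such fragment.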
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: u32,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let job_id = TableId::new(job_id);

        let parallelism_policy = match target {
            ReschedulePolicy::Parallelism(policy)
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
            {
                policy
            }
            _ => bail_invalid_parameter!(
                "CDC backfill reschedule only supports fixed parallelism targets"
            ),
        };

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();
        let workers = worker_nodes.into_iter().map(|x| (x.id as i32, x)).collect();

        let cdc_fragment_id = {
            let inner = self.metadata_manager.catalog_controller.inner.read().await;
            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(job_id))
                .into_tuple()
                .all(&inner.db)
                .await?;

            let cdc_fragments = fragments
                .into_iter()
                .filter_map(|(fragment_id, mask)| {
                    FragmentTypeMask::from(mask)
                        .contains(FragmentTypeFlag::StreamCdcScan)
                        .then_some(fragment_id)
                })
                .collect_vec();

            match cdc_fragments.len() {
                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
                1 => cdc_fragments[0],
                _ => bail_invalid_parameter!(
                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
                    job_id
                ),
            }
        };

        let fragment_policy = HashMap::from([(
            cdc_fragment_id,
            Some(parallelism_policy.parallelism.clone()),
        )]);

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy, workers)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }
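    /// Reschedules the given fragments in place according to their requested parallelism and
    /// runs the resulting commands through the barrier scheduler.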
    pub(crate) async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        if fragment_targets.is_empty() {
            return Ok(());
        }

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .map(|worker| (worker.id as i32, worker))
            .collect();

        let fragment_policy = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
            .collect();

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy, workers)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }
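    /// Creates a subscription on its upstream materialized-view table by sending
    /// [`Command::CreateSubscription`] to the barrier scheduler.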
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }
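    /// Drops a subscription by sending [`Command::DropSubscription`] to the barrier scheduler.
    /// Errors are logged and otherwise ignored.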
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}