1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use await_tree::span;
19use futures::future::join_all;
20use itertools::Itertools;
21use risingwave_common::bail;
22use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::{JobId, SinkId};
25use risingwave_connector::source::CdcTableSnapshotSplitRaw;
26use risingwave_meta_model::prelude::Fragment as FragmentModel;
27use risingwave_meta_model::{StreamingParallelism, WorkerId, fragment, streaming_job};
28use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
29use risingwave_pb::expr::PbExprNode;
30use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
31use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
32use thiserror_ext::AsReport;
33use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
34use tracing::Instrument;
35
36use super::{
37 ReschedulePolicy, ScaleControllerRef, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
38};
39use crate::barrier::{
40 BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
41 ReplaceStreamJobPlan, SnapshotBackfillInfo,
42};
43use crate::controller::catalog::DropTableConnectorContext;
44use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
45use crate::error::bail_invalid_parameter;
46use crate::manager::{
47 MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
48};
49use crate::model::{
50 ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
51 FragmentReplaceUpstream, StreamActor, StreamContext, StreamJobFragments,
52 StreamJobFragmentsToCreate, SubscriptionId,
53};
54use crate::stream::{ReplaceJobSplitPlan, SourceManagerRef};
55use crate::{MetaError, MetaResult};
56
/// Shared, reference-counted handle to the global stream manager.
pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

/// Options controlling streaming-job creation.
///
/// Currently carries no fields; it exists so that future flags can be added
/// without changing the `create_streaming_job` call signature.
#[derive(Default)]
pub struct CreateStreamingJobOption {
}
63
/// Information about an existing upstream sink that writes into the table
/// being created/replaced (the "sink into table" path).
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: SinkId,
    // Fragment id of the sink on the upstream side.
    pub sink_fragment_id: FragmentId,
    // Output schema produced by the sink fragment.
    pub sink_output_fields: Vec<PbField>,
    // Target-table columns as recorded for the sink; presumably the schema at
    // sink-creation time — confirm against the catalog writer.
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    // Projection expressions applied between sink output and target table.
    pub project_exprs: Vec<PbExprNode>,
    // The downstream relation connecting the sink fragment to the new table.
    pub new_sink_downstream: DownstreamFragmentRelation,
}
74
/// Context bundled with the fragment graph when creating a streaming job.
/// Consumed by [`GlobalStreamManager::run_create_streaming_job_command`].
pub struct CreateStreamingJobContext {
    // Downstream relations to add on existing upstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    // Resource group of the database the job belongs to.
    pub database_resource_group: String,

    // SQL definition of the job (for observability / catalog).
    pub definition: String,

    // Foreground or background creation; drives how completion is awaited.
    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    // Present when this job is a table receiving an existing sink's output.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    // When `Some`, the job is created via snapshot backfill.
    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    // Pre-computed CDC table snapshot splits, if this is a CDC table job.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    // User-specified backfill order; extended with locality backfill later.
    pub fragment_backfill_ordering: UserDefinedFragmentBackfillOrder,

    // Fragment id -> state tables used for locality backfill.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,

    pub is_serverless_backfill: bool,

    pub streaming_job_model: streaming_job::Model,
}
113
/// Tracks a single in-flight job creation so it can be cancelled.
struct StreamingJobExecution {
    id: JobId,
    // Sender used to request shutdown of the creation worker; the inner
    // sender reports back whether the job was actually cancelled. Taken
    // (left as `None`) once a cancellation has been requested.
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    // Concurrency permit held for the lifetime of the creation; released
    // automatically when this execution is dropped.
    _permit: OwnedSemaphorePermit,
}

impl StreamingJobExecution {
    /// Wrap a creating job's id, shutdown channel, and concurrency permit.
    fn new(
        id: JobId,
        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
            _permit: permit,
        }
    }
}
133
/// Registry of streaming jobs that are currently being created, keyed by
/// job id. Used to route cancellation requests to the creation workers.
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}
138
139impl CreatingStreamingJobInfo {
140 async fn add_job(&self, job: StreamingJobExecution) {
141 let mut jobs = self.streaming_jobs.lock().await;
142 jobs.insert(job.id, job);
143 }
144
145 async fn delete_job(&self, job_id: JobId) {
146 let mut jobs = self.streaming_jobs.lock().await;
147 jobs.remove(&job_id);
148 }
149
150 async fn cancel_jobs(
151 &self,
152 job_ids: Vec<JobId>,
153 ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
154 let mut jobs = self.streaming_jobs.lock().await;
155 let mut receivers = HashMap::new();
156 let mut background_job_ids = vec![];
157 for job_id in job_ids {
158 if let Some(job) = jobs.get_mut(&job_id) {
159 if let Some(shutdown_tx) = job.shutdown_tx.take() {
160 let (tx, rx) = oneshot::channel();
161 match shutdown_tx.send(tx) {
162 Ok(()) => {
163 receivers.insert(job_id, rx);
164 }
165 Err(_) => {
166 return Err(anyhow::anyhow!(
167 "failed to send shutdown signal for streaming job {}: receiver dropped",
168 job_id
169 )
170 .into());
171 }
172 }
173 }
174 } else {
175 background_job_ids.push(job_id);
177 }
178 }
179
180 Ok((receivers, background_job_ids))
181 }
182}
183
/// Shared handle to [`CreatingStreamingJobInfo`].
type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;
185
/// Context for replacing a sink whose schema is auto-refreshed alongside a
/// table replacement: the old sink fragment is swapped for a new one built
/// against the updated schema.
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    // Temporary id used for the replacement sink during the swap.
    pub tmp_sink_id: SinkId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    // Full column set of the refreshed schema.
    pub new_schema: Vec<PbColumnCatalog>,
    // Fields added relative to the original schema.
    pub newly_add_fields: Vec<Field>,
    // Names of columns removed relative to the original schema.
    pub removed_column_names: Vec<String>,
    pub new_fragment: Fragment,
    // Log store state table for the new sink, when it has one.
    pub new_log_store_table: Option<PbTable>,
    pub ctx: StreamContext,
}
199
200impl AutoRefreshSchemaSinkContext {
201 pub fn new_fragment_info(
202 &self,
203 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
204 actor_location: &HashMap<ActorId, WorkerId>,
205 ) -> InflightFragmentInfo {
206 InflightFragmentInfo {
207 fragment_id: self.new_fragment.fragment_id,
208 distribution_type: self.new_fragment.distribution_type.into(),
209 fragment_type_mask: self.new_fragment.fragment_type_mask,
210 vnode_count: self.new_fragment.vnode_count(),
211 nodes: self.new_fragment.nodes.clone(),
212 actors: stream_actors
213 .get(&self.new_fragment.fragment_id)
214 .into_iter()
215 .flatten()
216 .map(|actor| {
217 (
218 actor.actor_id,
219 InflightActorInfo {
220 worker_id: actor_location[&actor.actor_id],
221 vnode_bitmap: actor.vnode_bitmap.clone(),
222 splits: vec![],
223 },
224 )
225 })
226 .collect(),
227 state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
228 }
229 }
230}
231
/// Context consumed by [`GlobalStreamManager::replace_stream_job`] when
/// swapping a job's fragments for a rebuilt set.
pub struct ReplaceStreamJobContext {
    // Fragments being replaced.
    pub old_fragments: StreamJobFragments,

    // Upstream edges to rewire from old fragments to the new ones.
    pub replace_upstream: FragmentReplaceUpstream,

    // Downstream relations to add on existing upstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    pub streaming_job: StreamingJob,

    pub database_resource_group: String,

    // Temporary job id used during the replacement.
    pub tmp_id: JobId,

    // Present when the replacement also drops the table's connector.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    // Sinks whose schema is refreshed together with this replacement.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,

    pub streaming_job_model: streaming_job::Model,
}
260
/// Orchestrates the lifecycle of streaming jobs on the meta node — creating,
/// replacing, dropping, cancelling, and rescheduling — by issuing commands to
/// the barrier scheduler.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    // Access to catalog and fragment metadata.
    pub metadata_manager: MetadataManager,

    // Schedules barrier commands that drive job lifecycle changes.
    pub barrier_scheduler: BarrierScheduler,

    // Handles source split discovery and assignment.
    pub source_manager: SourceManagerRef,

    // Jobs currently being created; used to route cancellation requests.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}
278
impl GlobalStreamManager {
    /// Construct a stream manager from its constituent services, starting
    /// with an empty registry of creating jobs.
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        let creating_job_info = Arc::new(CreatingStreamingJobInfo::default());
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info,
            scale_controller,
        })
    }
296
    /// Create a streaming job and await its completion (or cancellation).
    ///
    /// Registers the job in `creating_job_info`, then races the creation
    /// future against a cancellation request delivered through the job's
    /// shutdown channel. Whatever the outcome, the registry entry is removed
    /// before returning.
    ///
    /// Returns the notification version observed when the job is considered
    /// done: for background jobs this is the current version right after the
    /// create command is issued; for foreground jobs it is the version after
    /// the job has fully finished creating.
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        // Register the job so `cancel_streaming_jobs` can reach this worker.
        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            // Issue the create command; `ctx` is consumed here.
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    // Background jobs return as soon as the command is issued.
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    // Foreground jobs block until creation fully finishes.
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finish");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        let result = async {
            // `biased` makes the creation future win when both are ready, so
            // a completed job is not reported as cancelled.
            tokio::select! {
                biased;

                res = create_fut => res,
                notifier = cancel_rx => {
                    let notifier = notifier.expect("sender should not be dropped");
                    tracing::debug!(id=%job_id, "cancelling streaming job");

                    // Outcome of the cancellation attempt; reported back to
                    // the canceller through `notifier`.
                    enum CancelResult {
                        Completed(MetaResult<NotificationVersion>),
                        Failed(MetaError),
                        Cancelled,
                    }

                    let cancel_res = if let Ok(job_fragments) =
                        self.metadata_manager.get_job_fragments_by_id(job_id).await
                    {
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, job_id)
                        {
                            // The create command was still queued and never ran.
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job in buffer queue."
                            );
                            CancelResult::Cancelled
                        } else if !job_fragments.is_created() {
                            // Creation is in flight: abort it in the catalog
                            // and issue an explicit cancel command.
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job by issue cancel command."
                            );

                            let cancel_result: MetaResult<()> = async {
                                let cancel_command = self.metadata_manager.catalog_controller
                                    .build_cancel_command(&job_fragments)
                                    .await?;
                                self.metadata_manager.catalog_controller
                                    .try_abort_creating_streaming_job(job_id, true)
                                    .await?;

                                self.barrier_scheduler
                                    .run_command(database_id, cancel_command)
                                    .await?;
                                Ok(())
                            }
                            .await;

                            match cancel_result {
                                Ok(()) => CancelResult::Cancelled,
                                Err(err) => {
                                    tracing::warn!(
                                        error = ?err.as_report(),
                                        id = %job_id,
                                        "failed to run cancel command for creating streaming job"
                                    );
                                    CancelResult::Failed(err)
                                }
                            }
                        } else {
                            // Job already finished creating: too late to
                            // cancel, wait for it and report completion.
                            CancelResult::Completed(
                                self.metadata_manager
                                    .wait_streaming_job_finished(database_id, job_id)
                                    .await,
                            )
                        }
                    } else {
                        // Fragments are gone; treat the job as cancelled.
                        CancelResult::Cancelled
                    };

                    let (cancelled, result) = match cancel_res {
                        CancelResult::Completed(result) => (false, result),
                        CancelResult::Failed(err) => (false, Err(err)),
                        CancelResult::Cancelled => (true, Err(MetaError::cancelled("create"))),
                    };

                    // Tell the canceller whether the job was actually cancelled.
                    let _ = notifier
                        .send(cancelled)
                        .inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));

                    result
                }
            }
        }
        .await;

        // Always unregister, whether the job completed, failed, or was cancelled.
        tracing::debug!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }
452
    /// Build and issue the `CreateStreamingJob` barrier command for a job
    /// whose actors have already been built.
    ///
    /// Discovers initial source splits, extends the user-defined backfill
    /// ordering with locality backfill, picks the creation flavor (snapshot
    /// backfill / sink-into-table / normal), then runs the command through the
    /// barrier scheduler. Returns once the first barrier has been collected.
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            database_resource_group,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
            streaming_job_model,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        // Initial split assignment for the job's source fragments.
        let init_split_assignment = self
            .source_manager
            .discover_splits(&stream_job_fragments)
            .await?;

        // Merge locality-backfill constraints into the user-defined ordering.
        let fragment_backfill_ordering =
            StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                fragment_backfill_ordering,
                &stream_job_fragments.downstreams,
                || {
                    stream_job_fragments
                        .fragments
                        .iter()
                        .map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                },
            );

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            database_resource_group,
            fragment_backfill_ordering,
            cdc_table_snapshot_splits,
            locality_fragment_state_table_mapping,
            is_serverless: is_serverless_backfill,
            streaming_job_model,
        };

        // Select the creation flavor; snapshot backfill takes precedence over
        // the sink-into-table path.
        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }
550
    /// Replace a streaming job's fragments with a rebuilt set.
    ///
    /// Decides how source splits carry over — for source jobs, re-discovery is
    /// attempted and falls back to aligning with the previous assignment; for
    /// other jobs, splits are always re-discovered — then issues a single
    /// `ReplaceStreamJob` barrier command.
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            streaming_job_model,
            database_resource_group,
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let split_plan = if streaming_job.is_source() {
            match self
                .source_manager
                .discover_splits_for_replace_source(&new_fragments, &replace_upstream)
                .await?
            {
                Some(discovered) => ReplaceJobSplitPlan::Discovered(discovered),
                // No fresh discovery: keep the previous split assignment.
                None => ReplaceJobSplitPlan::AlignFromPrevious,
            }
        } else {
            let discovered = self.source_manager.discover_splits(&new_fragments).await?;
            ReplaceJobSplitPlan::Discovered(discovered)
        };
        tracing::info!("replace_stream_job - split plan: {:?}", split_plan);

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    database_resource_group,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    split_plan,
                    streaming_job,
                    streaming_job_model,
                    tmp_id,
                    // When the replacement drops the table's connector, its
                    // source state table must be unregistered as well.
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                }),
            )
            .await?;

        Ok(())
    }
613
614 pub async fn drop_streaming_jobs(
618 &self,
619 database_id: DatabaseId,
620 streaming_job_ids: Vec<JobId>,
621 state_table_ids: Vec<TableId>,
622 fragment_ids: HashSet<FragmentId>,
623 dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
624 ) {
625 if !streaming_job_ids.is_empty() || !state_table_ids.is_empty() {
626 let _ = self
627 .barrier_scheduler
628 .run_command(
629 database_id,
630 Command::DropStreamingJobs {
631 streaming_job_ids: streaming_job_ids.into_iter().collect(),
632 unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
633 unregistered_fragment_ids: fragment_ids,
634 dropped_sink_fragment_by_targets,
635 },
636 )
637 .await
638 .inspect_err(|err| {
639 tracing::error!(error = ?err.as_report(), "failed to run drop command");
640 });
641 }
642 }
643
644 pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
651 if job_ids.is_empty() {
652 return Ok(vec![]);
653 }
654
655 let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
656 let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;
657
658 let futures = receivers.into_iter().map(|(id, receiver)| async move {
659 if let Ok(cancelled) = receiver.await
660 && cancelled
661 {
662 tracing::info!("canceled streaming job {id}");
663 Ok(id)
664 } else {
665 Err(MetaError::from(anyhow::anyhow!(
666 "failed to cancel streaming job {id}"
667 )))
668 }
669 });
670 let mut cancelled_ids = join_all(futures)
671 .await
672 .into_iter()
673 .collect::<MetaResult<Vec<_>>>()?;
674
675 let futures = background_job_ids.into_iter().map(|id| async move {
678 let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
679 if fragment.is_created() {
680 tracing::warn!(
681 "streaming job {} is already created, ignore cancel request",
682 id
683 );
684 return Ok(None);
685 }
686 if fragment.is_created() {
687 Err(MetaError::invalid_parameter(format!(
688 "streaming job {} is already created",
689 id
690 )))?;
691 }
692
693 let cancel_command = self
694 .metadata_manager
695 .catalog_controller
696 .build_cancel_command(&fragment)
697 .await?;
698
699 let (_, database_id) = self
700 .metadata_manager
701 .catalog_controller
702 .try_abort_creating_streaming_job(id, true)
703 .await?;
704
705 if let Some(database_id) = database_id {
706 self.barrier_scheduler
707 .run_command(database_id, cancel_command)
708 .await?;
709 }
710
711 tracing::info!(?id, "cancelled background streaming job");
712 Ok(Some(id))
713 });
714 let cancelled_recovered_ids = join_all(futures)
715 .await
716 .into_iter()
717 .collect::<MetaResult<Vec<_>>>()?;
718
719 cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
720 Ok(cancelled_ids)
721 }
722
723 pub(crate) async fn reschedule_streaming_job(
724 &self,
725 job_id: JobId,
726 policy: ReschedulePolicy,
727 deferred: bool,
728 ) -> MetaResult<()> {
729 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
730
731 let background_jobs = self
732 .metadata_manager
733 .list_background_creating_jobs()
734 .await?;
735
736 if !background_jobs.is_empty() {
737 let blocked_jobs = self
738 .metadata_manager
739 .collect_reschedule_blocked_jobs_for_creating_jobs(&background_jobs, !deferred)
740 .await?;
741
742 if blocked_jobs.contains(&job_id) {
743 bail!(
744 "Cannot alter the job {} because it is blocked by creating unreschedulable backfill jobs",
745 job_id,
746 );
747 }
748 }
749
750 let commands = self
751 .scale_controller
752 .reschedule_inplace(HashMap::from([(job_id, policy)]))
753 .await?;
754
755 if !deferred {
756 let _source_pause_guard = self.source_manager.pause_tick().await;
757
758 for (database_id, command) in commands {
759 self.barrier_scheduler
760 .run_command(database_id, command)
761 .await?;
762 }
763 }
764
765 Ok(())
766 }
767
768 pub(crate) async fn reschedule_streaming_job_backfill_parallelism(
769 &self,
770 job_id: JobId,
771 parallelism: Option<StreamingParallelism>,
772 deferred: bool,
773 ) -> MetaResult<()> {
774 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
775
776 let background_jobs = self
777 .metadata_manager
778 .list_background_creating_jobs()
779 .await?;
780
781 if !background_jobs.is_empty() {
782 let unreschedulable = self
783 .metadata_manager
784 .collect_unreschedulable_backfill_jobs(&background_jobs, !deferred)
785 .await?;
786
787 if unreschedulable.contains(&job_id) {
788 bail!(
789 "Cannot alter the job {} because it is a non-reschedulable background backfill job",
790 job_id,
791 );
792 }
793 }
794
795 let commands = self
796 .scale_controller
797 .reschedule_backfill_parallelism_inplace(HashMap::from([(job_id, parallelism)]))
798 .await?;
799
800 if !deferred {
801 let _source_pause_guard = self.source_manager.pause_tick().await;
802
803 for (database_id, command) in commands {
804 self.barrier_scheduler
805 .run_command(database_id, command)
806 .await?;
807 }
808 }
809
810 Ok(())
811 }
812
    /// Reschedule the CDC backfill fragment of a job to a fixed parallelism.
    ///
    /// Only `ReschedulePolicy::Parallelism` with `StreamingParallelism::Fixed`
    /// is accepted. The job must have exactly one `StreamCdcScan` fragment;
    /// zero or multiple such fragments is an invalid-parameter error.
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        // Validate the target: only fixed-parallelism policies are supported.
        let parallelism_policy = match target {
            ReschedulePolicy::Parallelism(policy)
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
            {
                policy
            }
            _ => bail_invalid_parameter!(
                "CDC backfill reschedule only supports fixed parallelism targets"
            ),
        };

        // Find the job's single StreamCdcScan fragment by querying fragment
        // ids and type masks directly from the metadata store.
        let cdc_fragment_id = {
            let inner = self.metadata_manager.catalog_controller.inner.read().await;
            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(job_id))
                .into_tuple()
                .all(&inner.db)
                .await?;

            let cdc_fragments = fragments
                .into_iter()
                .filter_map(|(fragment_id, mask)| {
                    FragmentTypeMask::from(mask)
                        .contains(FragmentTypeFlag::StreamCdcScan)
                        .then_some(fragment_id)
                })
                .collect_vec();

            match cdc_fragments.len() {
                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
                1 => cdc_fragments[0],
                _ => bail_invalid_parameter!(
                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
                    job_id
                ),
            }
        };

        let fragment_policy = HashMap::from([(
            cdc_fragment_id,
            Some(parallelism_policy.parallelism.clone()),
        )]);

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy)
            .await?;

        // Pause split discovery while the reschedule commands run.
        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }
884
885 pub(crate) async fn reschedule_fragments(
886 &self,
887 fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
888 ) -> MetaResult<()> {
889 if fragment_targets.is_empty() {
890 return Ok(());
891 }
892
893 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
894
895 let fragment_policy = fragment_targets
896 .into_iter()
897 .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
898 .collect();
899
900 let commands = self
901 .scale_controller
902 .reschedule_fragment_inplace(fragment_policy)
903 .await?;
904
905 let _source_pause_guard = self.source_manager.pause_tick().await;
906
907 for (database_id, command) in commands {
908 self.barrier_scheduler
909 .run_command(database_id, command)
910 .await?;
911 }
912
913 Ok(())
914 }
915
916 pub async fn create_subscription(
918 self: &Arc<Self>,
919 subscription: &Subscription,
920 ) -> MetaResult<()> {
921 let command = Command::CreateSubscription {
922 subscription_id: subscription.id,
923 upstream_mv_table_id: subscription.dependent_table_id,
924 retention_second: subscription.retention_seconds,
925 };
926
927 tracing::debug!("sending Command::CreateSubscription");
928 self.barrier_scheduler
929 .run_command(subscription.database_id, command)
930 .await?;
931 Ok(())
932 }
933
934 pub async fn drop_subscription(
936 self: &Arc<Self>,
937 database_id: DatabaseId,
938 subscription_id: SubscriptionId,
939 table_id: TableId,
940 ) {
941 let command = Command::DropSubscription {
942 subscription_id,
943 upstream_mv_table_id: table_id,
944 };
945
946 tracing::debug!("sending Command::DropSubscriptions");
947 let _ = self
948 .barrier_scheduler
949 .run_command(database_id, command)
950 .await
951 .inspect_err(|err| {
952 tracing::error!(error = ?err.as_report(), "failed to run drop command");
953 });
954 }
955
956 pub async fn alter_subscription_retention(
957 self: &Arc<Self>,
958 database_id: DatabaseId,
959 subscription_id: SubscriptionId,
960 table_id: TableId,
961 retention_second: u64,
962 ) -> MetaResult<()> {
963 let command = Command::AlterSubscriptionRetention {
964 subscription_id,
965 upstream_mv_table_id: table_id,
966 retention_second,
967 };
968
969 tracing::debug!("sending Command::AlterSubscriptionRetention");
970 self.barrier_scheduler
971 .run_command(database_id, command)
972 .await?;
973 Ok(())
974 }
975}