use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::sync::Arc;

use await_tree::span;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, TableId};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use thiserror_ext::AsReport;
use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
use tracing::Instrument;

use super::{
    FragmentBackfillOrder, JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates,
    JobRescheduleTarget, JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::SourceManagerRef;
use crate::stream::cdc::{
    assign_cdc_table_snapshot_splits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

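/// Options for creating a streaming job. Currently empty and kept as a placeholder for future
/// creation-time options.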
#[derive(Default)]
pub struct CreateStreamingJobOption {}

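/// Information about an existing upstream sink that sinks into the table of the streaming job
/// being created or replaced, including the projection and the new downstream relation used to
/// connect the sink fragment to the new table fragment.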
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: ObjectId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

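/// [`CreateStreamingJobContext`] carries the one-shot metadata needed to create a streaming job,
/// such as upstream fragment relations, actor locations and backfill settings. It is consumed by
/// [`GlobalStreamManager::create_streaming_job`].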
pub struct CreateStreamingJobContext {
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    pub building_locations: Locations,

    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,

    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}

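/// A single in-flight "create streaming job" execution: the channel used to request its
/// cancellation and the semaphore permit held for the duration of the creation.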
struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    _permit: OwnedSemaphorePermit,
}

impl StreamingJobExecution {
    fn new(
        id: TableId,
        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
            _permit: permit,
        }
    }
}

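/// Registry of streaming jobs that are currently being created, keyed by job (table) id.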
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<bool>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut background_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx.send(tx).is_ok() {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id = ?job_id, "failed to send canceling state");
                    }
                }
            } else {
                background_job_ids.push(job_id);
            }
        }
        (receivers, background_job_ids)
    }

    async fn check_job_exists(&self, job_id: TableId) -> bool {
        let jobs = self.streaming_jobs.lock().await;
        jobs.contains_key(&job_id)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

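/// Context for rebuilding a downstream sink with a refreshed schema while replacing its upstream
/// stream job: the original sink and fragment, the new fragment built with the updated columns,
/// the optional new log store table, and the actor placements.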
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: ObjectId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id as _,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self
                .new_fragment
                .state_table_ids
                .iter()
                .map(|table| (*table).into())
                .collect(),
        }
    }
}

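/// [`ReplaceStreamJobContext`] carries the one-shot metadata needed to replace the fragments of an
/// existing streaming job, and is consumed by [`GlobalStreamManager::replace_stream_job`].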
pub struct ReplaceStreamJobContext {
    pub old_fragments: StreamJobFragments,

    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

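/// [`GlobalStreamManager`] drives the lifecycle of streaming jobs on the meta node: it creates,
/// replaces, drops, cancels and reschedules them by issuing barrier commands through the
/// [`BarrierScheduler`].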
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Schedules barrier commands (create/drop/replace/cancel) to the barrier manager.
    pub barrier_scheduler: BarrierScheduler,

    /// Allocates source splits for newly created or replaced jobs.
    pub source_manager: SourceManagerRef,

    /// Streaming jobs that are currently being created, used for cancellation.
    creating_job_info: CreatingStreamingJobInfoRef,

    /// Generates and applies reschedule plans.
    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

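    /// Create a streaming job and return the catalog notification version to wait for.
    ///
    /// The job is registered in `creating_job_info` so that it can be cancelled while in flight.
    /// Foreground jobs wait until creation has finished before returning, while background jobs
    /// return as soon as the create command has been accepted by the barrier scheduler.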
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();

        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(table_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finished");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        let result = tokio::select! {
            biased;

            res = create_fut => res,
            notifier = cancel_rx => {
                let notifier = notifier.expect("sender should not be dropped");
                tracing::debug!(id = ?table_id, "cancelling streaming job");

                if let Ok(job_fragments) = self
                    .metadata_manager
                    .get_job_fragments_by_id(&table_id)
                    .await
                {
                    if self.barrier_scheduler.try_cancel_scheduled_create(database_id, table_id) {
                        tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
                    } else if !job_fragments.is_created() {
                        tracing::debug!("cancelling streaming job {table_id} by issuing a cancel command.");

                        let cancel_command = self
                            .metadata_manager
                            .catalog_controller
                            .build_cancel_command(&job_fragments)
                            .await?;
                        self.metadata_manager
                            .catalog_controller
                            .try_abort_creating_streaming_job(table_id.table_id as _, true)
                            .await?;

                        self.barrier_scheduler.run_command(database_id, cancel_command).await?;
                    } else {
                        // The job has already finished creating; report that the cancellation
                        // failed and return the result of the completed job instead.
                        let _ = notifier.send(false).inspect_err(|err| {
                            tracing::warn!("failed to notify cancellation result: {err}")
                        });
                        return self
                            .metadata_manager
                            .wait_streaming_job_finished(database_id, table_id.table_id as _)
                            .await;
                    }
                }
                notifier.send(true).expect("receiver should not be dropped");
                Err(MetaError::cancelled("create"))
            }
        };

        tracing::info!("cleaning creating job info: {}", table_id);
        self.creating_job_info.delete_job(table_id).await;
        result
    }

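    /// Allocate source and backfill splits, assign CDC table snapshot splits if needed, and then
    /// schedule the [`Command::CreateStreamingJob`] barrier command. Returns the [`StreamingJob`]
    /// once the command has been handled by the barrier scheduler.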
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "finished building actors"
        );

        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            stream_job_fragments.stream_job_id.table_id,
            &stream_job_fragments,
            self.env.meta_store_ref(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if !cdc_table_snapshot_split_assignment.is_empty()
        {
            self.env.cdc_table_backfill_tracker.track_new_job(
                stream_job_fragments.stream_job_id.table_id,
                cdc_table_snapshot_split_assignment
                    .values()
                    .map(|s| u64::try_from(s.len()).unwrap())
                    .sum(),
            );
            self.env
                .cdc_table_backfill_tracker
                .add_fragment_table_mapping(
                    stream_job_fragments
                        .fragments
                        .values()
                        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
                        .map(|f| f.fragment_id),
                    stream_job_fragments.stream_job_id.table_id,
                );
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(iter::once(stream_job_fragments.stream_job_id.table_id)),
            )
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_split_assignment,
            locality_fragment_state_table_mapping,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }

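    /// Replace the fragments of an existing streaming job by scheduling a
    /// [`Command::ReplaceStreamJob`] barrier command with freshly allocated source splits and CDC
    /// table snapshot split assignments.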
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocated splits: {:?}",
            init_split_assignment
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            old_fragments.stream_job_id.table_id,
            &new_fragments.inner,
            self.env.meta_store_ref(),
        )
        .await?;

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                    cdc_table_snapshot_split_assignment,
                }),
            )
            .await?;

        Ok(())
    }

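    /// Drop streaming jobs and clean up their actors, fragments and state tables by scheduling a
    /// [`Command::DropStreamingJobs`] barrier command. Jobs that are still being created are
    /// notified as cancelled first. Errors from the drop command are logged and otherwise ignored.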
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        for &job_id in &streaming_job_ids {
            if self
                .creating_job_info
                .check_job_exists(TableId::new(job_id as _))
                .await
            {
                tracing::info!(
                    ?job_id,
                    "streaming job is still being created, notifying it as cancelled before drop"
                );
                self.metadata_manager
                    .notify_cancelled(database_id, job_id)
                    .await;
            }
        }

        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

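    /// Cancel the given creating streaming jobs and return the ids that were actually cancelled.
    ///
    /// Jobs tracked in `creating_job_info` are cancelled through their shutdown channel; the
    /// remaining (e.g. background or recovered) jobs are cancelled by aborting them in the catalog
    /// and issuing a cancel barrier command.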
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if let Ok(cancelled) = receiver.await
                && cancelled
            {
                tracing::info!("cancelled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        let futures = background_job_ids.into_iter().map(|id| async move {
            tracing::debug!(?id, "cancelling background streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager
                    .get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let cancel_command = self
                    .metadata_manager
                    .catalog_controller
                    .build_cancel_command(&fragment)
                    .await?;

                let (_, database_id) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(DatabaseId::new(database_id as _), cancel_command)
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error = ?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

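    /// Reschedule the parallelism and/or resource group of a streaming job.
    ///
    /// Bails out if a related no-shuffle job is still being created in the background. When
    /// `deferred` is true, only the target parallelism/resource group is recorded without moving
    /// any actors; otherwise a reschedule plan is generated and applied immediately.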
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "available parallelism exceeds the max parallelism {}; the job parallelism will be capped",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, setting the parallelism directly to {:?} and the resource group to {:?}",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: HashMap::from([(
                            job_id.table_id,
                            JobRescheduleTarget {
                                parallelism: parallelism_change,
                                resource_group: resource_group_change,
                            },
                        )]),
                    },
                    false,
                )
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, applying the post updates {:?} directly",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

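    /// Reschedule the parallelism of a CDC table's parallelized snapshot backfill. Only a fixed
    /// parallelism target is accepted; adaptive/custom parallelism and resource group changes are
    /// rejected.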
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;
        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);
        if let JobParallelismTarget::Update(parallelism) = &parallelism_change {
            match parallelism {
                TableParallelism::Fixed(_) => {}
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
                TableParallelism::Adaptive => {
                    bail_invalid_parameter!("should not alter parallelism to adaptive")
                }
            }
        } else {
            bail_invalid_parameter!("should not refresh parallelism")
        }
        match &resource_group_change {
            JobResourceGroupTarget::Update(_) => {
                bail_invalid_parameter!("should not update resource group")
            }
            JobResourceGroupTarget::Keep => {}
        };
        let reschedule_plan = self
            .scale_controller
            .generate_job_reschedule_plan(
                JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                },
                true,
            )
            .await?;
        if reschedule_plan.reschedules.is_empty() {
            tracing::debug!(
                ?job_id,
                post_updates = ?reschedule_plan.post_updates,
                "Empty reschedule plan generated for job.",
            );
            self.scale_controller
                .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                .await?;
        } else {
            self.reschedule_actors(
                database_id,
                reschedule_plan,
                RescheduleOptions {
                    resolve_no_shuffle_upstream: false,
                    skip_create_new_actors: false,
                },
            )
            .await?;
        }

        Ok(())
    }

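    /// Create a subscription on an upstream materialized view table by scheduling a
    /// [`Command::CreateSubscription`] barrier command; no new actors are built for it.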
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

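    /// Drop a subscription by scheduling a [`Command::DropSubscription`] barrier command. Errors
    /// are logged and otherwise ignored.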
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}