use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::sync::Arc;

use await_tree::{InstrumentAwait, span};
use futures::FutureExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, TableId};
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Mutex, oneshot};
use tracing::Instrument;

use super::{
    FragmentBackfillOrder, JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates,
    JobRescheduleTarget, JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    assign_cdc_table_snapshot_splits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Currently empty; kept as a placeholder for future creation options.
}

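/// Information about an existing upstream sink that a newly created job is wired to
/// (used for sink-into-table): the sink's output schema, its original target columns,
/// the projection applied to its output, and the new downstream fragment relation.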
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: ObjectId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

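/// [`CreateStreamingJobContext`] carries the one-time information needed to create a
/// single streaming job, such as upstream relations, actor locations, backfill info and
/// the job definition itself. It is consumed by `run_create_streaming_job_command`.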
pub struct CreateStreamingJobContext {
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    pub building_locations: Locations,

    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,
}

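/// The state reported back while a streaming job is being created: the creation failed,
/// is being canceled (with a channel to acknowledge the cancellation), or has completed
/// at a given notification version.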
pub enum CreatingState {
    Failed { reason: MetaError },
    Canceling { finish_tx: oneshot::Sender<()> },
    Created { version: NotificationVersion },
}

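/// Tracks a streaming job that is currently being created, together with the channel
/// used to signal cancellation to the task driving the creation.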
struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<Sender<CreatingState>>,
}

impl StreamingJobExecution {
    fn new(id: TableId, shutdown_tx: Sender<CreatingState>) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
        }
    }
}

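/// Registry of streaming jobs that are currently being created, keyed by job id.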
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut recovered_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx
                        .send(CreatingState::Canceling { finish_tx: tx })
                        .await
                        .is_ok()
                    {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id=?job_id, "failed to send canceling state");
                    }
                }
            } else {
                // The job is not tracked here (e.g. it was recovered), so the caller
                // has to cancel it directly.
                recovered_job_ids.push(job_id);
            }
        }
        (receivers, recovered_job_ids)
    }

    async fn check_job_exists(&self, job_id: TableId) -> bool {
        let jobs = self.streaming_jobs.lock().await;
        jobs.contains_key(&job_id)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

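/// Context for automatically refreshing a sink's schema when its upstream table schema
/// changes: the original sink and fragment, the new column catalog and newly added
/// fields, the rebuilt fragment (with an optional new log store table), and the actor
/// statuses of the new fragment.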
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: ObjectId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id as _,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                        },
                    )
                })
                .collect(),
            state_table_ids: self
                .new_fragment
                .state_table_ids
                .iter()
                .map(|table| (*table).into())
                .collect(),
        }
    }
}

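/// [`ReplaceStreamJobContext`] carries the one-time information needed to replace the
/// fragments of an existing streaming job, including the old fragments to be dropped,
/// the upstream rewiring plan, and a temporary job id used during the replacement.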
pub struct ReplaceStreamJobContext {
    pub old_fragments: StreamJobFragments,

    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

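/// `GlobalStreamManager` drives the lifecycle of streaming jobs on the meta node:
/// creating, replacing, dropping, cancelling and rescheduling them by issuing commands
/// through the barrier scheduler.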
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    pub barrier_scheduler: BarrierScheduler,

    pub source_manager: SourceManagerRef,

    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

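    /// Create a streaming job.
    ///
    /// This spawns a worker task that runs the `CreateStreamingJob` command and waits for
    /// the job to finish, while the current task listens for the resulting state: on
    /// `Created` the notification version is returned, on `Failed` the error is returned,
    /// and on `Canceling` the job is cancelled either from the barrier scheduler's buffer
    /// queue or by issuing a cancel command. The optional `run_command_notifier` is
    /// notified with the result of running the create command.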
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        run_command_notifier: Option<oneshot::Sender<MetaResult<()>>>,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();
        let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
        let execution = StreamingJobExecution::new(table_id, sender.clone());
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let res: MetaResult<_> = try {
                let (source_change, streaming_job) = stream_manager
                    .run_create_streaming_job_command(stream_job_fragments, ctx)
                    .inspect(move |result| {
                        if let Some(tx) = run_command_notifier {
                            let _ = tx.send(match result {
                                Ok(_) => Ok(()),
                                Err(err) => Err(err.clone()),
                            });
                        }
                    })
                    .await?;
                let version = stream_manager
                    .metadata_manager
                    .wait_streaming_job_finished(
                        streaming_job.database_id().into(),
                        streaming_job.id() as _,
                    )
                    .await?;
                stream_manager
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
                tracing::debug!(?streaming_job, "stream job finished");
                version
            };

            match res {
                Ok(version) => {
                    let _ = sender
                        .send(CreatingState::Created { version })
                        .await
                        .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}"));
                }
                Err(err) => {
                    let _ = sender
                        .send(CreatingState::Failed {
                            reason: err.clone(),
                        })
                        .await
                        .inspect_err(|_| {
                            tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}")
                        });
                }
            }
        }
        .in_current_span();

        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        tokio::spawn(fut);

        while let Some(state) = receiver
            .recv()
            .instrument_await("recv_creating_state")
            .await
        {
            match state {
                CreatingState::Failed { reason } => {
                    tracing::debug!(id=?table_id, "stream job failed");
                    self.creating_job_info.delete_job(table_id).await;
                    return Err(reason);
                }
                CreatingState::Canceling { finish_tx } => {
                    tracing::debug!(id=?table_id, "cancelling streaming job");
                    if let Ok(table_fragments) = self
                        .metadata_manager
                        .get_job_fragments_by_id(&table_id)
                        .await
                    {
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, table_id)
                        {
                            tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
                        } else if !table_fragments.is_created() {
                            tracing::debug!(
                                "cancelling streaming job {table_id} by issuing a cancel command."
                            );

                            let cancel_command = self
                                .metadata_manager
                                .catalog_controller
                                .build_cancel_command(&table_fragments)
                                .await?;

                            self.metadata_manager
                                .catalog_controller
                                .try_abort_creating_streaming_job(table_id.table_id as _, true)
                                .await?;

                            self.barrier_scheduler
                                .run_command(database_id, cancel_command)
                                .await?;
                        } else {
                            // The job is already created, so it can no longer be cancelled here.
                            continue;
                        }
                        let _ = finish_tx.send(()).inspect_err(|_| {
                            tracing::warn!("failed to notify cancelled: {table_id}")
                        });
                        self.creating_job_info.delete_job(table_id).await;
                        return Err(MetaError::cancelled("create"));
                    }
                }
                CreatingState::Created { version } => {
                    self.creating_job_info.delete_job(table_id).await;
                    return Ok(version);
                }
            }
        }
        self.creating_job_info.delete_job(table_id).await;
        bail!("receiver failed to get notification version for finished stream job")
    }

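    /// Build and run the `CreateStreamingJob` barrier command for the job: allocate
    /// source and backfill splits, assign CDC table snapshot splits if any, and send the
    /// command to the barrier scheduler. Returns the source change to apply once the job
    /// finishes, together with the streaming job itself.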
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<(SourceChange, StreamingJob)> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            stream_job_fragments.stream_job_id.table_id,
            &stream_job_fragments,
            self.env.meta_store_ref(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if !cdc_table_snapshot_split_assignment.is_empty()
        {
            self.env.cdc_table_backfill_tracker.track_new_job(
                stream_job_fragments.stream_job_id.table_id,
                cdc_table_snapshot_split_assignment
                    .values()
                    .map(|s| u64::try_from(s.len()).unwrap())
                    .sum(),
            );
            self.env
                .cdc_table_backfill_tracker
                .add_fragment_table_mapping(
                    stream_job_fragments
                        .fragments
                        .values()
                        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
                        .map(|f| f.fragment_id),
                    stream_job_fragments.stream_job_id.table_id,
                );
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(iter::once(stream_job_fragments.stream_job_id.table_id)),
            )
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        };

        let source_change = SourceChange::CreateJobFinished {
            finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_split_assignment,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok((source_change, streaming_job))
    }

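    /// Replace the fragments of an existing streaming job by issuing a `ReplaceStreamJob`
    /// barrier command with freshly allocated source splits and CDC table snapshot split
    /// assignments.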
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocated splits: {:?}",
            init_split_assignment
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            old_fragments.stream_job_id.table_id,
            &new_fragments.inner,
            self.env.meta_store_ref(),
        )
        .await?;

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                    cdc_table_snapshot_split_assignment,
                }),
            )
            .await?;

        Ok(())
    }

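    /// Drop the given streaming jobs: notify cancellation for any that are still being
    /// created, then issue a `DropStreamingJobs` barrier command to remove their actors,
    /// state tables and fragments. Failures of the drop command are logged but not
    /// propagated.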
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        for &job_id in &streaming_job_ids {
            if self
                .creating_job_info
                .check_job_exists(TableId::new(job_id as _))
                .await
            {
                tracing::info!(
                    ?job_id,
                    "streaming job is still being created; cancelling it directly on drop"
                );
                self.metadata_manager
                    .notify_cancelled(database_id, job_id)
                    .await;
            }
        }

        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

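    /// Cancel the given creating streaming jobs and return the ids that were actually
    /// cancelled. Jobs still tracked as creating are signalled through their worker task,
    /// while recovered jobs (no longer tracked here) are aborted via the catalog
    /// controller and a cancel command.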
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if receiver.await.is_ok() {
                tracing::info!("canceled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        let futures = recovered_job_ids.into_iter().map(|id| async move {
            tracing::debug!(?id, "cancelling recovered streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager
                    .get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let cancel_command = self
                    .metadata_manager
                    .catalog_controller
                    .build_cancel_command(&fragment)
                    .await?;

                let (_, database_id) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(DatabaseId::new(database_id as _), cancel_command)
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

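    /// Reschedule a streaming job to a new parallelism and/or resource group. The request
    /// is rejected while a related job is still being created in the background. In
    /// deferred mode only the target parallelism and resource group are recorded;
    /// otherwise a reschedule plan is generated and applied immediately.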
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "available parallelism exceeds the job's max parallelism; it will be limited to {}",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, setting the target directly to parallelism {:?}, resource group {:?}",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: HashMap::from([(
                            job_id.table_id,
                            JobRescheduleTarget {
                                parallelism: parallelism_change,
                                resource_group: resource_group_change,
                            },
                        )]),
                    },
                    false,
                )
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, applying post updates directly: {:?}",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

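    /// Reschedule the backfill parallelism of a CDC table. Only a fixed parallelism
    /// target is accepted, and the resource group must be kept unchanged.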
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;
        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);
        if let JobParallelismTarget::Update(parallelism) = &parallelism_change {
            match parallelism {
                TableParallelism::Fixed(_) => {}
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
                TableParallelism::Adaptive => {
                    bail_invalid_parameter!("should not alter parallelism to adaptive")
                }
            }
        } else {
            bail_invalid_parameter!("should not refresh")
        }
        match &resource_group_change {
            JobResourceGroupTarget::Update(_) => {
                bail_invalid_parameter!("should not update resource group")
            }
            JobResourceGroupTarget::Keep => {}
        };
        let reschedule_plan = self
            .scale_controller
            .generate_job_reschedule_plan(
                JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                },
                true,
            )
            .await?;
        if reschedule_plan.reschedules.is_empty() {
            tracing::debug!(
                ?job_id,
                post_updates = ?reschedule_plan.post_updates,
                "Empty reschedule plan generated for job.",
            );
            self.scale_controller
                .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                .await?;
        } else {
            self.reschedule_actors(
                database_id,
                reschedule_plan,
                RescheduleOptions {
                    resolve_no_shuffle_upstream: false,
                    skip_create_new_actors: false,
                },
            )
            .await?;
        }

        Ok(())
    }

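    /// Create a subscription by sending a `CreateSubscription` command to the barrier
    /// scheduler for the subscription's database.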
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

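    /// Drop a subscription by sending a `DropSubscription` command to the barrier
    /// scheduler; failures are logged but not propagated.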
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}