use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::sync::Arc;

use await_tree::{InstrumentAwait, span};
use futures::FutureExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, TableId};
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Mutex, oneshot};
use tracing::Instrument;

use super::{
    FragmentBackfillOrder, JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates,
    JobRescheduleTarget, JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentNewNoShuffle, FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    assign_cdc_table_snapshot_splits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

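/// Options for creating a streaming job. Currently empty and kept as an extension point.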
#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Intentionally left empty: a placeholder for future creation options.
}

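/// Information about an upstream sink when a streaming job is created by sinking into an
/// existing table: the sink fragment, its output schema and original target columns, the
/// projection expressions, and the sink's new downstream fragment relation.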
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: ObjectId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

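/// One-shot context for creating a streaming job, carrying the upstream fragment relations,
/// actor locations, backfill configuration, and the job definition itself.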
pub struct CreateStreamingJobContext {
    /// New fragment relations to add from upstream fragments to the fragments of this job.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// The locations of the actors to build for this job.
    pub building_locations: Locations,

    /// The SQL definition of this streaming job.
    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Set when the job is created by sinking into an existing table.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,
}

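/// States reported back to [`GlobalStreamManager::create_streaming_job`] while a job is being
/// created.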
pub enum CreatingState {
    Failed { reason: MetaError },
    Canceling { finish_tx: oneshot::Sender<()> },
    Created { version: NotificationVersion },
}

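/// Handle to a streaming job that is currently being created; the sender is used to inject a
/// `Canceling` state into the creation loop.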
struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<Sender<CreatingState>>,
}

impl StreamingJobExecution {
    fn new(id: TableId, shutdown_tx: Sender<CreatingState>) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
        }
    }
}

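/// Tracks all streaming jobs that are currently being created, keyed by job id, so that they can
/// be looked up and cancelled while creation is still in flight.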
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut recovered_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx
                        .send(CreatingState::Canceling { finish_tx: tx })
                        .await
                        .is_ok()
                    {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id=?job_id, "failed to send canceling state");
                    }
                }
            } else {
                // The job is not tracked by an in-flight creation worker, so it has to be
                // cancelled via the catalog instead (see `cancel_streaming_jobs`).
                recovered_job_ids.push(job_id);
            }
        }
        (receivers, recovered_job_ids)
    }

    async fn check_job_exists(&self, job_id: TableId) -> bool {
        let jobs = self.streaming_jobs.lock().await;
        jobs.contains_key(&job_id)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

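/// Context for rebuilding a sink fragment whose schema is refreshed automatically, carrying both
/// the original and the replacement fragment together with the new actors' placement.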
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: ObjectId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id as _,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self
                .new_fragment
                .state_table_ids
                .iter()
                .map(|table| (*table).into())
                .collect(),
        }
    }
}

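/// One-shot context for replacing the plan of an existing streaming job.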
pub struct ReplaceStreamJobContext {
    /// The fragments of the job being replaced.
    pub old_fragments: StreamJobFragments,

    /// Updates to apply to downstream fragments whose upstream is replaced.
    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// New fragment relations to add from existing upstream fragments to the new fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The locations of the actors to build for the replacement job.
    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    /// Set when the table's associated source connector is dropped as part of the replacement.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

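/// `GlobalStreamManager` drives the lifecycle of streaming jobs on the meta node: creating,
/// replacing, dropping, cancelling, and rescheduling them by issuing commands to the barrier
/// scheduler.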
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    /// Schedules and runs barrier commands.
    pub barrier_scheduler: BarrierScheduler,

    /// Manages streaming sources and their split assignments.
    pub source_manager: SourceManagerRef,

    /// Streaming jobs that are currently being created.
    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

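    /// Create a new streaming job.
    ///
    /// A worker task is spawned to run the actual create command and wait for the job to finish,
    /// while this method listens for the states the worker reports (`Failed`, `Canceling`,
    /// `Created`) and returns the catalog notification version once the job has been created.
    /// The job is tracked in `creating_job_info` for the whole duration so that it can be
    /// cancelled concurrently.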
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        run_command_notifier: Option<oneshot::Sender<MetaResult<()>>>,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();
        let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
        let execution = StreamingJobExecution::new(table_id, sender.clone());
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let res: MetaResult<_> = try {
                let (source_change, streaming_job) = stream_manager
                    .run_create_streaming_job_command(stream_job_fragments, ctx)
                    .inspect(move |result| {
                        if let Some(tx) = run_command_notifier {
                            let _ = tx.send(match result {
                                Ok(_) => Ok(()),
                                Err(err) => Err(err.clone()),
                            });
                        }
                    })
                    .await?;
                let version = stream_manager
                    .metadata_manager
                    .wait_streaming_job_finished(
                        streaming_job.database_id().into(),
                        streaming_job.id() as _,
                    )
                    .await?;
                stream_manager
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
                tracing::debug!(?streaming_job, "stream job finish");
                version
            };

            match res {
                Ok(version) => {
                    let _ = sender
                        .send(CreatingState::Created { version })
                        .await
                        .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}"));
                }
                Err(err) => {
                    let _ = sender
                        .send(CreatingState::Failed {
                            reason: err.clone(),
                        })
                        .await
                        .inspect_err(|_| {
                            tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}")
                        });
                }
            }
        }
        .in_current_span();

        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        tokio::spawn(fut);

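        // The actual creation runs in the worker task spawned above; here we consume the states
        // it reports and translate failure, cancellation, and completion into the final result.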
        while let Some(state) = receiver
            .recv()
            .instrument_await("recv_creating_state")
            .await
        {
            match state {
                CreatingState::Failed { reason } => {
                    tracing::debug!(id=?table_id, "stream job failed");
                    self.creating_job_info.delete_job(table_id).await;
                    return Err(reason);
                }
                CreatingState::Canceling { finish_tx } => {
                    tracing::debug!(id=?table_id, "cancelling streaming job");
                    if let Ok(table_fragments) = self
                        .metadata_manager
                        .get_job_fragments_by_id(&table_id)
                        .await
                    {
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, table_id)
                        {
                            tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
                        } else if !table_fragments.is_created() {
                            tracing::debug!(
                                "cancelling streaming job {table_id} by issuing a cancel command."
                            );

                            let cancel_command = self
                                .metadata_manager
                                .catalog_controller
                                .build_cancel_command(&table_fragments)
                                .await?;

                            self.metadata_manager
                                .catalog_controller
                                .try_abort_creating_streaming_job(table_id.table_id as _, true)
                                .await?;

                            self.barrier_scheduler
                                .run_command(database_id, cancel_command)
                                .await?;
                        } else {
                            // The job has already been fully created and can no longer be
                            // cancelled here.
                            continue;
                        }
                        let _ = finish_tx.send(()).inspect_err(|_| {
                            tracing::warn!("failed to notify cancelled: {table_id}")
                        });
                        self.creating_job_info.delete_job(table_id).await;
                        return Err(MetaError::cancelled("create"));
                    }
                }
                CreatingState::Created { version } => {
                    self.creating_job_info.delete_job(table_id).await;
                    return Ok(version);
                }
            }
        }
        self.creating_job_info.delete_job(table_id).await;
        bail!("receiver failed to get notification version for finished stream job")
    }

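    /// Build and run the `CreateStreamingJob` barrier command for the job: allocate source and
    /// backfill splits, assign CDC table snapshot splits when applicable, and submit the command
    /// to the barrier scheduler. Returns the `SourceChange` to apply after the job finishes,
    /// together with the streaming job itself.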
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<(SourceChange, StreamingJob)> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

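        // Assign CDC table snapshot splits; when any are produced, register the job and its
        // parallelized CDC scan fragments with the CDC backfill tracker.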
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            stream_job_fragments.stream_job_id.table_id,
            &stream_job_fragments,
            self.env.meta_store_ref(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if !cdc_table_snapshot_split_assignment.is_empty()
        {
            self.env.cdc_table_backfill_tracker.track_new_job(
                stream_job_fragments.stream_job_id.table_id,
                cdc_table_snapshot_split_assignment
                    .values()
                    .map(|s| u64::try_from(s.len()).unwrap())
                    .sum(),
            );
            self.env
                .cdc_table_backfill_tracker
                .add_fragment_table_mapping(
                    stream_job_fragments
                        .fragments
                        .values()
                        .filter(|f| is_parallelized_backfill_enabled_cdc_scan_fragment(f))
                        .map(|f| f.fragment_id),
                    stream_job_fragments.stream_job_id.table_id,
                );
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(iter::once(stream_job_fragments.stream_job_id.table_id)),
            )
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        };

        let source_change = SourceChange::CreateJobFinished {
            finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_split_assignment,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

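        // Hand the command to the barrier scheduler; this resolves once the first barrier of the
        // new job has been collected.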
        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok((source_change, streaming_job))
    }

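    /// Replace the fragments of an existing streaming job with `new_fragments`: re-allocate
    /// source splits, re-assign CDC table snapshot splits, and submit a `ReplaceStreamJob`
    /// command to the barrier scheduler.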
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocate split: {:?}",
            init_split_assignment
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            old_fragments.stream_job_id.table_id,
            &new_fragments.inner,
            self.env.meta_store_ref(),
        )
        .await?;

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                    cdc_table_snapshot_split_assignment,
                }),
            )
            .await?;

        Ok(())
    }

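    /// Drop the given streaming jobs. Jobs that are still being created are notified as cancelled
    /// first; then a single `DropStreamingJobs` command removes the actors, state tables, and
    /// fragments. Errors from the drop command are logged rather than returned.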
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        for &job_id in &streaming_job_ids {
            // Jobs that are still being created are notified as cancelled before the drop
            // command is issued.
            if self
                .creating_job_info
                .check_job_exists(TableId::new(job_id as _))
                .await
            {
                tracing::info!(
                    ?job_id,
                    "streaming job is still being created, cancelling it directly as part of drop"
                );
                self.metadata_manager
                    .notify_cancelled(database_id, job_id)
                    .await;
            }
        }

        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
        }
    }

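    /// Cancel the given creating streaming jobs and return the ids that were actually cancelled.
    /// Jobs with an in-flight creation worker are cancelled through their shutdown channel, while
    /// recovered jobs are aborted in the catalog and cancelled with an explicit command.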
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if receiver.await.is_ok() {
                tracing::info!("canceled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        let futures = recovered_job_ids.into_iter().map(|id| async move {
            // Recovered jobs have no in-flight creation worker, so cancel them by aborting the
            // catalog entry and issuing an explicit cancel command.
            tracing::debug!(?id, "cancelling recovered streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager
                    .get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let cancel_command = self
                    .metadata_manager
                    .catalog_controller
                    .build_cancel_command(&fragment)
                    .await?;

                let (_, database_id) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(DatabaseId::new(database_id as _), cancel_command)
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}; does it correspond to any job in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

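    /// Reschedule a streaming job to a new parallelism and/or resource group. The request is
    /// rejected while related background jobs are still being created. With `deferred` set, only
    /// the target parallelism and resource group are recorded; otherwise a reschedule plan is
    /// generated and applied immediately.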
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "available parallelism exceeds the job's max parallelism; adaptive parallelism will be capped at {}",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, applying parallelism {:?} and resource group {:?} directly",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: HashMap::from([(
                            job_id.table_id,
                            JobRescheduleTarget {
                                parallelism: parallelism_change,
                                resource_group: resource_group_change,
                            },
                        )]),
                    },
                    false,
                )
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, applying post updates {:?} directly",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

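    /// Reschedule the backfill parallelism of a CDC table. Only a fixed parallelism target is
    /// accepted; adaptive or custom parallelism and resource group changes are rejected.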
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;
        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);
        if let JobParallelismTarget::Update(parallelism) = &parallelism_change {
            match parallelism {
                TableParallelism::Fixed(_) => {}
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
                TableParallelism::Adaptive => {
                    bail_invalid_parameter!("should not alter parallelism to adaptive")
                }
            }
        } else {
            bail_invalid_parameter!("should not refresh parallelism")
        }
        match &resource_group_change {
            JobResourceGroupTarget::Update(_) => {
                bail_invalid_parameter!("should not update resource group")
            }
            JobResourceGroupTarget::Keep => {}
        };
        let reschedule_plan = self
            .scale_controller
            .generate_job_reschedule_plan(
                JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                },
                true,
            )
            .await?;
        if reschedule_plan.reschedules.is_empty() {
            tracing::debug!(
                ?job_id,
                post_updates = ?reschedule_plan.post_updates,
                "Empty reschedule plan generated for job.",
            );
            self.scale_controller
                .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                .await?;
        } else {
            self.reschedule_actors(
                database_id,
                reschedule_plan,
                RescheduleOptions {
                    resolve_no_shuffle_upstream: false,
                    skip_create_new_actors: false,
                },
            )
            .await?;
        }

        Ok(())
    }

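    /// Create a subscription by running a `CreateSubscription` command in the subscription's
    /// database.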
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

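    /// Drop a subscription by running a `DropSubscription` command. Errors from the command are
    /// logged rather than returned.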
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}