use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::ops::Deref;
use std::sync::Arc;

use await_tree::{InstrumentAwait, span};
use futures::FutureExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
use risingwave_pb::meta::table_fragments::ActorStatus;
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Mutex, oneshot};
use tracing::Instrument;

use super::{
    FragmentBackfillOrder, JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates,
    JobRescheduleTarget, JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, Fragment, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle,
    FragmentReplaceUpstream, StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::cdc::{
    assign_cdc_table_snapshot_splits, assign_cdc_table_snapshot_splits_for_replace_table,
};
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

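/// Options for creating a streaming job. Currently empty; kept as a placeholder for future
/// options.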
#[derive(Default)]
pub struct CreateStreamingJobOption {}

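/// Context passed along when creating a streaming job: upstream fragment relations, actor
/// locations, the DDL definition, backfill info, and the optional replace-table job used when
/// a sink is created into an existing table.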
pub struct CreateStreamingJobContext {
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    pub building_locations: Locations,

    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    pub replace_table_job_info: Option<(
        StreamingJob,
        ReplaceStreamJobContext,
        StreamJobFragmentsToCreate,
    )>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,
}

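/// Progress states reported by the background creation worker back to
/// [`GlobalStreamManager::create_streaming_job`].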
pub enum CreatingState {
    Failed { reason: MetaError },
    Canceling { finish_tx: oneshot::Sender<()> },
    Created { version: NotificationVersion },
}

struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<Sender<CreatingState>>,
}

impl StreamingJobExecution {
    fn new(id: TableId, shutdown_tx: Sender<CreatingState>) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
        }
    }
}

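/// Registry of streaming jobs that are currently being created, keyed by job id, so that a
/// job can still be cancelled while its creation is in flight.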
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

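    /// Sends a `Canceling` signal to every tracked creating job and returns a receiver per
    /// job that resolves once cancellation finishes. Ids that are not tracked here are
    /// returned separately so the caller can abort them through the catalog instead.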
    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut recovered_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx
                        .send(CreatingState::Canceling { finish_tx: tx })
                        .await
                        .is_ok()
                    {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id=?job_id, "failed to send canceling state");
                    }
                }
            } else {
                recovered_job_ids.push(job_id);
            }
        }
        (receivers, recovered_job_ids)
    }

    async fn check_job_exists(&self, job_id: TableId) -> bool {
        let jobs = self.streaming_jobs.lock().await;
        jobs.contains_key(&job_id)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

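/// Context for rebuilding a sink whose schema is automatically refreshed: it keeps the
/// original sink and fragment together with the new columns, the new fragment (and its
/// optional log store table), and the actor placement of the new fragment.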
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: ObjectId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_columns: Vec<PbColumnCatalog>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<PbTable>,
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
}

impl AutoRefreshSchemaSinkContext {
    pub fn new_fragment_info(&self) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            nodes: self.new_fragment.nodes.clone(),
            actors: self
                .new_fragment
                .actors
                .iter()
                .map(|actor| {
                    (
                        actor.actor_id as _,
                        InflightActorInfo {
                            worker_id: self.actor_status[&actor.actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id as _,
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                        },
                    )
                })
                .collect(),
            state_table_ids: self
                .new_fragment
                .state_table_ids
                .iter()
                .map(|table| (*table).into())
                .collect(),
        }
    }
}

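/// Context for replacing the fragments of an existing streaming job with a newly built set,
/// e.g. when altering a table or refreshing a sink schema.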
pub struct ReplaceStreamJobContext {
    pub old_fragments: StreamJobFragments,

    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    pub building_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}

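/// [`GlobalStreamManager`] drives the lifecycle of streaming jobs on the meta node: creating,
/// replacing, dropping, cancelling and rescheduling them by submitting commands to the
/// barrier scheduler.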
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    pub barrier_scheduler: BarrierScheduler,

    pub source_manager: SourceManagerRef,

    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

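    /// Creates a streaming job by spawning a worker that submits the create command and waits
    /// for the job to finish, while this method listens for `Created` / `Failed` / `Canceling`
    /// updates so the job can still be cancelled during creation. Returns the catalog
    /// notification version once the job is fully created.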
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        run_command_notifier: Option<oneshot::Sender<MetaResult<()>>>,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();
        let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
        let execution = StreamingJobExecution::new(table_id, sender.clone());
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let res: MetaResult<_> = try {
                let (source_change, streaming_job) = stream_manager
                    .run_create_streaming_job_command(stream_job_fragments, ctx)
                    .inspect(move |result| {
                        if let Some(tx) = run_command_notifier {
                            let _ = tx.send(match result {
                                Ok(_) => Ok(()),
                                Err(err) => Err(err.clone()),
                            });
                        }
                    })
                    .await?;
                let version = stream_manager
                    .metadata_manager
                    .wait_streaming_job_finished(
                        streaming_job.database_id().into(),
                        streaming_job.id() as _,
                    )
                    .await?;
                stream_manager
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
                tracing::debug!(?streaming_job, "stream job finish");
                version
            };

            match res {
                Ok(version) => {
                    let _ = sender
                        .send(CreatingState::Created { version })
                        .await
                        .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}"));
                }
                Err(err) => {
                    let _ = sender
                        .send(CreatingState::Failed {
                            reason: err.clone(),
                        })
                        .await
                        .inspect_err(|_| {
                            tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}")
                        });
                }
            }
        }
        .in_current_span();

        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        tokio::spawn(fut);

        while let Some(state) = receiver
            .recv()
            .instrument_await("recv_creating_state")
            .await
        {
            match state {
                CreatingState::Failed { reason } => {
                    tracing::debug!(id=?table_id, "stream job failed");
                    self.creating_job_info.delete_job(table_id).await;
                    return Err(reason);
                }
                CreatingState::Canceling { finish_tx } => {
                    tracing::debug!(id=?table_id, "cancelling streaming job");
                    if let Ok(table_fragments) = self
                        .metadata_manager
                        .get_job_fragments_by_id(&table_id)
                        .await
                    {
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, table_id)
                        {
                            tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
                        } else if !table_fragments.is_created() {
                            tracing::debug!(
                                "cancelling streaming job {table_id} by issue cancel command."
                            );
                            self.metadata_manager
                                .catalog_controller
                                .try_abort_creating_streaming_job(table_id.table_id as _, true)
                                .await?;

                            self.barrier_scheduler
                                .run_command(database_id, Command::cancel(&table_fragments))
                                .await?;
                        } else {
                            continue;
                        }
                        let _ = finish_tx.send(()).inspect_err(|_| {
                            tracing::warn!("failed to notify cancelled: {table_id}")
                        });
                        self.creating_job_info.delete_job(table_id).await;
                        return Err(MetaError::cancelled("create"));
                    }
                }
                CreatingState::Created { version } => {
                    self.creating_job_info.delete_job(table_id).await;
                    return Ok(version);
                }
            }
        }
        self.creating_job_info.delete_job(table_id).await;
        bail!("receiver failed to get notification version for finished stream job")
    }

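    /// Builds the `CreateStreamingJob` command for the given fragments: prepares the optional
    /// replace-table plan, allocates source and backfill splits, assigns CDC table snapshot
    /// splits, and submits the command to the barrier scheduler, returning once its first
    /// barrier has been collected.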
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            replace_table_job_info,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<(SourceChange, StreamingJob)> {
        let mut replace_table_command = None;

        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        if let Some((streaming_job, context, stream_job_fragments)) = replace_table_job_info {
            self.metadata_manager
                .catalog_controller
                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
                .await?;

            let tmp_table_id = stream_job_fragments.stream_job_id();
            let init_split_assignment = self
                .source_manager
                .allocate_splits(&stream_job_fragments)
                .await?;
            let cdc_table_snapshot_split_assignment =
                assign_cdc_table_snapshot_splits_for_replace_table(
                    context.old_fragments.stream_job_id.table_id,
                    &stream_job_fragments.inner,
                    self.env.meta_store_ref(),
                )
                .await?;

            replace_table_command = Some(ReplaceStreamJobPlan {
                old_fragments: context.old_fragments,
                new_fragments: stream_job_fragments,
                replace_upstream: context.replace_upstream,
                upstream_fragment_downstreams: context.upstream_fragment_downstreams,
                init_split_assignment,
                streaming_job,
                tmp_id: tmp_table_id.table_id,
                to_drop_state_table_ids: Vec::new(),
                auto_refresh_schema_sinks: None,
                cdc_table_snapshot_split_assignment,
            });
        }

        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits(
            iter::once(stream_job_fragments.deref()),
            self.env.meta_store_ref(),
        )
        .await?;

        let source_change = SourceChange::CreateJobFinished {
            finished_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            fragment_backfill_ordering,
            cdc_table_snapshot_split_assignment,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(replace_table_command) = replace_table_command {
                CreateStreamingJobType::SinkIntoTable(replace_table_command)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok((source_change, streaming_job))
    }

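    /// Replaces the fragments of an existing streaming job with `new_fragments` via a
    /// `ReplaceStreamJob` command, re-allocating source splits and CDC table snapshot splits
    /// for the new fragments.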
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocate split: {:?}",
            init_split_assignment
        );

        let cdc_table_snapshot_split_assignment =
            assign_cdc_table_snapshot_splits_for_replace_table(
                old_fragments.stream_job_id.table_id,
                &new_fragments.inner,
                self.env.meta_store_ref(),
            )
            .await?;

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                    cdc_table_snapshot_split_assignment,
                }),
            )
            .await?;

        Ok(())
    }

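    /// Drops the given streaming jobs: jobs that are still being created are notified as
    /// cancelled first, then a `DropStreamingJobs` command removes the actors, fragments and
    /// state tables, and the dropped tables are finalized afterwards.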
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
    ) {
        for &job_id in &streaming_job_ids {
            if self
                .creating_job_info
                .check_job_exists(TableId::new(job_id as _))
                .await
            {
                tracing::info!(
                    ?job_id,
                    "streaming job is creating, cancel it with drop directly"
                );
                self.metadata_manager
                    .notify_cancelled(database_id, job_id)
                    .await;
            }
        }

        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let res = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        table_fragments_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
            if res.is_ok() {
                self.post_dropping_streaming_jobs(state_table_ids).await;
            }
        }
    }

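    /// Completes the dropped state tables in the catalog and notifies hummock and the
    /// compactors that the corresponding table objects were deleted.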
    async fn post_dropping_streaming_jobs(
        &self,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
    ) {
        let tables = self
            .metadata_manager
            .catalog_controller
            .complete_dropped_tables(state_table_ids.into_iter())
            .await;
        let objects = tables
            .into_iter()
            .map(|t| PbObject {
                object_info: Some(PbObjectInfo::Table(t)),
            })
            .collect();
        let group = PbInfo::ObjectGroup(PbObjectGroup { objects });
        self.env
            .notification_manager()
            .notify_hummock(Operation::Delete, group.clone())
            .await;
        self.env
            .notification_manager()
            .notify_compactor(Operation::Delete, group)
            .await;
    }

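    /// Cancels the given creating streaming jobs and returns the ids that were actually
    /// cancelled. Jobs tracked by in-flight creation workers are cancelled through their
    /// shutdown channels; recovered jobs are aborted through the catalog and a cancel command.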
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if receiver.await.is_ok() {
                tracing::info!("canceled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        let futures = recovered_job_ids.into_iter().map(|id| async move {
            tracing::debug!(?id, "cancelling recovered streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager
                    .get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let (_, database_id) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(
                            DatabaseId::new(database_id as _),
                            Command::cancel(&fragment),
                        )
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

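    /// Alters the parallelism and/or resource group of a streaming job. The request is
    /// rejected if a related no-shuffle job is still being created in the background. With
    /// `deferred`, only the new targets are recorded; otherwise a reschedule plan is generated
    /// and applied immediately.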
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "more parallelism is available than the job's max parallelism; the actual parallelism will be limited to {}",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, set the parallelism directly to parallelism {:?}, resource group {:?}",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: HashMap::from([(
                            job_id.table_id,
                            JobRescheduleTarget {
                                parallelism: parallelism_change,
                                resource_group: resource_group_change,
                            },
                        )]),
                    },
                    false,
                )
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, set the parallelism directly to {:?}",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

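    /// Alters the backfill parallelism of a CDC table. Only a fixed parallelism target is
    /// accepted and the resource group must stay unchanged; the generated reschedule plan is
    /// then applied like a regular reschedule.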
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;
        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);
        if let JobParallelismTarget::Update(parallelism) = &parallelism_change {
            match parallelism {
                TableParallelism::Fixed(_) => {}
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
                TableParallelism::Adaptive => {
                    bail_invalid_parameter!("should not alter parallelism to adaptive")
                }
            }
        } else {
            bail_invalid_parameter!("should not refresh")
        }
        match &resource_group_change {
            JobResourceGroupTarget::Update(_) => {
                bail_invalid_parameter!("should not update resource group")
            }
            JobResourceGroupTarget::Keep => {}
        };
        let reschedule_plan = self
            .scale_controller
            .generate_job_reschedule_plan(
                JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                },
                true,
            )
            .await?;
        if reschedule_plan.reschedules.is_empty() {
            tracing::debug!(
                ?job_id,
                post_updates = ?reschedule_plan.post_updates,
                "Empty reschedule plan generated for job.",
            );
            self.scale_controller
                .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                .await?;
        } else {
            self.reschedule_actors(
                database_id,
                reschedule_plan,
                RescheduleOptions {
                    resolve_no_shuffle_upstream: false,
                    skip_create_new_actors: false,
                },
            )
            .await?;
        }

        Ok(())
    }

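    /// Creates a subscription on the given upstream materialized view table by sending a
    /// `CreateSubscription` command to the barrier scheduler.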
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

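    /// Drops a subscription by sending a `DropSubscription` command; errors are logged but
    /// not propagated to the caller.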
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}