use std::collections::HashMap;
use std::sync::Arc;

use await_tree::span;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, SinkId};
use risingwave_connector::source::CdcTableSnapshotSplitRaw;
use risingwave_meta_model::prelude::Fragment as FragmentModel;
use risingwave_meta_model::{StreamingParallelism, WorkerId, fragment, streaming_job};
use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
use risingwave_pb::expr::PbExprNode;
use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use thiserror_ext::AsReport;
use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
use tracing::Instrument;

use super::{
    GlobalRefreshManagerRef, ReschedulePolicy, ScaleControllerRef, StreamFragmentGraph,
    UserDefinedFragmentBackfillOrder,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::error::bail_invalid_parameter;
use crate::hummock::HummockManagerRef;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
    FragmentReplaceUpstream, StreamActor, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate, SubscriptionId,
};
use crate::stream::{ReplaceJobSplitPlan, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

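/// Clean up the metadata of dropped or cancelled streaming jobs: remove their refresh
/// progress trackers, unregister their state tables from Hummock, and mark the dropped
/// tables as fully dropped in the catalog.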
pub(crate) async fn cleanup_dropped_streaming_jobs(
    refresh_manager: &GlobalRefreshManagerRef,
    hummock_manager: &HummockManagerRef,
    metadata_manager: &MetadataManager,
    streaming_job_ids: impl IntoIterator<Item = JobId>,
    state_table_ids: Vec<TableId>,
    progress_status: &str,
) -> MetaResult<()> {
    for job_id in streaming_job_ids {
        refresh_manager.remove_progress_tracker(job_id.as_mv_table_id(), progress_status);
    }

    if state_table_ids.is_empty() {
        return Ok(());
    }

    hummock_manager
        .unregister_table_ids(state_table_ids.clone())
        .await?;
    metadata_manager
        .catalog_controller
        .complete_dropped_tables(state_table_ids)
        .await;
    Ok(())
}

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Currently empty; a placeholder for future options when creating a streaming job.
}

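/// Information about an upstream sink that feeds the streaming job being created.
/// When present in [`CreateStreamingJobContext`], the job is created as
/// [`CreateStreamingJobType::SinkIntoTable`].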
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    pub sink_id: SinkId,
    pub sink_fragment_id: FragmentId,
    pub sink_output_fields: Vec<PbField>,
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    pub project_exprs: Vec<PbExprNode>,
    pub new_sink_downstream: DownstreamFragmentRelation,
}

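/// Context assembled for creating a streaming job; consumed when building the
/// create-streaming-job barrier command.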
pub struct CreateStreamingJobContext {
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    pub database_resource_group: String,

    pub definition: String,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: UserDefinedFragmentBackfillOrder,

    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,

    pub is_serverless_backfill: bool,

    pub streaming_job_model: streaming_job::Model,
}

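/// A streaming job that is currently being created, tracked by [`CreatingStreamingJobInfo`].
/// Holds the shutdown channel used to cancel the creation and the semaphore permit held
/// for the duration of the creation.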
struct StreamingJobExecution {
    id: JobId,
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    _permit: OwnedSemaphorePermit,
}

impl StreamingJobExecution {
    fn new(
        id: JobId,
        shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
            _permit: permit,
        }
    }
}

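/// Tracks the streaming jobs that are currently being created, so that they can be
/// cancelled on demand.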
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: JobId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

    async fn cancel_jobs(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut background_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    match shutdown_tx.send(tx) {
                        Ok(()) => {
                            receivers.insert(job_id, rx);
                        }
                        Err(_) => {
                            return Err(anyhow::anyhow!(
                                "failed to send shutdown signal for streaming job {}: receiver dropped",
                                job_id
                            )
                            .into());
                        }
                    }
                }
            } else {
                // Not tracked by this manager; the caller will cancel it as a background job
                // through the catalog.
                background_job_ids.push(job_id);
            }
        }

        Ok((receivers, background_job_ids))
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

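/// Context for replacing a sink fragment whose schema is automatically refreshed: the
/// original sink and fragment, the new schema and fragment, and the optional new log
/// store table.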
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink: PbSink,
    pub original_fragment: Fragment,
    pub new_schema: Vec<PbColumnCatalog>,
    pub newly_add_fields: Vec<Field>,
    pub removed_column_names: Vec<String>,
    pub new_fragment: Fragment,
    pub new_log_store_table: Option<Box<PbTable>>,
    pub ctx: StreamContext,
}

impl AutoRefreshSchemaSinkContext {
    pub fn new_fragment_info(
        &self,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id: self.new_fragment.fragment_id,
            distribution_type: self.new_fragment.distribution_type.into(),
            fragment_type_mask: self.new_fragment.fragment_type_mask,
            vnode_count: self.new_fragment.vnode_count(),
            nodes: self.new_fragment.nodes.clone(),
            actors: stream_actors
                .get(&self.new_fragment.fragment_id)
                .into_iter()
                .flatten()
                .map(|actor| {
                    (
                        actor.actor_id,
                        InflightActorInfo {
                            worker_id: actor_location[&actor.actor_id],
                            vnode_bitmap: actor.vnode_bitmap.clone(),
                            splits: vec![],
                        },
                    )
                })
                .collect(),
            state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
        }
    }
}

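/// Context assembled for replacing the fragments of an existing streaming job; consumed by
/// [`GlobalStreamManager::replace_stream_job`].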
pub struct ReplaceStreamJobContext {
    pub old_fragments: StreamJobFragments,

    pub replace_upstream: FragmentReplaceUpstream,

    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    pub streaming_job: StreamingJob,

    pub database_resource_group: String,

    pub tmp_id: JobId,

    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,

    pub streaming_job_model: streaming_job::Model,
}

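/// [`GlobalStreamManager`] manages the lifecycle of streaming jobs on the meta node: it
/// creates, replaces, drops, cancels and reschedules them by issuing commands to the
/// barrier scheduler.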
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    pub barrier_scheduler: BarrierScheduler,

    pub hummock_manager: HummockManagerRef,

    pub source_manager: SourceManagerRef,

    pub refresh_manager: GlobalRefreshManagerRef,

    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        refresh_manager: GlobalRefreshManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            hummock_manager,
            source_manager,
            refresh_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

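    /// Create a streaming job: run the create command, then wait for the job to finish for
    /// foreground jobs, or return the current notification version for background jobs.
    /// Creation can be cancelled concurrently through [`CreatingStreamingJobInfo`].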
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finished");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        let result = async {
            tokio::select! {
                biased;

                res = create_fut => res,
                notifier = cancel_rx => {
                    let notifier = notifier.expect("sender should not be dropped");
                    tracing::debug!(id=%job_id, "cancelling streaming job");

                    enum CancelResult {
                        Completed(MetaResult<NotificationVersion>),
                        Failed(MetaError),
                        Cancelled,
                    }

                    let cancel_res = if let Ok(job_fragments) =
                        self.metadata_manager.get_job_fragments_by_id(job_id).await
                    {
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, job_id)
                        {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job in buffer queue."
                            );
                            CancelResult::Cancelled
                        } else if !job_fragments.is_created() {
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job by issuing a cancel command."
                            );

                            let cancel_result: MetaResult<()> = async {
                                let cancel_command = self.metadata_manager.catalog_controller
                                    .build_cancel_command(&job_fragments)
                                    .await?;
                                let cleanup_state_table_ids =
                                    job_fragments.all_table_ids().collect_vec();
                                self.metadata_manager.catalog_controller
                                    .try_abort_creating_streaming_job(job_id, true)
                                    .await?;

                                self.barrier_scheduler
                                    .run_command(database_id, cancel_command)
                                    .await?;
                                cleanup_dropped_streaming_jobs(
                                    &self.refresh_manager,
                                    &self.hummock_manager,
                                    &self.metadata_manager,
                                    [job_id],
                                    cleanup_state_table_ids,
                                    "cancel_streaming_job",
                                )
                                .await?;
                                Ok(())
                            }
                            .await;

                            match cancel_result {
                                Ok(()) => CancelResult::Cancelled,
                                Err(err) => {
                                    tracing::warn!(
                                        error = ?err.as_report(),
                                        id = %job_id,
                                        "failed to run cancel command for creating streaming job"
                                    );
                                    CancelResult::Failed(err)
                                }
                            }
                        } else {
                            // The job has already been created; wait for it to finish
                            // instead of cancelling.
                            CancelResult::Completed(
                                self.metadata_manager
                                    .wait_streaming_job_finished(database_id, job_id)
                                    .await,
                            )
                        }
                    } else {
                        CancelResult::Cancelled
                    };

                    let (cancelled, result) = match cancel_res {
                        CancelResult::Completed(result) => (false, result),
                        CancelResult::Failed(err) => (false, Err(err)),
                        CancelResult::Cancelled => (true, Err(MetaError::cancelled("create"))),
                    };

                    let _ = notifier
                        .send(cancelled)
                        .inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));

                    result
                }
            }
        }
        .await;

        tracing::debug!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }

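    /// Discover source splits, build [`Command::CreateStreamingJob`] from the context, and
    /// send it to the barrier scheduler. Returns once the command has been handled, i.e. the
    /// first barrier of the job has been collected.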
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            database_resource_group,
            definition,
            create_type,
            job_type,
            new_upstream_sink,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
            streaming_job_model,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<StreamingJob> {
        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        let init_split_assignment = self
            .source_manager
            .discover_splits(&stream_job_fragments)
            .await?;

        let fragment_backfill_ordering =
            StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                fragment_backfill_ordering,
                &stream_job_fragments.downstreams,
                || {
                    stream_job_fragments
                        .fragments
                        .iter()
                        .map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                },
            );

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            job_type,
            create_type,
            database_resource_group,
            fragment_backfill_ordering,
            cdc_table_snapshot_splits,
            locality_fragment_state_table_mapping,
            is_serverless: is_serverless_backfill,
            streaming_job_model,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(new_upstream_sink) = new_upstream_sink {
                CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok(streaming_job)
    }

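    /// Replace the fragments of an existing streaming job: compute the split plan for its
    /// sources and run [`Command::ReplaceStreamJob`] through the barrier scheduler.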
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            streaming_job_model,
            database_resource_group,
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let split_plan = if streaming_job.is_source() {
            match self
                .source_manager
                .discover_splits_for_replace_source(&new_fragments, &replace_upstream)
                .await?
            {
                Some(discovered) => ReplaceJobSplitPlan::Discovered(discovered),
                None => ReplaceJobSplitPlan::AlignFromPrevious,
            }
        } else {
            let discovered = self.source_manager.discover_splits(&new_fragments).await?;
            ReplaceJobSplitPlan::Discovered(discovered)
        };
        tracing::info!("replace_stream_job - split plan: {:?}", split_plan);

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    database_resource_group,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    split_plan,
                    streaming_job,
                    streaming_job_model,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![drop_table_connector_ctx.to_remove_state_table_id]
                        } else {
                            Vec::new()
                        }
                    },
                    auto_refresh_schema_sinks,
                }),
            )
            .await?;

        Ok(())
    }

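    /// Drop the given streaming jobs: run [`Command::DropStreamingJobs`] through the barrier
    /// scheduler, then unregister the state tables from Hummock and finish dropping them in
    /// the catalog. Errors are logged rather than returned.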
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        streaming_job_ids: Vec<JobId>,
        state_table_ids: Vec<TableId>,
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    ) {
        if !streaming_job_ids.is_empty() || !state_table_ids.is_empty() {
            let cleanup_streaming_job_ids = streaming_job_ids.clone();
            let cleanup_state_table_ids = state_table_ids.clone();
            let run_result = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        streaming_job_ids: streaming_job_ids.into_iter().collect(),
                        unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
                        dropped_sink_fragment_by_targets,
                    },
                )
                .await;
            let result = match run_result {
                Ok(()) => {
                    cleanup_dropped_streaming_jobs(
                        &self.refresh_manager,
                        &self.hummock_manager,
                        &self.metadata_manager,
                        cleanup_streaming_job_ids,
                        cleanup_state_table_ids,
                        "drop_streaming_jobs",
                    )
                    .await
                }
                Err(err) => Err(err),
            };
            let _ = result.inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
        }
    }

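    /// Cancel the given creating streaming jobs and return the ids that were actually
    /// cancelled. Jobs still tracked by `creating_job_info` are signalled through their
    /// shutdown channel; the remaining (background) jobs are aborted in the catalog and
    /// cancelled with an explicit cancel command.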
    pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
        if job_ids.is_empty() {
            return Ok(vec![]);
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if let Ok(cancelled) = receiver.await
                && cancelled
            {
                tracing::info!("canceled streaming job {id}");
                Ok(id)
            } else {
                Err(MetaError::from(anyhow::anyhow!(
                    "failed to cancel streaming job {id}"
                )))
            }
        });
        let mut cancelled_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        let futures = background_job_ids.into_iter().map(|id| async move {
            let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
            if fragment.is_created() {
                tracing::warn!(
                    "streaming job {} is already created, ignore cancel request",
                    id
                );
                return Ok(None);
            }

            let cancel_command = self
                .metadata_manager
                .catalog_controller
                .build_cancel_command(&fragment)
                .await?;
            let cleanup_state_table_ids = fragment.all_table_ids().collect_vec();

            let (_, database_id) = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_streaming_job(id, true)
                .await?;

            if let Some(database_id) = database_id {
                self.barrier_scheduler
                    .run_command(database_id, cancel_command)
                    .await?;
                cleanup_dropped_streaming_jobs(
                    &self.refresh_manager,
                    &self.hummock_manager,
                    &self.metadata_manager,
                    [id],
                    cleanup_state_table_ids,
                    "cancel_streaming_job",
                )
                .await?;
            }

            tracing::info!(?id, "cancelled background streaming job");
            Ok(Some(id))
        });
        let cancelled_recovered_ids = join_all(futures)
            .await
            .into_iter()
            .collect::<MetaResult<Vec<_>>>()?;

        cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
        Ok(cancelled_ids)
    }

    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        policy: ReschedulePolicy,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let blocked_jobs = self
                .metadata_manager
                .collect_reschedule_blocked_jobs_for_creating_jobs(&background_jobs, !deferred)
                .await?;

            if blocked_jobs.contains(&job_id) {
                bail!(
                    "Cannot alter the job {} because it is blocked by creating unreschedulable backfill jobs",
                    job_id,
                );
            }
        }

        let commands = self
            .scale_controller
            .reschedule_inplace(HashMap::from([(job_id, policy)]))
            .await?;

        if !deferred {
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }

    pub(crate) async fn reschedule_streaming_job_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<StreamingParallelism>,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let unreschedulable = self
                .metadata_manager
                .collect_unreschedulable_backfill_jobs(&background_jobs, !deferred)
                .await?;

            if unreschedulable.contains(&job_id) {
                bail!(
                    "Cannot alter the job {} because it is a non-reschedulable background backfill job",
                    job_id,
                );
            }
        }

        let commands = self
            .scale_controller
            .reschedule_backfill_parallelism_inplace(HashMap::from([(job_id, parallelism)]))
            .await?;

        if !deferred {
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }

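    /// Reschedule the parallelism of the single `StreamCdcScan` fragment of a CDC table
    /// backfill job. Only fixed parallelism targets are supported.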
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let parallelism_policy = match target {
            ReschedulePolicy::Parallelism(policy)
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
            {
                policy
            }
            _ => bail_invalid_parameter!(
                "CDC backfill reschedule only supports fixed parallelism targets"
            ),
        };

        let cdc_fragment_id = {
            let inner = self.metadata_manager.catalog_controller.inner.read().await;
            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(job_id))
                .into_tuple()
                .all(&inner.db)
                .await?;

            let cdc_fragments = fragments
                .into_iter()
                .filter_map(|(fragment_id, mask)| {
                    FragmentTypeMask::from(mask)
                        .contains(FragmentTypeFlag::StreamCdcScan)
                        .then_some(fragment_id)
                })
                .collect_vec();

            match cdc_fragments.len() {
                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
                1 => cdc_fragments[0],
                _ => bail_invalid_parameter!(
                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
                    job_id
                ),
            }
        };

        let fragment_policy = HashMap::from([(
            cdc_fragment_id,
            Some(parallelism_policy.parallelism.clone()),
        )]);

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

    pub(crate) async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        if fragment_targets.is_empty() {
            return Ok(());
        }

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let fragment_policy = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
            .collect();

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy)
            .await?;

        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }

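    /// Create a subscription by sending [`Command::CreateSubscription`] to the barrier
    /// scheduler.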
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: subscription.dependent_table_id,
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id, command)
            .await?;
        Ok(())
    }

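    /// Drop a subscription by sending [`Command::DropSubscription`] to the barrier scheduler.
    /// Errors are logged rather than returned.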
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: SubscriptionId,
        table_id: TableId,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: table_id,
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }

    pub async fn alter_subscription_retention(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: SubscriptionId,
        table_id: TableId,
        retention_second: u64,
    ) -> MetaResult<()> {
        let command = Command::AlterSubscriptionRetention {
            subscription_id,
            upstream_mv_table_id: table_id,
            retention_second,
        };

        tracing::debug!("sending Command::AlterSubscriptionRetention");
        self.barrier_scheduler
            .run_command(database_id, command)
            .await?;
        Ok(())
    }
}