1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use await_tree::span;
19use futures::future::join_all;
20use itertools::Itertools;
21use risingwave_common::bail;
22use risingwave_common::catalog::{DatabaseId, Field, FragmentTypeFlag, FragmentTypeMask, TableId};
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::{JobId, SinkId};
25use risingwave_connector::source::CdcTableSnapshotSplitRaw;
26use risingwave_meta_model::prelude::Fragment as FragmentModel;
27use risingwave_meta_model::{StreamingParallelism, WorkerId, fragment, streaming_job};
28use risingwave_pb::catalog::{CreateType, PbSink, PbTable, Subscription};
29use risingwave_pb::expr::PbExprNode;
30use risingwave_pb::plan_common::{PbColumnCatalog, PbField};
31use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
32use thiserror_ext::AsReport;
33use tokio::sync::{Mutex, OwnedSemaphorePermit, oneshot};
34use tracing::Instrument;
35
36use super::{
37 GlobalRefreshManagerRef, ReschedulePolicy, ScaleControllerRef, StreamFragmentGraph,
38 UserDefinedFragmentBackfillOrder,
39};
40use crate::barrier::{
41 BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
42 ReplaceStreamJobPlan, SnapshotBackfillInfo,
43};
44use crate::controller::catalog::DropTableConnectorContext;
45use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
46use crate::error::bail_invalid_parameter;
47use crate::hummock::HummockManagerRef;
48use crate::manager::{
49 MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
50};
51use crate::model::{
52 ActorId, DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, FragmentId,
53 FragmentReplaceUpstream, StreamActor, StreamContext, StreamJobFragments,
54 StreamJobFragmentsToCreate, SubscriptionId,
55};
56use crate::stream::{ReplaceJobSplitPlan, SourceManagerRef};
57use crate::{MetaError, MetaResult};
58
/// Shared, reference-counted handle to the [`GlobalStreamManager`].
pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;
60
61pub(crate) async fn cleanup_dropped_streaming_jobs(
62 refresh_manager: &GlobalRefreshManagerRef,
63 hummock_manager: &HummockManagerRef,
64 metadata_manager: &MetadataManager,
65 streaming_job_ids: impl IntoIterator<Item = JobId>,
66 state_table_ids: Vec<TableId>,
67 progress_status: &str,
68) -> MetaResult<()> {
69 for job_id in streaming_job_ids {
70 refresh_manager.remove_progress_tracker(job_id.as_mv_table_id(), progress_status);
71 }
72
73 if state_table_ids.is_empty() {
74 return Ok(());
75 }
76
77 hummock_manager
78 .unregister_table_ids(state_table_ids.clone())
79 .await?;
80 metadata_manager
81 .catalog_controller
82 .complete_dropped_tables(state_table_ids)
83 .await;
84 Ok(())
85}
86
/// Options controlling streaming job creation.
///
/// Currently empty; kept as an extension point for future per-job options.
#[derive(Default)]
pub struct CreateStreamingJobOption {
    }
91
/// Information about an existing upstream sink being wired into a table,
/// consumed by [`CreateStreamingJobType::SinkIntoTable`].
#[derive(Debug, Clone)]
pub struct UpstreamSinkInfo {
    /// Id of the upstream sink.
    pub sink_id: SinkId,
    /// Fragment that produces the sink's output.
    pub sink_fragment_id: FragmentId,
    /// Output schema of the sink fragment.
    pub sink_output_fields: Vec<PbField>,
    /// Columns of the target table as they were when the sink was created.
    pub sink_original_target_columns: Vec<PbColumnCatalog>,
    // NOTE(review): presumably the projection mapping sink output onto the
    // target table's columns — confirm against the caller that builds this.
    pub project_exprs: Vec<PbExprNode>,
    /// New downstream relation connecting the sink fragment to the target table.
    pub new_sink_downstream: DownstreamFragmentRelation,
}
102
/// Everything needed to create one streaming job, assembled upstream (DDL layer)
/// and consumed by [`GlobalStreamManager::run_create_streaming_job_command`].
pub struct CreateStreamingJobContext {
    /// New downstream relations to attach to existing upstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// Resource group of the database the job belongs to.
    pub database_resource_group: String,

    /// SQL definition of the job.
    pub definition: String,

    /// Foreground (caller waits for completion) or background creation.
    pub create_type: CreateType,

    /// Kind of streaming job being created (MV, sink, table, ...).
    pub job_type: StreamingJobType,

    /// Present when an existing sink is being connected into this job's table.
    pub new_upstream_sink: Option<UpstreamSinkInfo>,

    /// `Some` when the job should be created via snapshot backfill.
    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    /// Snapshot-backfill info for upstream tables in other databases.
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    /// Pre-computed CDC table snapshot splits, if applicable.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,

    /// Additional creation options (currently empty).
    pub option: CreateStreamingJobOption,

    /// The catalog object of the job being created.
    pub streaming_job: StreamingJob,

    /// User-specified backfill ordering among fragments.
    pub fragment_backfill_ordering: UserDefinedFragmentBackfillOrder,

    // NOTE(review): presumably the state tables per fragment used by locality
    // backfill — inferred from the name; confirm against the producer.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,

    /// Whether the backfill runs in serverless mode.
    pub is_serverless_backfill: bool,

    /// Catalog model row of the streaming job.
    pub streaming_job_model: streaming_job::Model,
}
141
/// Bookkeeping for one in-flight `create_streaming_job` task.
struct StreamingJobExecution {
    // Id of the job being created.
    id: JobId,
    // Channel used to request cancellation. The inner sender reports back whether
    // the job was actually cancelled. Taken (set to `None`) once a cancel is sent.
    shutdown_tx: Option<oneshot::Sender<oneshot::Sender<bool>>>,
    // Concurrency-limiting permit held for the lifetime of the creation task.
    _permit: OwnedSemaphorePermit,
}
147
148impl StreamingJobExecution {
149 fn new(
150 id: JobId,
151 shutdown_tx: oneshot::Sender<oneshot::Sender<bool>>,
152 permit: OwnedSemaphorePermit,
153 ) -> Self {
154 Self {
155 id,
156 shutdown_tx: Some(shutdown_tx),
157 _permit: permit,
158 }
159 }
160}
161
/// Registry of streaming jobs currently being created, keyed by job id.
#[derive(Default)]
struct CreatingStreamingJobInfo {
    // Async mutex: entries are inserted/removed while awaiting in job tasks.
    streaming_jobs: Mutex<HashMap<JobId, StreamingJobExecution>>,
}
166
167impl CreatingStreamingJobInfo {
168 async fn add_job(&self, job: StreamingJobExecution) {
169 let mut jobs = self.streaming_jobs.lock().await;
170 jobs.insert(job.id, job);
171 }
172
173 async fn delete_job(&self, job_id: JobId) {
174 let mut jobs = self.streaming_jobs.lock().await;
175 jobs.remove(&job_id);
176 }
177
178 async fn cancel_jobs(
179 &self,
180 job_ids: Vec<JobId>,
181 ) -> MetaResult<(HashMap<JobId, oneshot::Receiver<bool>>, Vec<JobId>)> {
182 let mut jobs = self.streaming_jobs.lock().await;
183 let mut receivers = HashMap::new();
184 let mut background_job_ids = vec![];
185 for job_id in job_ids {
186 if let Some(job) = jobs.get_mut(&job_id) {
187 if let Some(shutdown_tx) = job.shutdown_tx.take() {
188 let (tx, rx) = oneshot::channel();
189 match shutdown_tx.send(tx) {
190 Ok(()) => {
191 receivers.insert(job_id, rx);
192 }
193 Err(_) => {
194 return Err(anyhow::anyhow!(
195 "failed to send shutdown signal for streaming job {}: receiver dropped",
196 job_id
197 )
198 .into());
199 }
200 }
201 }
202 } else {
203 background_job_ids.push(job_id);
205 }
206 }
207
208 Ok((receivers, background_job_ids))
209 }
210}
211
/// Shared handle to the registry of in-flight creating streaming jobs.
type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;
213
/// Context for a sink whose schema is automatically refreshed as part of a
/// replace-stream-job plan.
#[derive(Debug, Clone)]
pub struct AutoRefreshSchemaSinkContext {
    /// Temporary sink id used during the swap.
    pub tmp_sink_id: SinkId,
    /// Sink catalog entry before the schema refresh.
    pub original_sink: PbSink,
    /// The sink's fragment before the schema refresh.
    pub original_fragment: Fragment,
    /// Full column list after the schema refresh.
    pub new_schema: Vec<PbColumnCatalog>,
    /// Fields added by the refresh.
    pub newly_add_fields: Vec<Field>,
    /// Names of columns removed by the refresh.
    pub removed_column_names: Vec<String>,
    /// Replacement fragment built for the new schema.
    pub new_fragment: Fragment,
    /// New log-store table, if the sink uses one.
    pub new_log_store_table: Option<Box<PbTable>>,
    // NOTE(review): stream context applied to the new fragment — exact usage
    // inferred from the type; confirm at the consumer.
    pub ctx: StreamContext,
}
227
228impl AutoRefreshSchemaSinkContext {
229 pub fn new_fragment_info(
230 &self,
231 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
232 actor_location: &HashMap<ActorId, WorkerId>,
233 ) -> InflightFragmentInfo {
234 InflightFragmentInfo {
235 fragment_id: self.new_fragment.fragment_id,
236 distribution_type: self.new_fragment.distribution_type.into(),
237 fragment_type_mask: self.new_fragment.fragment_type_mask,
238 vnode_count: self.new_fragment.vnode_count(),
239 nodes: self.new_fragment.nodes.clone(),
240 actors: stream_actors
241 .get(&self.new_fragment.fragment_id)
242 .into_iter()
243 .flatten()
244 .map(|actor| {
245 (
246 actor.actor_id,
247 InflightActorInfo {
248 worker_id: actor_location[&actor.actor_id],
249 vnode_bitmap: actor.vnode_bitmap.clone(),
250 splits: vec![],
251 },
252 )
253 })
254 .collect(),
255 state_table_ids: self.new_fragment.state_table_ids.iter().copied().collect(),
256 }
257 }
258}
259
/// Everything needed to replace the fragments of an existing streaming job
/// (e.g. `ALTER TABLE` / schema change), consumed by
/// [`GlobalStreamManager::replace_stream_job`].
pub struct ReplaceStreamJobContext {
    /// Fragments being replaced.
    pub old_fragments: StreamJobFragments,

    /// Updated upstream edges for fragments whose upstream changes.
    pub replace_upstream: FragmentReplaceUpstream,

    /// New downstream relations to attach to existing upstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The catalog object of the job being replaced.
    pub streaming_job: StreamingJob,

    /// Resource group of the database the job belongs to.
    pub database_resource_group: String,

    /// Temporary job id used while the replacement is in flight.
    pub tmp_id: JobId,

    /// Present when the replacement drops the table's connector.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,

    /// Sinks whose schema is auto-refreshed together with this replacement.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,

    /// Catalog model row of the streaming job.
    pub streaming_job_model: streaming_job::Model,
}
288
/// Orchestrates streaming-job lifecycle operations on the meta node — creation,
/// replacement, dropping, cancellation, rescheduling, and subscription
/// management — mostly by issuing commands through the barrier scheduler.
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    /// Access to catalog and fragment metadata.
    pub metadata_manager: MetadataManager,

    /// Schedules barrier commands that drive streaming-job state changes.
    pub barrier_scheduler: BarrierScheduler,

    /// Manages state-store (hummock) table registration.
    pub hummock_manager: HummockManagerRef,

    /// Discovers and assigns source splits.
    pub source_manager: SourceManagerRef,

    /// Tracks refresh progress of refreshable jobs.
    pub refresh_manager: GlobalRefreshManagerRef,

    // Registry of jobs currently being created; used to route cancel requests.
    creating_job_info: CreatingStreamingJobInfoRef,

    /// Computes reschedule plans.
    pub scale_controller: ScaleControllerRef,
}
310
311impl GlobalStreamManager {
312 pub fn new(
313 env: MetaSrvEnv,
314 metadata_manager: MetadataManager,
315 barrier_scheduler: BarrierScheduler,
316 hummock_manager: HummockManagerRef,
317 source_manager: SourceManagerRef,
318 refresh_manager: GlobalRefreshManagerRef,
319 scale_controller: ScaleControllerRef,
320 ) -> MetaResult<Self> {
321 Ok(Self {
322 env,
323 metadata_manager,
324 barrier_scheduler,
325 hummock_manager,
326 source_manager,
327 refresh_manager,
328 creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
329 scale_controller,
330 })
331 }
332
    /// Creates a streaming job end-to-end and returns the catalog notification
    /// version once the job is visible (foreground) or accepted (background).
    ///
    /// Registers the creation task in `creating_job_info` so that
    /// [`Self::cancel_streaming_jobs`] can reach it, then races the creation
    /// future against a cancellation request. On cancel, the job is aborted in
    /// the catalog and a cancel barrier command is issued if creation had
    /// already started.
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let job_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id();

        // Register this task so cancel_streaming_jobs can signal it; the permit
        // (concurrency limit) is held until the job is deleted from the registry.
        let (cancel_tx, cancel_rx) = oneshot::channel();
        let execution = StreamingJobExecution::new(job_id, cancel_tx, permit);
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let create_type = ctx.create_type;
            let streaming_job = stream_manager
                .run_create_streaming_job_command(stream_job_fragments, ctx)
                .await?;
            let version = match create_type {
                // Background jobs return immediately with the current version.
                CreateType::Background => {
                    stream_manager
                        .env
                        .notification_manager_ref()
                        .current_version()
                        .await
                }
                // Foreground jobs block until creation (e.g. backfill) finishes.
                CreateType::Foreground => {
                    stream_manager
                        .metadata_manager
                        .wait_streaming_job_finished(database_id, streaming_job.id() as _)
                        .await?
                }
                CreateType::Unspecified => unreachable!(),
            };

            tracing::debug!(?streaming_job, "stream job finish");
            Ok(version)
        }
        .in_current_span();

        let create_fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));

        let result = async {
            // `biased` so a completed creation wins over a concurrent cancel.
            tokio::select! {
                biased;

                res = create_fut => res,
                notifier = cancel_rx => {
                    let notifier = notifier.expect("sender should not be dropped");
                    tracing::debug!(id=%job_id, "cancelling streaming job");

                    // Outcome of the cancellation attempt; `notifier` is told
                    // whether the job was actually cancelled.
                    enum CancelResult {
                        Completed(MetaResult<NotificationVersion>),
                        Failed(MetaError),
                        Cancelled,
                    }

                    let cancel_res = if let Ok(job_fragments) =
                        self.metadata_manager.get_job_fragments_by_id(job_id).await
                    {
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, job_id)
                        {
                            // The create command was still queued — removed before it ran.
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job in buffer queue."
                            );
                            CancelResult::Cancelled
                        } else if !job_fragments.is_created() {
                            // Creation is in flight: abort in catalog and issue a
                            // cancel barrier command, then clean up state tables.
                            tracing::debug!(
                                id=%job_id,
                                "cancelling streaming job by issue cancel command."
                            );

                            let cancel_result: MetaResult<()> = async {
                                let cancel_command = self.metadata_manager.catalog_controller
                                    .build_cancel_command(&job_fragments)
                                    .await?;
                                let cleanup_state_table_ids =
                                    job_fragments.all_table_ids().collect_vec();
                                self.metadata_manager.catalog_controller
                                    .try_abort_creating_streaming_job(job_id, true)
                                    .await?;

                                self.barrier_scheduler
                                    .run_command(database_id, cancel_command)
                                    .await?;
                                cleanup_dropped_streaming_jobs(
                                    &self.refresh_manager,
                                    &self.hummock_manager,
                                    &self.metadata_manager,
                                    [job_id],
                                    cleanup_state_table_ids,
                                    "cancel_streaming_job",
                                )
                                .await?;
                                Ok(())
                            }
                            .await;

                            match cancel_result {
                                Ok(()) => CancelResult::Cancelled,
                                Err(err) => {
                                    tracing::warn!(
                                        error = ?err.as_report(),
                                        id = %job_id,
                                        "failed to run cancel command for creating streaming job"
                                    );
                                    CancelResult::Failed(err)
                                }
                            }
                        } else {
                            // Already created: too late to cancel, wait for finish.
                            CancelResult::Completed(
                                self.metadata_manager
                                    .wait_streaming_job_finished(database_id, job_id)
                                    .await,
                            )
                        }
                    } else {
                        // Fragments are gone — treat as already cancelled.
                        CancelResult::Cancelled
                    };

                    let (cancelled, result) = match cancel_res {
                        CancelResult::Completed(result) => (false, result),
                        CancelResult::Failed(err) => (false, Err(err)),
                        CancelResult::Cancelled => (true, Err(MetaError::cancelled("create"))),
                    };

                    // Report the outcome back to the canceller (best-effort).
                    let _ = notifier
                        .send(cancelled)
                        .inspect_err(|err| tracing::warn!("failed to notify cancellation result: {err}"));

                    result
                }
            }
        }
        .await;

        // Always unregister, releasing the concurrency permit.
        tracing::debug!("cleaning creating job info: {}", job_id);
        self.creating_job_info.delete_job(job_id).await;
        result
    }
499
500 #[await_tree::instrument]
503 async fn run_create_streaming_job_command(
504 &self,
505 stream_job_fragments: StreamJobFragmentsToCreate,
506 CreateStreamingJobContext {
507 streaming_job,
508 upstream_fragment_downstreams,
509 database_resource_group,
510 definition,
511 create_type,
512 job_type,
513 new_upstream_sink,
514 snapshot_backfill_info,
515 cross_db_snapshot_backfill_info,
516 fragment_backfill_ordering,
517 locality_fragment_state_table_mapping,
518 cdc_table_snapshot_splits,
519 is_serverless_backfill,
520 streaming_job_model,
521 ..
522 }: CreateStreamingJobContext,
523 ) -> MetaResult<StreamingJob> {
524 tracing::debug!(
525 table_id = %stream_job_fragments.stream_job_id(),
526 "built actors finished"
527 );
528
529 let init_split_assignment = self
534 .source_manager
535 .discover_splits(&stream_job_fragments)
536 .await?;
537
538 let fragment_backfill_ordering =
539 StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
540 fragment_backfill_ordering,
541 &stream_job_fragments.downstreams,
542 || {
543 stream_job_fragments
544 .fragments
545 .iter()
546 .map(|(fragment_id, fragment)| {
547 (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
548 })
549 },
550 );
551
552 let info = CreateStreamingJobCommandInfo {
553 stream_job_fragments,
554 upstream_fragment_downstreams,
555 init_split_assignment,
556 definition: definition.clone(),
557 streaming_job: streaming_job.clone(),
558 job_type,
559 create_type,
560 database_resource_group,
561 fragment_backfill_ordering,
562 cdc_table_snapshot_splits,
563 locality_fragment_state_table_mapping,
564 is_serverless: is_serverless_backfill,
565 streaming_job_model,
566 };
567
568 let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
569 tracing::debug!(
570 ?snapshot_backfill_info,
571 "sending Command::CreateSnapshotBackfillStreamingJob"
572 );
573 CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
574 } else {
575 tracing::debug!("sending Command::CreateStreamingJob");
576 if let Some(new_upstream_sink) = new_upstream_sink {
577 CreateStreamingJobType::SinkIntoTable(new_upstream_sink)
578 } else {
579 CreateStreamingJobType::Normal
580 }
581 };
582
583 let command = Command::CreateStreamingJob {
584 info,
585 job_type,
586 cross_db_snapshot_backfill_info,
587 };
588
589 self.barrier_scheduler
590 .run_command(streaming_job.database_id(), command)
591 .await?;
592
593 tracing::debug!(?streaming_job, "first barrier collected for stream job");
594
595 Ok(streaming_job)
596 }
597
598 pub async fn replace_stream_job(
600 &self,
601 new_fragments: StreamJobFragmentsToCreate,
602 ReplaceStreamJobContext {
603 old_fragments,
604 replace_upstream,
605 upstream_fragment_downstreams,
606 tmp_id,
607 streaming_job,
608 drop_table_connector_ctx,
609 auto_refresh_schema_sinks,
610 streaming_job_model,
611 database_resource_group,
612 }: ReplaceStreamJobContext,
613 ) -> MetaResult<()> {
614 let split_plan = if streaming_job.is_source() {
619 match self
620 .source_manager
621 .discover_splits_for_replace_source(&new_fragments, &replace_upstream)
622 .await?
623 {
624 Some(discovered) => ReplaceJobSplitPlan::Discovered(discovered),
625 None => ReplaceJobSplitPlan::AlignFromPrevious,
626 }
627 } else {
628 let discovered = self.source_manager.discover_splits(&new_fragments).await?;
629 ReplaceJobSplitPlan::Discovered(discovered)
630 };
631 tracing::info!("replace_stream_job - split plan: {:?}", split_plan);
632
633 self.barrier_scheduler
634 .run_command(
635 streaming_job.database_id(),
636 Command::ReplaceStreamJob(ReplaceStreamJobPlan {
637 old_fragments,
638 new_fragments,
639 database_resource_group,
640 replace_upstream,
641 upstream_fragment_downstreams,
642 split_plan,
643 streaming_job,
644 streaming_job_model,
645 tmp_id,
646 to_drop_state_table_ids: {
647 if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
648 vec![drop_table_connector_ctx.to_remove_state_table_id]
649 } else {
650 Vec::new()
651 }
652 },
653 auto_refresh_schema_sinks,
654 }),
655 )
656 .await?;
657
658 Ok(())
659 }
660
661 pub async fn drop_streaming_jobs(
665 &self,
666 database_id: DatabaseId,
667 streaming_job_ids: Vec<JobId>,
668 state_table_ids: Vec<TableId>,
669 fragment_ids: HashSet<FragmentId>,
670 dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
671 ) {
672 if !streaming_job_ids.is_empty() || !state_table_ids.is_empty() {
673 let cleanup_streaming_job_ids = streaming_job_ids.clone();
674 let cleanup_state_table_ids = state_table_ids.clone();
675 let run_result = self
676 .barrier_scheduler
677 .run_command(
678 database_id,
679 Command::DropStreamingJobs {
680 streaming_job_ids: streaming_job_ids.into_iter().collect(),
681 unregistered_state_table_ids: state_table_ids.iter().copied().collect(),
682 unregistered_fragment_ids: fragment_ids,
683 dropped_sink_fragment_by_targets,
684 },
685 )
686 .await;
687 let result = match run_result {
688 Ok(()) => {
689 cleanup_dropped_streaming_jobs(
690 &self.refresh_manager,
691 &self.hummock_manager,
692 &self.metadata_manager,
693 cleanup_streaming_job_ids,
694 cleanup_state_table_ids,
695 "drop_streaming_jobs",
696 )
697 .await
698 }
699 Err(err) => Err(err),
700 };
701 let _ = result.inspect_err(|err| {
702 tracing::error!(error = ?err.as_report(), "failed to run drop command");
703 });
704 }
705 }
706
707 pub async fn cancel_streaming_jobs(&self, job_ids: Vec<JobId>) -> MetaResult<Vec<JobId>> {
713 if job_ids.is_empty() {
714 return Ok(vec![]);
715 }
716
717 let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
718 let (receivers, background_job_ids) = self.creating_job_info.cancel_jobs(job_ids).await?;
719
720 let futures = receivers.into_iter().map(|(id, receiver)| async move {
721 if let Ok(cancelled) = receiver.await
722 && cancelled
723 {
724 tracing::info!("canceled streaming job {id}");
725 Ok(id)
726 } else {
727 Err(MetaError::from(anyhow::anyhow!(
728 "failed to cancel streaming job {id}"
729 )))
730 }
731 });
732 let mut cancelled_ids = join_all(futures)
733 .await
734 .into_iter()
735 .collect::<MetaResult<Vec<_>>>()?;
736
737 let futures = background_job_ids.into_iter().map(|id| async move {
740 let fragment = self.metadata_manager.get_job_fragments_by_id(id).await?;
741 if fragment.is_created() {
742 tracing::warn!(
743 "streaming job {} is already created, ignore cancel request",
744 id
745 );
746 return Ok(None);
747 }
748 if fragment.is_created() {
749 Err(MetaError::invalid_parameter(format!(
750 "streaming job {} is already created",
751 id
752 )))?;
753 }
754
755 let cancel_command = self
756 .metadata_manager
757 .catalog_controller
758 .build_cancel_command(&fragment)
759 .await?;
760 let cleanup_state_table_ids = fragment.all_table_ids().collect_vec();
761
762 let (_, database_id) = self
763 .metadata_manager
764 .catalog_controller
765 .try_abort_creating_streaming_job(id, true)
766 .await?;
767
768 if let Some(database_id) = database_id {
769 self.barrier_scheduler
770 .run_command(database_id, cancel_command)
771 .await?;
772 cleanup_dropped_streaming_jobs(
773 &self.refresh_manager,
774 &self.hummock_manager,
775 &self.metadata_manager,
776 [id],
777 cleanup_state_table_ids,
778 "cancel_streaming_job",
779 )
780 .await?;
781 }
782
783 tracing::info!(?id, "cancelled background streaming job");
784 Ok(Some(id))
785 });
786 let cancelled_recovered_ids = join_all(futures)
787 .await
788 .into_iter()
789 .collect::<MetaResult<Vec<_>>>()?;
790
791 cancelled_ids.extend(cancelled_recovered_ids.into_iter().flatten());
792 Ok(cancelled_ids)
793 }
794
795 pub(crate) async fn reschedule_streaming_job(
796 &self,
797 job_id: JobId,
798 policy: ReschedulePolicy,
799 deferred: bool,
800 ) -> MetaResult<()> {
801 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
802
803 let background_jobs = self
804 .metadata_manager
805 .list_background_creating_jobs()
806 .await?;
807
808 if !background_jobs.is_empty() {
809 let blocked_jobs = self
810 .metadata_manager
811 .collect_reschedule_blocked_jobs_for_creating_jobs(&background_jobs, !deferred)
812 .await?;
813
814 if blocked_jobs.contains(&job_id) {
815 bail!(
816 "Cannot alter the job {} because it is blocked by creating unreschedulable backfill jobs",
817 job_id,
818 );
819 }
820 }
821
822 let commands = self
823 .scale_controller
824 .reschedule_inplace(HashMap::from([(job_id, policy)]))
825 .await?;
826
827 if !deferred {
828 let _source_pause_guard = self.source_manager.pause_tick().await;
829
830 for (database_id, command) in commands {
831 self.barrier_scheduler
832 .run_command(database_id, command)
833 .await?;
834 }
835 }
836
837 Ok(())
838 }
839
    /// Adjusts the backfill parallelism of a streaming job.
    ///
    /// `parallelism = None` presumably resets to the default — TODO confirm
    /// against the scale controller. When `deferred` is true, the resulting
    /// commands are not run immediately.
    pub(crate) async fn reschedule_streaming_job_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<StreamingParallelism>,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        // Refuse to touch a job that is itself a non-reschedulable backfill job.
        if !background_jobs.is_empty() {
            let unreschedulable = self
                .metadata_manager
                .collect_unreschedulable_backfill_jobs(&background_jobs, !deferred)
                .await?;

            if unreschedulable.contains(&job_id) {
                bail!(
                    "Cannot alter the job {} because it is a non-reschedulable background backfill job",
                    job_id,
                );
            }
        }

        let commands = self
            .scale_controller
            .reschedule_backfill_parallelism_inplace(HashMap::from([(job_id, parallelism)]))
            .await?;

        if !deferred {
            // Pause split-discovery ticks while the reschedule commands run.
            let _source_pause_guard = self.source_manager.pause_tick().await;

            for (database_id, command) in commands {
                self.barrier_scheduler
                    .run_command(database_id, command)
                    .await?;
            }
        }

        Ok(())
    }
884
    /// Reschedules the CDC backfill fragment of a job to a fixed parallelism.
    ///
    /// Only `ReschedulePolicy::Parallelism` with `StreamingParallelism::Fixed`
    /// is accepted, and the job must contain exactly one `StreamCdcScan`
    /// fragment.
    pub(crate) async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        // Validate the target: only fixed-parallelism policies are supported.
        let parallelism_policy = match target {
            ReschedulePolicy::Parallelism(policy)
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) =>
            {
                policy
            }
            _ => bail_invalid_parameter!(
                "CDC backfill reschedule only supports fixed parallelism targets"
            ),
        };

        // Find the job's single StreamCdcScan fragment in the catalog.
        let cdc_fragment_id = {
            let inner = self.metadata_manager.catalog_controller.inner.read().await;
            // Fetch (fragment_id, fragment_type_mask) pairs for all fragments of the job.
            let fragments: Vec<(risingwave_meta_model::FragmentId, i32)> = FragmentModel::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(job_id))
                .into_tuple()
                .all(&inner.db)
                .await?;

            // Keep only fragments whose mask marks them as StreamCdcScan.
            let cdc_fragments = fragments
                .into_iter()
                .filter_map(|(fragment_id, mask)| {
                    FragmentTypeMask::from(mask)
                        .contains(FragmentTypeFlag::StreamCdcScan)
                        .then_some(fragment_id)
                })
                .collect_vec();

            match cdc_fragments.len() {
                0 => bail_invalid_parameter!("no StreamCdcScan fragments found for job {}", job_id),
                1 => cdc_fragments[0],
                _ => bail_invalid_parameter!(
                    "multiple StreamCdcScan fragments found for job {}; expected exactly one",
                    job_id
                ),
            }
        };

        let fragment_policy = HashMap::from([(
            cdc_fragment_id,
            Some(parallelism_policy.parallelism.clone()),
        )]);

        let commands = self
            .scale_controller
            .reschedule_fragment_inplace(fragment_policy)
            .await?;

        // Pause split-discovery ticks while the reschedule commands run.
        let _source_pause_guard = self.source_manager.pause_tick().await;

        for (database_id, command) in commands {
            self.barrier_scheduler
                .run_command(database_id, command)
                .await?;
        }

        Ok(())
    }
956
957 pub(crate) async fn reschedule_fragments(
958 &self,
959 fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
960 ) -> MetaResult<()> {
961 if fragment_targets.is_empty() {
962 return Ok(());
963 }
964
965 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
966
967 let fragment_policy = fragment_targets
968 .into_iter()
969 .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism))
970 .collect();
971
972 let commands = self
973 .scale_controller
974 .reschedule_fragment_inplace(fragment_policy)
975 .await?;
976
977 let _source_pause_guard = self.source_manager.pause_tick().await;
978
979 for (database_id, command) in commands {
980 self.barrier_scheduler
981 .run_command(database_id, command)
982 .await?;
983 }
984
985 Ok(())
986 }
987
988 pub async fn create_subscription(
990 self: &Arc<Self>,
991 subscription: &Subscription,
992 ) -> MetaResult<()> {
993 let command = Command::CreateSubscription {
994 subscription_id: subscription.id,
995 upstream_mv_table_id: subscription.dependent_table_id,
996 retention_second: subscription.retention_seconds,
997 };
998
999 tracing::debug!("sending Command::CreateSubscription");
1000 self.barrier_scheduler
1001 .run_command(subscription.database_id, command)
1002 .await?;
1003 Ok(())
1004 }
1005
1006 pub async fn drop_subscription(
1008 self: &Arc<Self>,
1009 database_id: DatabaseId,
1010 subscription_id: SubscriptionId,
1011 table_id: TableId,
1012 ) {
1013 let command = Command::DropSubscription {
1014 subscription_id,
1015 upstream_mv_table_id: table_id,
1016 };
1017
1018 tracing::debug!("sending Command::DropSubscriptions");
1019 let _ = self
1020 .barrier_scheduler
1021 .run_command(database_id, command)
1022 .await
1023 .inspect_err(|err| {
1024 tracing::error!(error = ?err.as_report(), "failed to run drop command");
1025 });
1026 }
1027
1028 pub async fn alter_subscription_retention(
1029 self: &Arc<Self>,
1030 database_id: DatabaseId,
1031 subscription_id: SubscriptionId,
1032 table_id: TableId,
1033 retention_second: u64,
1034 ) -> MetaResult<()> {
1035 let command = Command::AlterSubscriptionRetention {
1036 subscription_id,
1037 upstream_mv_table_id: table_id,
1038 retention_second,
1039 };
1040
1041 tracing::debug!("sending Command::AlterSubscriptionRetention");
1042 self.barrier_scheduler
1043 .run_command(database_id, command)
1044 .await?;
1045 Ok(())
1046 }
1047}