1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use futures::FutureExt;
19use futures::future::join_all;
20use itertools::Itertools;
21use risingwave_common::bail;
22use risingwave_common::catalog::{DatabaseId, TableId};
23use risingwave_meta_model::ObjectId;
24use risingwave_pb::catalog::{CreateType, Subscription, Table};
25use risingwave_pb::meta::object::PbObjectInfo;
26use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
27use risingwave_pb::meta::{PbObject, PbObjectGroup};
28use thiserror_ext::AsReport;
29use tokio::sync::mpsc::Sender;
30use tokio::sync::{Mutex, oneshot};
31use tracing::Instrument;
32
33use super::{
34 FragmentBackfillOrder, JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates,
35 JobRescheduleTarget, JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
36};
37use crate::barrier::{
38 BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
39 ReplaceStreamJobPlan, SnapshotBackfillInfo,
40};
41use crate::controller::catalog::DropTableConnectorContext;
42use crate::error::bail_invalid_parameter;
43use crate::manager::{
44 MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
45};
46use crate::model::{
47 ActorId, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle, FragmentReplaceUpstream,
48 StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
49};
50use crate::stream::{SourceChange, SourceManagerRef};
51use crate::{MetaError, MetaResult};
52
/// Shared, reference-counted handle to a [`GlobalStreamManager`].
pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;
54
/// Options carried by a [`CreateStreamingJobContext`].
///
/// The struct currently declares no fields.
#[derive(Default)]
pub struct CreateStreamingJobOption {
}
59
/// Everything [`GlobalStreamManager::create_streaming_job`] needs, besides the fragments
/// themselves, to issue the create command for a streaming job.
pub struct CreateStreamingJobContext {
    /// Forwarded unchanged into `CreateStreamingJobCommandInfo::upstream_fragment_downstreams`.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Passed to the source manager's `allocate_splits_for_backfill`.
    pub new_no_shuffle: FragmentNewNoShuffle,
    /// Passed to the source manager's `allocate_splits_for_backfill`.
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// Internal tables of the job; the values are forwarded to the create command.
    // NOTE(review): the `u32` key is presumably the table id — confirm against the producer.
    pub internal_tables: BTreeMap<u32, Table>,

    // NOTE(review): not read by the code visible in this file; presumably the placement of
    // the actors being built — confirm.
    pub building_locations: Locations,

    // NOTE(review): not read by the code visible in this file; presumably the placement of
    // already-running actors — confirm.
    pub existing_locations: Locations,

    /// Forwarded into `CreateStreamingJobCommandInfo::definition`.
    pub definition: String,

    // NOTE(review): not read by the code visible in this file.
    pub mv_table_id: Option<u32>,

    /// Forwarded into `CreateStreamingJobCommandInfo::create_type`.
    pub create_type: CreateType,

    /// Forwarded into `CreateStreamingJobCommandInfo::job_type`.
    pub job_type: StreamingJobType,

    /// When present, the create command is accompanied by a `ReplaceStreamJobPlan` built from
    /// this tuple (used as `CreateStreamingJobType::SinkIntoTable` unless
    /// `snapshot_backfill_info` is set).
    pub replace_table_job_info: Option<(
        StreamingJob,
        ReplaceStreamJobContext,
        StreamJobFragmentsToCreate,
    )>,

    /// When present, the job is created as `CreateStreamingJobType::SnapshotBackfill`.
    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    /// Forwarded into `Command::CreateStreamingJob::cross_db_snapshot_backfill_info`.
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    /// Additional creation options.
    pub option: CreateStreamingJobOption,

    /// The job being created; its database id selects the database the command runs in.
    pub streaming_job: StreamingJob,

    /// Forwarded into `CreateStreamingJobCommandInfo::fragment_backfill_ordering`.
    pub fragment_backfill_ordering: FragmentBackfillOrder,
}
103
104impl CreateStreamingJobContext {
105 pub fn internal_tables(&self) -> Vec<Table> {
106 self.internal_tables.values().cloned().collect()
107 }
108}
109
/// Messages delivered over the per-job channel that `create_streaming_job` listens on.
pub enum CreatingState {
    /// The creation task hit an error; `reason` is returned to the caller.
    Failed { reason: MetaError },
    /// Cancellation was requested; `finish_tx` is signalled once the job has been cancelled.
    Canceling { finish_tx: oneshot::Sender<()> },
    /// The job finished creating; `version` is returned to the caller.
    Created { version: NotificationVersion },
}
116
/// Bookkeeping entry for one streaming job that is currently being created.
struct StreamingJobExecution {
    /// Id of the job; also used as the key in `CreatingStreamingJobInfo::streaming_jobs`.
    id: TableId,
    /// Channel used to deliver a cancel request to the waiting `create_streaming_job` call.
    /// Taken (left as `None`) by `cancel_jobs` when a cancel request is sent.
    shutdown_tx: Option<Sender<CreatingState>>,
}
121
122impl StreamingJobExecution {
123 fn new(id: TableId, shutdown_tx: Sender<CreatingState>) -> Self {
124 Self {
125 id,
126 shutdown_tx: Some(shutdown_tx),
127 }
128 }
129}
130
/// Registry of streaming jobs whose creation is currently in progress.
#[derive(Default)]
struct CreatingStreamingJobInfo {
    /// In-progress jobs keyed by job id, guarded by an async mutex.
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}
135
136impl CreatingStreamingJobInfo {
137 async fn add_job(&self, job: StreamingJobExecution) {
138 let mut jobs = self.streaming_jobs.lock().await;
139 jobs.insert(job.id, job);
140 }
141
142 async fn delete_job(&self, job_id: TableId) {
143 let mut jobs = self.streaming_jobs.lock().await;
144 jobs.remove(&job_id);
145 }
146
147 async fn cancel_jobs(
148 &self,
149 job_ids: Vec<TableId>,
150 ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
151 let mut jobs = self.streaming_jobs.lock().await;
152 let mut receivers = HashMap::new();
153 let mut recovered_job_ids = vec![];
154 for job_id in job_ids {
155 if let Some(job) = jobs.get_mut(&job_id) {
156 if let Some(shutdown_tx) = job.shutdown_tx.take() {
157 let (tx, rx) = oneshot::channel();
158 if shutdown_tx
159 .send(CreatingState::Canceling { finish_tx: tx })
160 .await
161 .is_ok()
162 {
163 receivers.insert(job_id, rx);
164 } else {
165 tracing::warn!(id=?job_id, "failed to send canceling state");
166 }
167 }
168 } else {
169 recovered_job_ids.push(job_id);
174 }
175 }
176 (receivers, recovered_job_ids)
177 }
178}
179
/// Shared, reference-counted handle to the [`CreatingStreamingJobInfo`] registry.
type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;
181
/// Everything needed, besides the new fragments, to replace an existing streaming job.
pub struct ReplaceStreamJobContext {
    /// Fragments of the job being replaced; forwarded into `ReplaceStreamJobPlan::old_fragments`.
    pub old_fragments: StreamJobFragments,

    /// Forwarded into `ReplaceStreamJobPlan::replace_upstream`; also consulted when allocating
    /// splits for a replaced source.
    pub replace_upstream: FragmentReplaceUpstream,
    /// Consulted when allocating splits for a replaced source.
    pub new_no_shuffle: FragmentNewNoShuffle,

    /// Forwarded into `ReplaceStreamJobPlan::upstream_fragment_downstreams`.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    // NOTE(review): not read by the code visible in this file; presumably the placement of
    // the actors being built — confirm.
    pub building_locations: Locations,

    // NOTE(review): not read by the code visible in this file; presumably the placement of
    // already-running actors — confirm.
    pub existing_locations: Locations,

    /// The replacement job; its database id selects the database the command runs in.
    pub streaming_job: StreamingJob,

    /// Forwarded into `ReplaceStreamJobPlan::tmp_id`.
    pub tmp_id: u32,

    /// When present, its `to_remove_state_table_id` becomes the only entry of
    /// `ReplaceStreamJobPlan::to_drop_state_table_ids`.
    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,
}
209
/// Entry point for creating, replacing, dropping, cancelling and rescheduling streaming jobs.
pub struct GlobalStreamManager {
    /// Meta service environment; used here to reach the notification manager.
    pub env: MetaSrvEnv,

    /// Access to catalog and fragment metadata.
    pub metadata_manager: MetadataManager,

    /// Used to run barrier commands (`run_command`) and to cancel scheduled creates.
    pub barrier_scheduler: BarrierScheduler,

    /// Used to allocate source splits and to apply source changes.
    pub source_manager: SourceManagerRef,

    /// Registry of jobs whose creation is in progress.
    creating_job_info: CreatingStreamingJobInfoRef,

    /// Used to generate and post-apply reschedule plans.
    pub scale_controller: ScaleControllerRef,
}
227
228impl GlobalStreamManager {
229 pub fn new(
230 env: MetaSrvEnv,
231 metadata_manager: MetadataManager,
232 barrier_scheduler: BarrierScheduler,
233 source_manager: SourceManagerRef,
234 scale_controller: ScaleControllerRef,
235 ) -> MetaResult<Self> {
236 Ok(Self {
237 env,
238 metadata_manager,
239 barrier_scheduler,
240 source_manager,
241 creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
242 scale_controller,
243 })
244 }
245
246 pub async fn create_streaming_job(
257 self: &Arc<Self>,
258 stream_job_fragments: StreamJobFragmentsToCreate,
259 ctx: CreateStreamingJobContext,
260 run_command_notifier: Option<oneshot::Sender<MetaResult<()>>>,
261 ) -> MetaResult<NotificationVersion> {
262 let table_id = stream_job_fragments.stream_job_id();
263 let database_id = ctx.streaming_job.database_id().into();
264 let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
265 let execution = StreamingJobExecution::new(table_id, sender.clone());
266 self.creating_job_info.add_job(execution).await;
267
268 let stream_manager = self.clone();
269 let fut = async move {
270 let res: MetaResult<_> = try {
271 let (source_change, streaming_job) = stream_manager
272 .run_create_streaming_job_command(stream_job_fragments, ctx)
273 .inspect(move |result| {
274 if let Some(tx) = run_command_notifier {
275 let _ = tx.send(match result {
276 Ok(_) => {
277 Ok(())
278 }
279 Err(err) => {
280 Err(err.clone())
281 }
282 });
283 }
284 })
285 .await?;
286 let version = stream_manager
287 .metadata_manager
288 .wait_streaming_job_finished(
289 streaming_job.database_id().into(),
290 streaming_job.id() as _,
291 )
292 .await?;
293 stream_manager.source_manager.apply_source_change(source_change).await;
294 tracing::debug!(?streaming_job, "stream job finish");
295 version
296 };
297
298 match res {
299 Ok(version) => {
300 let _ = sender
301 .send(CreatingState::Created { version })
302 .await
303 .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}"));
304 }
305 Err(err) => {
306 let _ = sender
307 .send(CreatingState::Failed {
308 reason: err.clone(),
309 })
310 .await
311 .inspect_err(|_| {
312 tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}")
313 });
314 }
315 }
316 }
317 .in_current_span();
318 tokio::spawn(fut);
319
320 while let Some(state) = receiver.recv().await {
321 match state {
322 CreatingState::Failed { reason } => {
323 tracing::debug!(id=?table_id, "stream job failed");
324 self.creating_job_info.delete_job(table_id).await;
327 return Err(reason);
328 }
329 CreatingState::Canceling { finish_tx } => {
330 tracing::debug!(id=?table_id, "cancelling streaming job");
331 if let Ok(table_fragments) = self
332 .metadata_manager
333 .get_job_fragments_by_id(&table_id)
334 .await
335 {
336 if self
338 .barrier_scheduler
339 .try_cancel_scheduled_create(database_id, table_id)
340 {
341 tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
342 } else if !table_fragments.is_created() {
343 tracing::debug!(
344 "cancelling streaming job {table_id} by issue cancel command."
345 );
346 self.metadata_manager
347 .catalog_controller
348 .try_abort_creating_streaming_job(table_id.table_id as _, true)
349 .await?;
350
351 self.barrier_scheduler
352 .run_command(database_id, Command::cancel(&table_fragments))
353 .await?;
354 } else {
355 continue;
357 }
358 let _ = finish_tx.send(()).inspect_err(|_| {
359 tracing::warn!("failed to notify cancelled: {table_id}")
360 });
361 self.creating_job_info.delete_job(table_id).await;
362 return Err(MetaError::cancelled("create"));
363 }
364 }
365 CreatingState::Created { version } => {
366 self.creating_job_info.delete_job(table_id).await;
367 return Ok(version);
368 }
369 }
370 }
371 self.creating_job_info.delete_job(table_id).await;
372 bail!("receiver failed to get notification version for finished stream job")
373 }
374
375 async fn run_create_streaming_job_command(
378 &self,
379 stream_job_fragments: StreamJobFragmentsToCreate,
380 CreateStreamingJobContext {
381 streaming_job,
382 upstream_fragment_downstreams,
383 new_no_shuffle,
384 upstream_actors,
385 definition,
386 create_type,
387 job_type,
388 replace_table_job_info,
389 internal_tables,
390 snapshot_backfill_info,
391 cross_db_snapshot_backfill_info,
392 fragment_backfill_ordering,
393 ..
394 }: CreateStreamingJobContext,
395 ) -> MetaResult<(SourceChange, StreamingJob)> {
396 let mut replace_table_command = None;
397
398 tracing::debug!(
399 table_id = %stream_job_fragments.stream_job_id(),
400 "built actors finished"
401 );
402
403 if let Some((streaming_job, context, stream_job_fragments)) = replace_table_job_info {
404 self.metadata_manager
405 .catalog_controller
406 .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
407 .await?;
408
409 let tmp_table_id = stream_job_fragments.stream_job_id();
410 let init_split_assignment = self
411 .source_manager
412 .allocate_splits(&stream_job_fragments)
413 .await?;
414
415 replace_table_command = Some(ReplaceStreamJobPlan {
416 old_fragments: context.old_fragments,
417 new_fragments: stream_job_fragments,
418 replace_upstream: context.replace_upstream,
419 upstream_fragment_downstreams: context.upstream_fragment_downstreams,
420 init_split_assignment,
421 streaming_job,
422 tmp_id: tmp_table_id.table_id,
423 to_drop_state_table_ids: Vec::new(), });
425 }
426
427 let mut init_split_assignment = self
432 .source_manager
433 .allocate_splits(&stream_job_fragments)
434 .await?;
435 init_split_assignment.extend(
436 self.source_manager
437 .allocate_splits_for_backfill(
438 &stream_job_fragments,
439 &new_no_shuffle,
440 &upstream_actors,
441 )
442 .await?,
443 );
444
445 let source_change = SourceChange::CreateJobFinished {
446 finished_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
447 };
448
449 let info = CreateStreamingJobCommandInfo {
450 stream_job_fragments,
451 upstream_fragment_downstreams,
452 init_split_assignment,
453 definition: definition.clone(),
454 streaming_job: streaming_job.clone(),
455 internal_tables: internal_tables.into_values().collect_vec(),
456 job_type,
457 create_type,
458 fragment_backfill_ordering,
459 };
460
461 let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
462 tracing::debug!(
463 ?snapshot_backfill_info,
464 "sending Command::CreateSnapshotBackfillStreamingJob"
465 );
466 CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
467 } else {
468 tracing::debug!("sending Command::CreateStreamingJob");
469 if let Some(replace_table_command) = replace_table_command {
470 CreateStreamingJobType::SinkIntoTable(replace_table_command)
471 } else {
472 CreateStreamingJobType::Normal
473 }
474 };
475
476 let command = Command::CreateStreamingJob {
477 info,
478 job_type,
479 cross_db_snapshot_backfill_info,
480 };
481
482 self.barrier_scheduler
483 .run_command(streaming_job.database_id().into(), command)
484 .await?;
485
486 tracing::debug!(?streaming_job, "first barrier collected for stream job");
487
488 Ok((source_change, streaming_job))
489 }
490
491 pub async fn replace_stream_job(
493 &self,
494 new_fragments: StreamJobFragmentsToCreate,
495 ReplaceStreamJobContext {
496 old_fragments,
497 replace_upstream,
498 new_no_shuffle,
499 upstream_fragment_downstreams,
500 tmp_id,
501 streaming_job,
502 drop_table_connector_ctx,
503 ..
504 }: ReplaceStreamJobContext,
505 ) -> MetaResult<()> {
506 let init_split_assignment = if streaming_job.is_source() {
507 self.source_manager
508 .allocate_splits_for_replace_source(
509 &new_fragments,
510 &replace_upstream,
511 &new_no_shuffle,
512 )
513 .await?
514 } else {
515 self.source_manager.allocate_splits(&new_fragments).await?
516 };
517 tracing::info!(
518 "replace_stream_job - allocate split: {:?}",
519 init_split_assignment
520 );
521
522 self.barrier_scheduler
523 .run_command(
524 streaming_job.database_id().into(),
525 Command::ReplaceStreamJob(ReplaceStreamJobPlan {
526 old_fragments,
527 new_fragments,
528 replace_upstream,
529 upstream_fragment_downstreams,
530 init_split_assignment,
531 streaming_job,
532 tmp_id,
533 to_drop_state_table_ids: {
534 if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
535 vec![TableId::new(
536 drop_table_connector_ctx.to_remove_state_table_id as _,
537 )]
538 } else {
539 Vec::new()
540 }
541 },
542 }),
543 )
544 .await?;
545
546 Ok(())
547 }
548
549 pub async fn drop_streaming_jobs(
553 &self,
554 database_id: DatabaseId,
555 removed_actors: Vec<ActorId>,
556 streaming_job_ids: Vec<ObjectId>,
557 state_table_ids: Vec<risingwave_meta_model::TableId>,
558 fragment_ids: HashSet<FragmentId>,
559 ) {
560 if !removed_actors.is_empty()
561 || !streaming_job_ids.is_empty()
562 || !state_table_ids.is_empty()
563 {
564 let res = self
565 .barrier_scheduler
566 .run_command(
567 database_id,
568 Command::DropStreamingJobs {
569 table_fragments_ids: streaming_job_ids
570 .iter()
571 .map(|job_id| TableId::new(*job_id as _))
572 .collect(),
573 actors: removed_actors,
574 unregistered_state_table_ids: state_table_ids
575 .iter()
576 .map(|table_id| TableId::new(*table_id as _))
577 .collect(),
578 unregistered_fragment_ids: fragment_ids,
579 },
580 )
581 .await
582 .inspect_err(|err| {
583 tracing::error!(error = ?err.as_report(), "failed to run drop command");
584 });
585 if res.is_ok() {
586 self.post_dropping_streaming_jobs(state_table_ids).await;
587 }
588 }
589 }
590
591 async fn post_dropping_streaming_jobs(
592 &self,
593 state_table_ids: Vec<risingwave_meta_model::TableId>,
594 ) {
595 let tables = self
596 .metadata_manager
597 .catalog_controller
598 .complete_dropped_tables(state_table_ids.into_iter())
599 .await;
600 let objects = tables
601 .into_iter()
602 .map(|t| PbObject {
603 object_info: Some(PbObjectInfo::Table(t)),
604 })
605 .collect();
606 let group = PbInfo::ObjectGroup(PbObjectGroup { objects });
607 self.env
608 .notification_manager()
609 .notify_hummock(Operation::Delete, group.clone())
610 .await;
611 self.env
612 .notification_manager()
613 .notify_compactor(Operation::Delete, group)
614 .await;
615 }
616
617 pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
624 if table_ids.is_empty() {
625 return vec![];
626 }
627
628 let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
629 let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;
630
631 let futures = receivers.into_iter().map(|(id, receiver)| async move {
632 if receiver.await.is_ok() {
633 tracing::info!("canceled streaming job {id}");
634 Some(id)
635 } else {
636 tracing::warn!("failed to cancel streaming job {id}");
637 None
638 }
639 });
640 let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();
641
642 let futures = recovered_job_ids.into_iter().map(|id| async move {
645 tracing::debug!(?id, "cancelling recovered streaming job");
646 let result: MetaResult<()> = try {
647 let fragment = self
648 .metadata_manager.get_job_fragments_by_id(&id)
649 .await?;
650 if fragment.is_created() {
651 Err(MetaError::invalid_parameter(format!(
652 "streaming job {} is already created",
653 id
654 )))?;
655 }
656
657 let (_, database_id) = self.metadata_manager
658 .catalog_controller
659 .try_abort_creating_streaming_job(id.table_id as _, true)
660 .await?;
661
662 if let Some(database_id) = database_id {
663 self.barrier_scheduler
664 .run_command(DatabaseId::new(database_id as _), Command::cancel(&fragment))
665 .await?;
666 }
667 };
668 match result {
669 Ok(_) => {
670 tracing::info!(?id, "cancelled recovered streaming job");
671 Some(id)
672 }
673 Err(err) => {
674 tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
675 None
676 }
677 }
678 });
679 let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();
680
681 cancelled_ids.extend(cancelled_recovered_ids);
682 cancelled_ids
683 }
684
685 pub(crate) async fn reschedule_streaming_job(
686 &self,
687 job_id: u32,
688 target: JobRescheduleTarget,
689 deferred: bool,
690 ) -> MetaResult<()> {
691 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
692 let background_jobs = self
693 .metadata_manager
694 .list_background_creating_jobs()
695 .await?;
696
697 if !background_jobs.is_empty() {
698 let related_jobs = self
699 .scale_controller
700 .resolve_related_no_shuffle_jobs(&background_jobs)
701 .await?;
702
703 for job in background_jobs {
704 if related_jobs.contains(&job) {
705 bail!(
706 "Cannot alter the job {} because the related job {} is currently being created",
707 job_id,
708 job.table_id
709 );
710 }
711 }
712 }
713
714 let JobRescheduleTarget {
715 parallelism: parallelism_change,
716 resource_group: resource_group_change,
717 } = target;
718
719 let database_id = DatabaseId::new(
720 self.metadata_manager
721 .catalog_controller
722 .get_object_database_id(job_id as ObjectId)
723 .await? as _,
724 );
725 let job_id = TableId::new(job_id);
726
727 let worker_nodes = self
728 .metadata_manager
729 .list_active_streaming_compute_nodes()
730 .await?
731 .into_iter()
732 .filter(|w| w.is_streaming_schedulable())
733 .collect_vec();
734
735 let available_parallelism = worker_nodes
737 .iter()
738 .map(|w| w.compute_node_parallelism())
739 .sum::<usize>();
740 let max_parallelism = self
741 .metadata_manager
742 .get_job_max_parallelism(job_id)
743 .await?;
744
745 if let JobParallelismTarget::Update(parallelism) = parallelism_change {
746 match parallelism {
747 TableParallelism::Adaptive => {
748 if available_parallelism > max_parallelism {
749 tracing::warn!(
750 "too many parallelism available, use max parallelism {} will be limited",
751 max_parallelism
752 );
753 }
754 }
755 TableParallelism::Fixed(parallelism) => {
756 if parallelism > max_parallelism {
757 bail_invalid_parameter!(
758 "specified parallelism {} should not exceed max parallelism {}",
759 parallelism,
760 max_parallelism
761 );
762 }
763 }
764 TableParallelism::Custom => {
765 bail_invalid_parameter!("should not alter parallelism to custom")
766 }
767 }
768 }
769
770 let table_parallelism_assignment = match ¶llelism_change {
771 JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
772 JobParallelismTarget::Refresh => HashMap::new(),
773 };
774 let resource_group_assignment = match &resource_group_change {
775 JobResourceGroupTarget::Update(target) => {
776 HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
777 }
778 JobResourceGroupTarget::Keep => HashMap::new(),
779 };
780
781 if deferred {
782 tracing::debug!(
783 "deferred mode enabled for job {}, set the parallelism directly to parallelism {:?}, resource group {:?}",
784 job_id,
785 parallelism_change,
786 resource_group_change,
787 );
788 self.scale_controller
789 .post_apply_reschedule(
790 &HashMap::new(),
791 &JobReschedulePostUpdates {
792 parallelism_updates: table_parallelism_assignment,
793 resource_group_updates: resource_group_assignment,
794 },
795 )
796 .await?;
797 } else {
798 let reschedule_plan = self
799 .scale_controller
800 .generate_job_reschedule_plan(JobReschedulePolicy {
801 targets: HashMap::from([(
802 job_id.table_id,
803 JobRescheduleTarget {
804 parallelism: parallelism_change,
805 resource_group: resource_group_change,
806 },
807 )]),
808 })
809 .await?;
810
811 if reschedule_plan.reschedules.is_empty() {
812 tracing::debug!(
813 "empty reschedule plan generated for job {}, set the parallelism directly to {:?}",
814 job_id,
815 reschedule_plan.post_updates
816 );
817 self.scale_controller
818 .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
819 .await?;
820 } else {
821 self.reschedule_actors(
822 database_id,
823 reschedule_plan,
824 RescheduleOptions {
825 resolve_no_shuffle_upstream: false,
826 skip_create_new_actors: false,
827 },
828 )
829 .await?;
830 }
831 };
832
833 Ok(())
834 }
835
836 pub async fn create_subscription(
838 self: &Arc<Self>,
839 subscription: &Subscription,
840 ) -> MetaResult<()> {
841 let command = Command::CreateSubscription {
842 subscription_id: subscription.id,
843 upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
844 retention_second: subscription.retention_seconds,
845 };
846
847 tracing::debug!("sending Command::CreateSubscription");
848 self.barrier_scheduler
849 .run_command(subscription.database_id.into(), command)
850 .await?;
851 Ok(())
852 }
853
854 pub async fn drop_subscription(
856 self: &Arc<Self>,
857 database_id: DatabaseId,
858 subscription_id: u32,
859 table_id: u32,
860 ) {
861 let command = Command::DropSubscription {
862 subscription_id,
863 upstream_mv_table_id: TableId::new(table_id),
864 };
865
866 tracing::debug!("sending Command::DropSubscriptions");
867 let _ = self
868 .barrier_scheduler
869 .run_command(database_id, command)
870 .await
871 .inspect_err(|err| {
872 tracing::error!(error = ?err.as_report(), "failed to run drop command");
873 });
874 }
875}