use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use futures::FutureExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, Subscription, Table};
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Mutex, oneshot};
use tracing::Instrument;

use super::{
    JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates, JobRescheduleTarget,
    JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle, FragmentReplaceUpstream,
    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

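/// Options passed along when creating a streaming job. The struct is currently empty and only
/// serves as an extension point for future creation options.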
#[derive(Default)]
pub struct CreateStreamingJobOption {}

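/// [`CreateStreamingJobContext`] carries one-shot information used while creating a streaming
/// job, such as its upstream relations, internal tables, actor locations and the job definition.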
pub struct CreateStreamingJobContext {
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    pub internal_tables: BTreeMap<u32, Table>,

    pub building_locations: Locations,

    pub existing_locations: Locations,

    pub definition: String,

    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    pub replace_table_job_info: Option<(
        StreamingJob,
        ReplaceStreamJobContext,
        StreamJobFragmentsToCreate,
    )>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,
}

impl CreateStreamingJobContext {
    pub fn internal_tables(&self) -> Vec<Table> {
        self.internal_tables.values().cloned().collect()
    }
}

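/// States reported back to [`GlobalStreamManager::create_streaming_job`] while a streaming job
/// is being created.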
pub enum CreatingState {
    Failed { reason: MetaError },
    Canceling { finish_tx: oneshot::Sender<()> },
    Created { version: NotificationVersion },
}

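/// A creating streaming job and the channel used to ask it to cancel.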
struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<Sender<CreatingState>>,
}

impl StreamingJobExecution {
    fn new(id: TableId, shutdown_tx: Sender<CreatingState>) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
        }
    }
}

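/// [`CreatingStreamingJobInfo`] maintains the set of streaming jobs that are currently being
/// created, so that they can be cancelled before they finish.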
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

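    /// Sends a [`CreatingState::Canceling`] message to every tracked job in `job_ids` and returns
    /// a receiver per job that resolves once the cancellation finishes. Job ids that are not
    /// tracked here (e.g. jobs restored during recovery) are returned separately so the caller
    /// can cancel them through the catalog and the barrier scheduler.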
    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut recovered_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx
                        .send(CreatingState::Canceling { finish_tx: tx })
                        .await
                        .is_ok()
                    {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id=?job_id, "failed to send canceling state");
                    }
                }
            } else {
                recovered_job_ids.push(job_id);
            }
        }
        (receivers, recovered_job_ids)
    }
}

type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

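/// [`ReplaceStreamJobContext`] carries one-shot information used when replacing the plan of an
/// existing streaming job: the old fragments to tear down, the rewritten upstream relations, and
/// the locations of the new actors.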
pub struct ReplaceStreamJobContext {
    pub old_fragments: StreamJobFragments,

    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    pub building_locations: Locations,

    pub existing_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,
}

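/// [`GlobalStreamManager`] orchestrates the lifecycle of streaming jobs on the meta node:
/// creating, replacing, dropping, cancelling and rescheduling them by issuing commands to the
/// barrier scheduler and updating metadata through the catalog.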
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    pub barrier_scheduler: BarrierScheduler,

    pub source_manager: SourceManagerRef,

    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

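    /// Creates a streaming job: registers it in `creating_job_info`, spawns a task that builds
    /// and runs the [`Command::CreateStreamingJob`] barrier command, then waits on the state
    /// channel until the job is created, fails, or is cancelled. If `run_command_notifier` is
    /// provided, it is notified once the create command has finished running (or failed).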
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        run_command_notifier: Option<oneshot::Sender<MetaResult<()>>>,
    ) -> MetaResult<NotificationVersion> {
        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();
        let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
        let execution = StreamingJobExecution::new(table_id, sender.clone());
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let res: MetaResult<_> = try {
                let (source_change, streaming_job) = stream_manager
                    .run_create_streaming_job_command(stream_job_fragments, ctx)
                    .inspect(move |result| {
                        if let Some(tx) = run_command_notifier {
                            let _ = tx.send(match result {
                                Ok(_) => Ok(()),
                                Err(err) => Err(err.clone()),
                            });
                        }
                    })
                    .await?;
                let version = stream_manager
                    .metadata_manager
                    .wait_streaming_job_finished(
                        streaming_job.database_id().into(),
                        streaming_job.id() as _,
                    )
                    .await?;
                stream_manager
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
                tracing::debug!(?streaming_job, "stream job finished");
                version
            };

            match res {
                Ok(version) => {
                    let _ = sender
                        .send(CreatingState::Created { version })
                        .await
                        .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}"));
                }
                Err(err) => {
                    let _ = sender
                        .send(CreatingState::Failed {
                            reason: err.clone(),
                        })
                        .await
                        .inspect_err(|_| {
                            tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}")
                        });
                }
            }
        }
        .in_current_span();
        tokio::spawn(fut);

        while let Some(state) = receiver.recv().await {
            match state {
                CreatingState::Failed { reason } => {
                    tracing::debug!(id=?table_id, "stream job failed");
                    self.creating_job_info.delete_job(table_id).await;
                    return Err(reason);
                }
                CreatingState::Canceling { finish_tx } => {
                    tracing::debug!(id=?table_id, "cancelling streaming job");
                    if let Ok(table_fragments) = self
                        .metadata_manager
                        .get_job_fragments_by_id(&table_id)
                        .await
                    {
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, table_id)
                        {
                            tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
                        } else if !table_fragments.is_created() {
                            tracing::debug!(
                                "cancelling streaming job {table_id} by issuing cancel command."
                            );
                            self.metadata_manager
                                .catalog_controller
                                .try_abort_creating_streaming_job(table_id.table_id as _, true)
                                .await?;

                            self.barrier_scheduler
                                .run_command(database_id, Command::cancel(&table_fragments))
                                .await?;
                        } else {
                            continue;
                        }
                        let _ = finish_tx.send(()).inspect_err(|_| {
                            tracing::warn!("failed to notify cancelled: {table_id}")
                        });
                        self.creating_job_info.delete_job(table_id).await;
                        return Err(MetaError::cancelled("create"));
                    }
                }
                CreatingState::Created { version } => {
                    self.creating_job_info.delete_job(table_id).await;
                    return Ok(version);
                }
            }
        }
        self.creating_job_info.delete_job(table_id).await;
        bail!("receiver failed to get notification version for finished stream job")
    }

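    /// Builds and runs the [`Command::CreateStreamingJob`] barrier command for the job: prepares
    /// the optional sink-into-table replace plan, allocates source splits (including backfill
    /// splits), and submits the command to the barrier scheduler. Returns once the command's
    /// first barrier has been collected, together with the [`SourceChange`] to apply after the
    /// job finishes.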
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            replace_table_job_info,
            internal_tables,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<(SourceChange, StreamingJob)> {
        let mut replace_table_command = None;

        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        if let Some((streaming_job, context, stream_job_fragments)) = replace_table_job_info {
            self.metadata_manager
                .catalog_controller
                .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                .await?;

            let tmp_table_id = stream_job_fragments.stream_job_id();
            let init_split_assignment = self
                .source_manager
                .allocate_splits(&stream_job_fragments)
                .await?;

            replace_table_command = Some(ReplaceStreamJobPlan {
                old_fragments: context.old_fragments,
                new_fragments: stream_job_fragments,
                replace_upstream: context.replace_upstream,
                upstream_fragment_downstreams: context.upstream_fragment_downstreams,
                init_split_assignment,
                streaming_job,
                tmp_id: tmp_table_id.table_id,
                to_drop_state_table_ids: Vec::new(),
            });
        }

        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let source_change = SourceChange::CreateJobFinished {
            finished_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            internal_tables: internal_tables.into_values().collect_vec(),
            job_type,
            create_type,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(replace_table_command) = replace_table_command {
                CreateStreamingJobType::SinkIntoTable(replace_table_command)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok((source_change, streaming_job))
    }

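    /// Replaces the fragments of an existing streaming job: allocates splits for the new
    /// fragments (or re-allocates them when replacing a source) and submits a
    /// [`Command::ReplaceStreamJob`] to the barrier scheduler.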
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocate split: {:?}",
            init_split_assignment
        );

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                }),
            )
            .await?;

        Ok(())
    }

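    /// Drops the given streaming jobs by running a [`Command::DropStreamingJobs`] barrier command
    /// and then notifying hummock and the compactor about the dropped state tables. Errors from
    /// the drop command are logged and otherwise ignored.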
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
    ) {
        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let res = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        table_fragments_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
            if res.is_ok() {
                self.post_dropping_streaming_jobs(state_table_ids).await;
            }
        }
    }

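    /// After a drop command succeeds, marks the dropped state tables as fully removed in the
    /// catalog and broadcasts the deletions to hummock and the compactor.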
    async fn post_dropping_streaming_jobs(
        &self,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
    ) {
        let tables = self
            .metadata_manager
            .catalog_controller
            .complete_dropped_tables(state_table_ids.into_iter())
            .await;
        let objects = tables
            .into_iter()
            .map(|t| PbObject {
                object_info: Some(PbObjectInfo::Table(t)),
            })
            .collect();
        let group = PbInfo::ObjectGroup(PbObjectGroup { objects });
        self.env
            .notification_manager()
            .notify_hummock(Operation::Delete, group.clone())
            .await;
        self.env
            .notification_manager()
            .notify_compactor(Operation::Delete, group)
            .await;
    }

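    /// Cancels the given creating streaming jobs and returns the ids that were actually
    /// cancelled. Jobs still tracked in `creating_job_info` are cancelled through their state
    /// channel; jobs that are not tracked (e.g. restored after a meta recovery) are aborted in
    /// the catalog and cancelled via an explicit barrier command.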
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if receiver.await.is_ok() {
                tracing::info!("canceled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        let futures = recovered_job_ids.into_iter().map(|id| async move {
            tracing::debug!(?id, "cancelling recovered streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager
                    .get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let (_, database_id) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(
                            DatabaseId::new(database_id as _),
                            Command::cancel(&fragment),
                        )
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

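    /// Alters the parallelism and/or resource group of a streaming job. Rejects the change if a
    /// related job is still being created in the background, validates the requested parallelism
    /// against the job's max parallelism, and then either applies the change immediately by
    /// rescheduling actors or, in deferred mode, only records the new targets.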
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "available parallelism exceeds the max parallelism of the job; it will be limited to {}",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, directly setting parallelism to {:?} and resource group to {:?}",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                })
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, directly applying post updates {:?}",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

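    /// Creates a subscription on an upstream materialized view by running a
    /// [`Command::CreateSubscription`] barrier command.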
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

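    /// Drops a subscription by running a [`Command::DropSubscription`] barrier command; failures
    /// are logged and otherwise ignored.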
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}