use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, span};
use futures::FutureExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::{CreateType, Subscription, Table};
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Mutex, oneshot};
use tracing::Instrument;

use super::{
    FragmentBackfillOrder, JobParallelismTarget, JobReschedulePolicy, JobReschedulePostUpdates,
    JobRescheduleTarget, JobResourceGroupTarget, Locations, RescheduleOptions, ScaleControllerRef,
};
use crate::barrier::{
    BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
    ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::controller::catalog::DropTableConnectorContext;
use crate::error::bail_invalid_parameter;
use crate::manager::{
    MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle, FragmentReplaceUpstream,
    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{SourceChange, SourceManagerRef};
use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

#[derive(Default)]
pub struct CreateStreamingJobOption {
    // Reserved for future job-creation options; currently empty.
}

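/// [`CreateStreamingJobContext`] carries the metadata and topology information
/// needed by [`GlobalStreamManager::create_streaming_job`]: upstream relations,
/// actor locations, internal tables, backfill settings, and the optional
/// replace-table plan.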
pub struct CreateStreamingJobContext {
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub new_no_shuffle: FragmentNewNoShuffle,
    pub upstream_actors: HashMap<FragmentId, HashSet<ActorId>>,

    /// Internal tables of the streaming job, keyed by table id.
    pub internal_tables: BTreeMap<u32, Table>,

    /// Locations of the actors to build for the new job.
    pub building_locations: Locations,

    /// Locations of the existing (upstream) actors.
    pub existing_locations: Locations,

    /// The SQL definition of the streaming job.
    pub definition: String,

    /// Table id of the materialized view created by this job, if any.
    pub mv_table_id: Option<u32>,

    pub create_type: CreateType,

    pub job_type: StreamingJobType,

    /// Present when creating this job also replaces an existing table job
    /// (e.g. creating a sink into a table).
    pub replace_table_job_info: Option<(
        StreamingJob,
        ReplaceStreamJobContext,
        StreamJobFragmentsToCreate,
    )>,

    pub snapshot_backfill_info: Option<SnapshotBackfillInfo>,
    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,

    pub option: CreateStreamingJobOption,

    pub streaming_job: StreamingJob,

    pub fragment_backfill_ordering: FragmentBackfillOrder,
}

impl CreateStreamingJobContext {
    pub fn internal_tables(&self) -> Vec<Table> {
        self.internal_tables.values().cloned().collect()
    }
}

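/// States observed by [`GlobalStreamManager::create_streaming_job`] while
/// waiting for a job: the creation failed, a cancellation was requested
/// (`finish_tx` is signaled once it is handled), or the job was created with
/// the resulting catalog notification version.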
pub enum CreatingState {
    Failed { reason: MetaError },
    Canceling { finish_tx: oneshot::Sender<()> },
    Created { version: NotificationVersion },
}

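/// Tracks one streaming job that is being created: its id and the sender used
/// to deliver a `Canceling` request to the creation flow (taken on
/// cancellation so it is signaled at most once).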
struct StreamingJobExecution {
    id: TableId,
    shutdown_tx: Option<Sender<CreatingState>>,
}

impl StreamingJobExecution {
    fn new(id: TableId, shutdown_tx: Sender<CreatingState>) -> Self {
        Self {
            id,
            shutdown_tx: Some(shutdown_tx),
        }
    }
}

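/// Registry of streaming jobs that are currently being created, keyed by job
/// (table) id and guarded by an async mutex.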
#[derive(Default)]
struct CreatingStreamingJobInfo {
    streaming_jobs: Mutex<HashMap<TableId, StreamingJobExecution>>,
}

impl CreatingStreamingJobInfo {
    async fn add_job(&self, job: StreamingJobExecution) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.insert(job.id, job);
    }

    async fn delete_job(&self, job_id: TableId) {
        let mut jobs = self.streaming_jobs.lock().await;
        jobs.remove(&job_id);
    }

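    /// Sends a `Canceling` state to every tracked job in `job_ids`. Returns the
    /// per-job receivers that resolve once cancellation finishes, plus the ids
    /// that are not tracked here (e.g. jobs recovered after a restart), which
    /// the caller must cancel through the catalog instead.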
    async fn cancel_jobs(
        &self,
        job_ids: Vec<TableId>,
    ) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
        let mut jobs = self.streaming_jobs.lock().await;
        let mut receivers = HashMap::new();
        let mut recovered_job_ids = vec![];
        for job_id in job_ids {
            if let Some(job) = jobs.get_mut(&job_id) {
                if let Some(shutdown_tx) = job.shutdown_tx.take() {
                    let (tx, rx) = oneshot::channel();
                    if shutdown_tx
                        .send(CreatingState::Canceling { finish_tx: tx })
                        .await
                        .is_ok()
                    {
                        receivers.insert(job_id, rx);
                    } else {
                        tracing::warn!(id=?job_id, "failed to send canceling state");
                    }
                }
            } else {
                recovered_job_ids.push(job_id);
            }
        }
        (receivers, recovered_job_ids)
    }
}

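/// Shared reference to the registry of jobs currently being created.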
type CreatingStreamingJobInfoRef = Arc<CreatingStreamingJobInfo>;

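/// [`ReplaceStreamJobContext`] carries the metadata needed to replace an
/// existing streaming job in place: the fragments being replaced, the rewired
/// upstream/downstream relations, actor locations, and a temporary id assigned
/// to the new fragments during the switch-over.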
pub struct ReplaceStreamJobContext {
    pub old_fragments: StreamJobFragments,

    pub replace_upstream: FragmentReplaceUpstream,
    pub new_no_shuffle: FragmentNewNoShuffle,

    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    pub building_locations: Locations,

    pub existing_locations: Locations,

    pub streaming_job: StreamingJob,

    pub tmp_id: u32,

    pub drop_table_connector_ctx: Option<DropTableConnectorContext>,
}

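/// [`GlobalStreamManager`] orchestrates the lifecycle of streaming jobs on the
/// meta node: creating, replacing, dropping, canceling, and rescheduling them
/// by issuing commands through the barrier scheduler while keeping the
/// metadata and source managers in sync.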
pub struct GlobalStreamManager {
    pub env: MetaSrvEnv,

    pub metadata_manager: MetadataManager,

    pub barrier_scheduler: BarrierScheduler,

    pub source_manager: SourceManagerRef,

    creating_job_info: CreatingStreamingJobInfoRef,

    pub scale_controller: ScaleControllerRef,
}

impl GlobalStreamManager {
    pub fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        barrier_scheduler: BarrierScheduler,
        source_manager: SourceManagerRef,
        scale_controller: ScaleControllerRef,
    ) -> MetaResult<Self> {
        Ok(Self {
            env,
            metadata_manager,
            barrier_scheduler,
            source_manager,
            creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
            scale_controller,
        })
    }

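    /// Creates a streaming job: registers it in `creating_job_info`, spawns a
    /// worker task that runs the create command and waits for the job to
    /// finish, then listens for `CreatingState` updates so this call can
    /// return the notification version, propagate a failure, or drive
    /// cancellation of the job.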
    #[await_tree::instrument]
    pub async fn create_streaming_job(
        self: &Arc<Self>,
        stream_job_fragments: StreamJobFragmentsToCreate,
        ctx: CreateStreamingJobContext,
        run_command_notifier: Option<oneshot::Sender<MetaResult<()>>>,
    ) -> MetaResult<NotificationVersion> {
        let await_tree_key = format!("Create Streaming Job Worker ({})", ctx.streaming_job.id());
        let await_tree_span = span!(
            "{:?}({})",
            ctx.streaming_job.job_type(),
            ctx.streaming_job.name()
        );

        let table_id = stream_job_fragments.stream_job_id();
        let database_id = ctx.streaming_job.database_id().into();
        let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
        let execution = StreamingJobExecution::new(table_id, sender.clone());
        self.creating_job_info.add_job(execution).await;

        let stream_manager = self.clone();
        let fut = async move {
            let res: MetaResult<_> = try {
                let (source_change, streaming_job) = stream_manager
                    .run_create_streaming_job_command(stream_job_fragments, ctx)
                    .inspect(move |result| {
                        if let Some(tx) = run_command_notifier {
                            let _ = tx.send(match result {
                                Ok(_) => Ok(()),
                                Err(err) => Err(err.clone()),
                            });
                        }
                    })
                    .await?;
                let version = stream_manager
                    .metadata_manager
                    .wait_streaming_job_finished(
                        streaming_job.database_id().into(),
                        streaming_job.id() as _,
                    )
                    .await?;
                stream_manager
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
                tracing::debug!(?streaming_job, "stream job finished");
                version
            };

            match res {
                Ok(version) => {
                    let _ = sender
                        .send(CreatingState::Created { version })
                        .await
                        .inspect_err(|_| tracing::warn!("failed to notify created: {table_id}"));
                }
                Err(err) => {
                    let _ = sender
                        .send(CreatingState::Failed {
                            reason: err.clone(),
                        })
                        .await
                        .inspect_err(|_| {
                            tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}")
                        });
                }
            }
        }
        .in_current_span();

        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        tokio::spawn(fut);

        while let Some(state) = receiver
            .recv()
            .instrument_await("recv_creating_state")
            .await
        {
            match state {
                CreatingState::Failed { reason } => {
                    tracing::debug!(id=?table_id, "stream job failed");
                    self.creating_job_info.delete_job(table_id).await;
                    return Err(reason);
                }
                CreatingState::Canceling { finish_tx } => {
                    tracing::debug!(id=?table_id, "cancelling streaming job");
                    if let Ok(table_fragments) = self
                        .metadata_manager
                        .get_job_fragments_by_id(&table_id)
                        .await
                    {
                        if self
                            .barrier_scheduler
                            .try_cancel_scheduled_create(database_id, table_id)
                        {
                            tracing::debug!(
                                "cancelling streaming job {table_id} in the buffer queue."
                            );
                        } else if !table_fragments.is_created() {
                            tracing::debug!(
                                "cancelling streaming job {table_id} by issuing a cancel command."
                            );
                            self.metadata_manager
                                .catalog_controller
                                .try_abort_creating_streaming_job(table_id.table_id as _, true)
                                .await?;

                            self.barrier_scheduler
                                .run_command(database_id, Command::cancel(&table_fragments))
                                .await?;
                        } else {
                            // The job is already created and can no longer be canceled here.
                            continue;
                        }
                        let _ = finish_tx.send(()).inspect_err(|_| {
                            tracing::warn!("failed to notify cancelled: {table_id}")
                        });
                        self.creating_job_info.delete_job(table_id).await;
                        return Err(MetaError::cancelled("create"));
                    }
                }
                CreatingState::Created { version } => {
                    self.creating_job_info.delete_job(table_id).await;
                    return Ok(version);
                }
            }
        }
        self.creating_job_info.delete_job(table_id).await;
        bail!("receiver failed to get notification version for finished stream job")
    }

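    /// Builds and runs the `CreateStreamingJob` barrier command for the job:
    /// prepares the optional replace-table plan, allocates source splits
    /// (including backfill splits), and submits the command to the barrier
    /// scheduler. Returns once the first barrier of the command is collected.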
    #[await_tree::instrument]
    async fn run_create_streaming_job_command(
        &self,
        stream_job_fragments: StreamJobFragmentsToCreate,
        CreateStreamingJobContext {
            streaming_job,
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            definition,
            create_type,
            job_type,
            replace_table_job_info,
            internal_tables,
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            ..
        }: CreateStreamingJobContext,
    ) -> MetaResult<(SourceChange, StreamingJob)> {
        let mut replace_table_command = None;

        tracing::debug!(
            table_id = %stream_job_fragments.stream_job_id(),
            "built actors finished"
        );

        if let Some((streaming_job, context, stream_job_fragments)) = replace_table_job_info {
            self.metadata_manager
                .catalog_controller
                .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                .await?;

            let tmp_table_id = stream_job_fragments.stream_job_id();
            let init_split_assignment = self
                .source_manager
                .allocate_splits(&stream_job_fragments)
                .await?;

            replace_table_command = Some(ReplaceStreamJobPlan {
                old_fragments: context.old_fragments,
                new_fragments: stream_job_fragments,
                replace_upstream: context.replace_upstream,
                upstream_fragment_downstreams: context.upstream_fragment_downstreams,
                init_split_assignment,
                streaming_job,
                tmp_id: tmp_table_id.table_id,
                to_drop_state_table_ids: Vec::new(),
            });
        }

        let mut init_split_assignment = self
            .source_manager
            .allocate_splits(&stream_job_fragments)
            .await?;
        init_split_assignment.extend(
            self.source_manager
                .allocate_splits_for_backfill(
                    &stream_job_fragments,
                    &new_no_shuffle,
                    &upstream_actors,
                )
                .await?,
        );

        let source_change = SourceChange::CreateJobFinished {
            finished_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
        };

        let info = CreateStreamingJobCommandInfo {
            stream_job_fragments,
            upstream_fragment_downstreams,
            init_split_assignment,
            definition: definition.clone(),
            streaming_job: streaming_job.clone(),
            internal_tables: internal_tables.into_values().collect_vec(),
            job_type,
            create_type,
            fragment_backfill_ordering,
        };

        let job_type = if let Some(snapshot_backfill_info) = snapshot_backfill_info {
            tracing::debug!(
                ?snapshot_backfill_info,
                "sending Command::CreateSnapshotBackfillStreamingJob"
            );
            CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
        } else {
            tracing::debug!("sending Command::CreateStreamingJob");
            if let Some(replace_table_command) = replace_table_command {
                CreateStreamingJobType::SinkIntoTable(replace_table_command)
            } else {
                CreateStreamingJobType::Normal
            }
        };

        let command = Command::CreateStreamingJob {
            info,
            job_type,
            cross_db_snapshot_backfill_info,
        };

        self.barrier_scheduler
            .run_command(streaming_job.database_id().into(), command)
            .await?;

        tracing::debug!(?streaming_job, "first barrier collected for stream job");

        Ok((source_change, streaming_job))
    }

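    /// Replaces an existing streaming job with newly built fragments: allocates
    /// split assignments for the new fragments (or for the replaced source) and
    /// submits a `ReplaceStreamJob` command to the barrier scheduler.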
    pub async fn replace_stream_job(
        &self,
        new_fragments: StreamJobFragmentsToCreate,
        ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            tmp_id,
            streaming_job,
            drop_table_connector_ctx,
            ..
        }: ReplaceStreamJobContext,
    ) -> MetaResult<()> {
        let init_split_assignment = if streaming_job.is_source() {
            self.source_manager
                .allocate_splits_for_replace_source(
                    &new_fragments,
                    &replace_upstream,
                    &new_no_shuffle,
                )
                .await?
        } else {
            self.source_manager.allocate_splits(&new_fragments).await?
        };
        tracing::info!(
            "replace_stream_job - allocated splits: {:?}",
            init_split_assignment
        );

        self.barrier_scheduler
            .run_command(
                streaming_job.database_id().into(),
                Command::ReplaceStreamJob(ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    replace_upstream,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    tmp_id,
                    to_drop_state_table_ids: {
                        if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                            vec![TableId::new(
                                drop_table_connector_ctx.to_remove_state_table_id as _,
                            )]
                        } else {
                            Vec::new()
                        }
                    },
                }),
            )
            .await?;

        Ok(())
    }

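    /// Drops the given streaming jobs by issuing a `DropStreamingJobs` barrier
    /// command that removes their actors, fragments, and state tables, then
    /// notifies hummock and the compactors about the dropped tables.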
    pub async fn drop_streaming_jobs(
        &self,
        database_id: DatabaseId,
        removed_actors: Vec<ActorId>,
        streaming_job_ids: Vec<ObjectId>,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
        fragment_ids: HashSet<FragmentId>,
    ) {
        if !removed_actors.is_empty()
            || !streaming_job_ids.is_empty()
            || !state_table_ids.is_empty()
        {
            let res = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::DropStreamingJobs {
                        table_fragments_ids: streaming_job_ids
                            .iter()
                            .map(|job_id| TableId::new(*job_id as _))
                            .collect(),
                        actors: removed_actors,
                        unregistered_state_table_ids: state_table_ids
                            .iter()
                            .map(|table_id| TableId::new(*table_id as _))
                            .collect(),
                        unregistered_fragment_ids: fragment_ids,
                    },
                )
                .await
                .inspect_err(|err| {
                    tracing::error!(error = ?err.as_report(), "failed to run drop command");
                });
            if res.is_ok() {
                self.post_dropping_streaming_jobs(state_table_ids).await;
            }
        }
    }

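    /// Finalizes the dropped state tables in the catalog and notifies hummock
    /// and the compactors that the corresponding table objects were deleted.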
    async fn post_dropping_streaming_jobs(
        &self,
        state_table_ids: Vec<risingwave_meta_model::TableId>,
    ) {
        let tables = self
            .metadata_manager
            .catalog_controller
            .complete_dropped_tables(state_table_ids.into_iter())
            .await;
        let objects = tables
            .into_iter()
            .map(|t| PbObject {
                object_info: Some(PbObjectInfo::Table(t)),
            })
            .collect();
        let group = PbInfo::ObjectGroup(PbObjectGroup { objects });
        self.env
            .notification_manager()
            .notify_hummock(Operation::Delete, group.clone())
            .await;
        self.env
            .notification_manager()
            .notify_compactor(Operation::Delete, group)
            .await;
    }

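    /// Cancels the given creating jobs: jobs tracked in `creating_job_info` are
    /// signaled through their `Canceling` channel, while recovered jobs (not
    /// tracked locally) are aborted via the catalog and a cancel barrier
    /// command. Returns the ids of the jobs that were actually cancelled.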
    pub async fn cancel_streaming_jobs(&self, table_ids: Vec<TableId>) -> Vec<TableId> {
        if table_ids.is_empty() {
            return vec![];
        }

        let _reschedule_job_lock = self.reschedule_lock_read_guard().await;
        let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await;

        let futures = receivers.into_iter().map(|(id, receiver)| async move {
            if receiver.await.is_ok() {
                tracing::info!("canceled streaming job {id}");
                Some(id)
            } else {
                tracing::warn!("failed to cancel streaming job {id}");
                None
            }
        });
        let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        let futures = recovered_job_ids.into_iter().map(|id| async move {
            tracing::debug!(?id, "cancelling recovered streaming job");
            let result: MetaResult<()> = try {
                let fragment = self
                    .metadata_manager
                    .get_job_fragments_by_id(&id)
                    .await?;
                if fragment.is_created() {
                    Err(MetaError::invalid_parameter(format!(
                        "streaming job {} is already created",
                        id
                    )))?;
                }

                let (_, database_id) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(id.table_id as _, true)
                    .await?;

                if let Some(database_id) = database_id {
                    self.barrier_scheduler
                        .run_command(
                            DatabaseId::new(database_id as _),
                            Command::cancel(&fragment),
                        )
                        .await?;
                }
            };
            match result {
                Ok(_) => {
                    tracing::info!(?id, "cancelled recovered streaming job");
                    Some(id)
                }
                Err(err) => {
                    tracing::error!(error=?err.as_report(), "failed to cancel recovered streaming job {id}, does it correspond to any jobs in `SHOW JOBS`?");
                    None
                }
            }
        });
        let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec();

        cancelled_ids.extend(cancelled_recovered_ids);
        cancelled_ids
    }

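    /// Alters the parallelism and/or resource group of a streaming job. The
    /// change is rejected if a related job is still being created in the
    /// background. In deferred mode only the target configuration is recorded;
    /// otherwise a reschedule plan is generated and applied immediately.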
    pub(crate) async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        deferred: bool,
    ) -> MetaResult<()> {
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
        let background_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        if !background_jobs.is_empty() {
            let related_jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_jobs)
                .await?;

            for job in background_jobs {
                if related_jobs.contains(&job) {
                    bail!(
                        "Cannot alter the job {} because the related job {} is currently being created",
                        job_id,
                        job.table_id
                    );
                }
            }
        }

        let JobRescheduleTarget {
            parallelism: parallelism_change,
            resource_group: resource_group_change,
        } = target;

        let database_id = DatabaseId::new(
            self.metadata_manager
                .catalog_controller
                .get_object_database_id(job_id as ObjectId)
                .await? as _,
        );
        let job_id = TableId::new(job_id);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .filter(|w| w.is_streaming_schedulable())
            .collect_vec();

        let available_parallelism = worker_nodes
            .iter()
            .map(|w| w.compute_node_parallelism())
            .sum::<usize>();
        let max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(job_id)
            .await?;

        if let JobParallelismTarget::Update(parallelism) = parallelism_change {
            match parallelism {
                TableParallelism::Adaptive => {
                    if available_parallelism > max_parallelism {
                        tracing::warn!(
                            "available parallelism exceeds max parallelism {}, the job parallelism will be limited to the max",
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Fixed(parallelism) => {
                    if parallelism > max_parallelism {
                        bail_invalid_parameter!(
                            "specified parallelism {} should not exceed max parallelism {}",
                            parallelism,
                            max_parallelism
                        );
                    }
                }
                TableParallelism::Custom => {
                    bail_invalid_parameter!("should not alter parallelism to custom")
                }
            }
        }

        let table_parallelism_assignment = match &parallelism_change {
            JobParallelismTarget::Update(parallelism) => HashMap::from([(job_id, *parallelism)]),
            JobParallelismTarget::Refresh => HashMap::new(),
        };
        let resource_group_assignment = match &resource_group_change {
            JobResourceGroupTarget::Update(target) => {
                HashMap::from([(job_id.table_id() as ObjectId, target.clone())])
            }
            JobResourceGroupTarget::Keep => HashMap::new(),
        };

        if deferred {
            tracing::debug!(
                "deferred mode enabled for job {}, set the parallelism directly to {:?}, resource group {:?}",
                job_id,
                parallelism_change,
                resource_group_change,
            );
            self.scale_controller
                .post_apply_reschedule(
                    &HashMap::new(),
                    &JobReschedulePostUpdates {
                        parallelism_updates: table_parallelism_assignment,
                        resource_group_updates: resource_group_assignment,
                    },
                )
                .await?;
        } else {
            let reschedule_plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy {
                    targets: HashMap::from([(
                        job_id.table_id,
                        JobRescheduleTarget {
                            parallelism: parallelism_change,
                            resource_group: resource_group_change,
                        },
                    )]),
                })
                .await?;

            if reschedule_plan.reschedules.is_empty() {
                tracing::debug!(
                    "empty reschedule plan generated for job {}, set the parallelism directly to {:?}",
                    job_id,
                    reschedule_plan.post_updates
                );
                self.scale_controller
                    .post_apply_reschedule(&HashMap::new(), &reschedule_plan.post_updates)
                    .await?;
            } else {
                self.reschedule_actors(
                    database_id,
                    reschedule_plan,
                    RescheduleOptions {
                        resolve_no_shuffle_upstream: false,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
            }
        };

        Ok(())
    }

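    /// Creates a subscription on the given upstream materialized view by
    /// sending a `CreateSubscription` command through the barrier scheduler.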
    pub async fn create_subscription(
        self: &Arc<Self>,
        subscription: &Subscription,
    ) -> MetaResult<()> {
        let command = Command::CreateSubscription {
            subscription_id: subscription.id,
            upstream_mv_table_id: TableId::new(subscription.dependent_table_id),
            retention_second: subscription.retention_seconds,
        };

        tracing::debug!("sending Command::CreateSubscription");
        self.barrier_scheduler
            .run_command(subscription.database_id.into(), command)
            .await?;
        Ok(())
    }

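    /// Drops a subscription on the given upstream table by sending a
    /// `DropSubscription` command; errors are logged rather than returned.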
    pub async fn drop_subscription(
        self: &Arc<Self>,
        database_id: DatabaseId,
        subscription_id: u32,
        table_id: u32,
    ) {
        let command = Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id: TableId::new(table_id),
        };

        tracing::debug!("sending Command::DropSubscription");
        let _ = self
            .barrier_scheduler
            .run_command(database_id, command)
            .await
            .inspect_err(|err| {
                tracing::error!(error = ?err.as_report(), "failed to run drop command");
            });
    }
}