1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Formatter};
17
18use anyhow::anyhow;
19use itertools::Itertools;
20use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
21use risingwave_common::id::JobId;
22use risingwave_meta_model::refresh_job::{self, RefreshState};
23use risingwave_meta_model::{SinkId, SourceId, WorkerId};
24use risingwave_pb::catalog::{PbSource, PbTable};
25use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
26use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
27use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
28use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode, PbStreamScanType};
29use sea_orm::prelude::DateTime;
30use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
31use tokio::sync::oneshot;
32use tracing::warn;
33
34use crate::MetaResult;
35use crate::barrier::SharedFragmentInfo;
36use crate::controller::catalog::CatalogControllerRef;
37use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
38use crate::controller::fragment::FragmentParallelismInfo;
39use crate::manager::{LocalNotification, NotificationVersion};
40use crate::model::{ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId};
41use crate::stream::SplitAssignment;
42use crate::telemetry::MetaTelemetryJobDesc;
43
44#[derive(Clone)]
45pub struct MetadataManager {
46 pub cluster_controller: ClusterControllerRef,
47 pub catalog_controller: CatalogControllerRef,
48}
49
50#[derive(Debug)]
51pub(crate) enum ActiveStreamingWorkerChange {
52 Add(WorkerNode),
53 Remove(WorkerNode),
54 Update(WorkerNode),
55}
56
57pub struct ActiveStreamingWorkerNodes {
58 worker_nodes: HashMap<WorkerId, WorkerNode>,
59 rx: UnboundedReceiver<LocalNotification>,
60 #[cfg_attr(not(debug_assertions), expect(dead_code))]
61 meta_manager: Option<MetadataManager>,
62}
63
64impl Debug for ActiveStreamingWorkerNodes {
65 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
66 f.debug_struct("ActiveStreamingWorkerNodes")
67 .field("worker_nodes", &self.worker_nodes)
68 .finish()
69 }
70}
71
72impl ActiveStreamingWorkerNodes {
73 pub(crate) fn uninitialized() -> Self {
74 Self {
75 worker_nodes: Default::default(),
76 rx: unbounded_channel().1,
77 meta_manager: None,
78 }
79 }
80
81 #[cfg(test)]
82 pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
83 let (tx, rx) = unbounded_channel();
84 let _join_handle = tokio::spawn(async move {
85 let _tx = tx;
86 std::future::pending::<()>().await
87 });
88 Self {
89 worker_nodes,
90 rx,
91 meta_manager: None,
92 }
93 }
94
95 pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
97 let (nodes, rx) = meta_manager
98 .subscribe_active_streaming_compute_nodes()
99 .await?;
100 Ok(Self {
101 worker_nodes: nodes
102 .into_iter()
103 .filter_map(|node| node.is_streaming_schedulable().then_some((node.id, node)))
104 .collect(),
105 rx,
106 meta_manager: Some(meta_manager),
107 })
108 }
109
110 pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
111 &self.worker_nodes
112 }
113
114 pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
115 loop {
116 let notification = self
117 .rx
118 .recv()
119 .await
120 .expect("notification stopped or uninitialized");
121 fn is_target_worker_node(worker: &WorkerNode) -> bool {
122 worker.r#type == WorkerType::ComputeNode as i32
123 && worker.property.as_ref().unwrap().is_streaming
124 && worker.is_streaming_schedulable()
125 }
126 match notification {
127 LocalNotification::WorkerNodeDeleted(worker) => {
128 let is_target_worker_node = is_target_worker_node(&worker);
129 let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else {
130 if is_target_worker_node {
131 warn!(
132 ?worker,
133 "notify to delete an non-existing streaming compute worker"
134 );
135 }
136 continue;
137 };
138 if !is_target_worker_node {
139 warn!(
140 ?worker,
141 ?prev_worker,
142 "deleted worker has a different recent type"
143 );
144 }
145 if worker.state == State::Starting as i32 {
146 warn!(
147 id = %worker.id,
148 host = ?worker.host,
149 state = worker.state,
150 "a starting streaming worker is deleted"
151 );
152 }
153 break ActiveStreamingWorkerChange::Remove(prev_worker);
154 }
155 LocalNotification::WorkerNodeActivated(worker) => {
156 if !is_target_worker_node(&worker) {
157 if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) {
158 warn!(
159 ?worker,
160 ?prev_worker,
161 "the type of a streaming worker is changed"
162 );
163 break ActiveStreamingWorkerChange::Remove(prev_worker);
164 } else {
165 continue;
166 }
167 }
168 assert_eq!(
169 worker.state,
170 State::Running as i32,
171 "not started worker added: {:?}",
172 worker
173 );
174 if let Some(prev_worker) = self.worker_nodes.insert(worker.id, worker.clone()) {
175 assert_eq!(prev_worker.host, worker.host);
176 assert_eq!(prev_worker.r#type, worker.r#type);
177 warn!(
178 ?prev_worker,
179 ?worker,
180 eq = prev_worker == worker,
181 "notify to update an existing active worker"
182 );
183 if prev_worker == worker {
184 continue;
185 } else {
186 break ActiveStreamingWorkerChange::Update(worker);
187 }
188 } else {
189 break ActiveStreamingWorkerChange::Add(worker);
190 }
191 }
192 _ => {
193 continue;
194 }
195 }
196 }
197 }
198
199 #[cfg(debug_assertions)]
200 pub(crate) async fn validate_change(&self) {
201 use risingwave_pb::common::WorkerNode;
202 use thiserror_ext::AsReport;
203 let Some(meta_manager) = &self.meta_manager else {
204 return;
205 };
206 match meta_manager.list_active_streaming_compute_nodes().await {
207 Ok(worker_nodes) => {
208 let ignore_irrelevant_info = |node: &WorkerNode| {
209 (
210 node.id,
211 WorkerNode {
212 id: node.id,
213 r#type: node.r#type,
214 host: node.host.clone(),
215 property: node.property.clone(),
216 resource: node.resource.clone(),
217 ..Default::default()
218 },
219 )
220 };
221 let worker_nodes: HashMap<_, _> =
222 worker_nodes.iter().map(ignore_irrelevant_info).collect();
223 let curr_worker_nodes: HashMap<_, _> = self
224 .current()
225 .values()
226 .map(ignore_irrelevant_info)
227 .collect();
228 if worker_nodes != curr_worker_nodes {
229 warn!(
230 ?worker_nodes,
231 ?curr_worker_nodes,
232 "different to global snapshot"
233 );
234 }
235 }
236 Err(e) => {
237 warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot");
238 }
239 }
240 }
241}
242
243impl MetadataManager {
244 pub fn new(
245 cluster_controller: ClusterControllerRef,
246 catalog_controller: CatalogControllerRef,
247 ) -> Self {
248 Self {
249 cluster_controller,
250 catalog_controller,
251 }
252 }
253
254 pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
255 self.cluster_controller.get_worker_by_id(worker_id).await
256 }
257
258 pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
259 let node_map = self.cluster_controller.count_worker_by_type().await?;
260 Ok(node_map
261 .into_iter()
262 .map(|(ty, cnt)| (ty.into(), cnt as u64))
263 .collect())
264 }
265
266 pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
267 self.cluster_controller
268 .get_worker_info_by_id(worker_id as _)
269 .await
270 }
271
272 pub async fn add_worker_node(
273 &self,
274 r#type: PbWorkerType,
275 host_address: HostAddress,
276 property: AddNodeProperty,
277 resource: PbResource,
278 ) -> MetaResult<WorkerId> {
279 self.cluster_controller
280 .add_worker(r#type, host_address, property, resource)
281 .await
282 .map(|id| id as WorkerId)
283 }
284
285 pub async fn list_worker_node(
286 &self,
287 worker_type: Option<WorkerType>,
288 worker_state: Option<State>,
289 ) -> MetaResult<Vec<PbWorkerNode>> {
290 self.cluster_controller
291 .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
292 .await
293 }
294
295 pub async fn subscribe_active_streaming_compute_nodes(
296 &self,
297 ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
298 self.cluster_controller
299 .subscribe_active_streaming_compute_nodes()
300 .await
301 }
302
303 pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
304 self.cluster_controller
305 .list_active_streaming_workers()
306 .await
307 }
308
309 pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
310 self.cluster_controller.list_active_serving_workers().await
311 }
312
313 pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
314 Ok(self
315 .catalog_controller
316 .list_fragment_database_ids(None)
317 .await?
318 .into_iter()
319 .map(|(_, database_id)| database_id)
320 .collect())
321 }
322
323 pub async fn split_fragment_map_by_database<T: Debug>(
324 &self,
325 fragment_map: HashMap<FragmentId, T>,
326 ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
327 let fragment_to_database_map: HashMap<_, _> = self
328 .catalog_controller
329 .list_fragment_database_ids(Some(
330 fragment_map
331 .keys()
332 .map(|fragment_id| *fragment_id as _)
333 .collect(),
334 ))
335 .await?
336 .into_iter()
337 .map(|(fragment_id, database_id)| (fragment_id as FragmentId, database_id))
338 .collect();
339 let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
340 for (fragment_id, value) in fragment_map {
341 let database_id = *fragment_to_database_map
342 .get(&fragment_id)
343 .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
344 ret.entry(database_id)
345 .or_default()
346 .try_insert(fragment_id, value)
347 .expect("non duplicate");
348 }
349 Ok(ret)
350 }
351
352 pub async fn list_background_creating_jobs(&self) -> MetaResult<HashSet<JobId>> {
353 self.catalog_controller
354 .list_background_creating_jobs(false, None)
355 .await
356 }
357
358 pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
359 self.catalog_controller.list_sources().await
360 }
361
362 pub fn running_fragment_parallelisms(
363 &self,
364 id_filter: Option<HashSet<FragmentId>>,
365 ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
366 let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
367 Ok(self
368 .catalog_controller
369 .running_fragment_parallelisms(id_filter)?
370 .into_iter()
371 .map(|(k, v)| (k as FragmentId, v))
372 .collect())
373 }
374
375 pub async fn get_upstream_root_fragments(
380 &self,
381 upstream_table_ids: &HashSet<TableId>,
382 ) -> MetaResult<(
383 HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
384 HashMap<ActorId, WorkerId>,
385 )> {
386 let (upstream_root_fragments, actors) = self
387 .catalog_controller
388 .get_root_fragments(upstream_table_ids.iter().map(|id| id.as_job_id()).collect())
389 .await?;
390
391 Ok((upstream_root_fragments, actors))
392 }
393
394 pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
395 self.cluster_controller.get_streaming_cluster_info().await
396 }
397
398 pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
399 self.catalog_controller.get_all_table_options().await
400 }
401
402 pub async fn get_table_name_type_mapping(
403 &self,
404 ) -> MetaResult<HashMap<TableId, (String, String)>> {
405 self.catalog_controller.get_table_name_type_mapping().await
406 }
407
408 pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
409 self.catalog_controller.get_created_table_ids().await
410 }
411
412 pub async fn get_table_associated_source_id(
413 &self,
414 table_id: TableId,
415 ) -> MetaResult<Option<SourceId>> {
416 self.catalog_controller
417 .get_table_associated_source_id(table_id)
418 .await
419 }
420
421 pub async fn get_table_catalog_by_ids(&self, ids: &[TableId]) -> MetaResult<Vec<PbTable>> {
422 self.catalog_controller
423 .get_table_by_ids(ids.to_vec(), false)
424 .await
425 }
426
427 pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
428 self.catalog_controller.list_refresh_jobs().await
429 }
430
431 pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
432 self.catalog_controller.list_refreshable_table_ids().await
433 }
434
435 pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
436 self.catalog_controller.ensure_refresh_job(table_id).await
437 }
438
439 pub async fn update_refresh_job_status(
440 &self,
441 table_id: TableId,
442 status: RefreshState,
443 trigger_time: Option<DateTime>,
444 is_success: bool,
445 ) -> MetaResult<()> {
446 self.catalog_controller
447 .update_refresh_job_status(table_id, status, trigger_time, is_success)
448 .await
449 }
450
451 pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
452 self.catalog_controller
453 .reset_all_refresh_jobs_to_idle()
454 .await
455 }
456
457 pub async fn update_refresh_job_interval(
458 &self,
459 table_id: TableId,
460 trigger_interval_secs: Option<i64>,
461 ) -> MetaResult<()> {
462 self.catalog_controller
463 .update_refresh_job_interval(table_id, trigger_interval_secs)
464 .await
465 }
466
467 pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
468 self.catalog_controller
469 .get_sink_state_table_ids(sink_id)
470 .await
471 }
472
473 pub async fn get_table_catalog_by_cdc_table_id(
474 &self,
475 cdc_table_id: &String,
476 ) -> MetaResult<Vec<PbTable>> {
477 self.catalog_controller
478 .get_table_by_cdc_table_id(cdc_table_id)
479 .await
480 }
481
482 pub async fn get_downstream_fragments(
483 &self,
484 job_id: JobId,
485 ) -> MetaResult<(
486 Vec<(PbDispatcherType, SharedFragmentInfo, PbStreamNode)>,
487 HashMap<ActorId, WorkerId>,
488 )> {
489 let (fragments, actors) = self
490 .catalog_controller
491 .get_downstream_fragments(job_id)
492 .await?;
493
494 Ok((fragments, actors))
495 }
496
497 pub async fn get_job_id_to_internal_table_ids_mapping(
498 &self,
499 ) -> Option<Vec<(JobId, Vec<TableId>)>> {
500 self.catalog_controller.get_job_internal_table_ids().await
501 }
502
503 pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
504 self.catalog_controller
505 .get_job_fragments_by_id(job_id)
506 .await
507 }
508
509 pub fn get_running_actors_of_fragment(&self, id: FragmentId) -> MetaResult<HashSet<ActorId>> {
510 self.catalog_controller
511 .get_running_actors_of_fragment(id as _)
512 }
513
514 pub async fn get_running_actors_for_source_backfill(
516 &self,
517 source_backfill_fragment_id: FragmentId,
518 source_fragment_id: FragmentId,
519 ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
520 let actor_ids = self
521 .catalog_controller
522 .get_running_actors_for_source_backfill(
523 source_backfill_fragment_id as _,
524 source_fragment_id as _,
525 )
526 .await?;
527 Ok(actor_ids
528 .into_iter()
529 .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
530 .collect())
531 }
532
533 pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
534 let actor_cnt = self.catalog_controller.worker_actor_count()?;
535 Ok(actor_cnt
536 .into_iter()
537 .map(|(id, cnt)| (id as WorkerId, cnt))
538 .collect())
539 }
540
541 pub async fn count_streaming_job(&self) -> MetaResult<usize> {
542 self.catalog_controller.count_streaming_jobs().await
543 }
544
545 pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
546 self.catalog_controller
547 .list_stream_job_desc_for_telemetry()
548 .await
549 }
550
551 pub async fn update_source_rate_limit_by_source_id(
552 &self,
553 source_id: SourceId,
554 rate_limit: Option<u32>,
555 ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
556 self.catalog_controller
557 .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
558 .await
559 }
560
561 pub async fn update_backfill_rate_limit_by_job_id(
562 &self,
563 job_id: JobId,
564 rate_limit: Option<u32>,
565 ) -> MetaResult<HashSet<FragmentId>> {
566 self.catalog_controller
567 .update_backfill_rate_limit_by_job_id(job_id, rate_limit)
568 .await
569 }
570
571 pub async fn update_sink_rate_limit_by_sink_id(
572 &self,
573 sink_id: SinkId,
574 rate_limit: Option<u32>,
575 ) -> MetaResult<HashSet<FragmentId>> {
576 self.catalog_controller
577 .update_sink_rate_limit_by_job_id(sink_id, rate_limit)
578 .await
579 }
580
581 pub async fn update_dml_rate_limit_by_job_id(
582 &self,
583 job_id: JobId,
584 rate_limit: Option<u32>,
585 ) -> MetaResult<HashSet<FragmentId>> {
586 self.catalog_controller
587 .update_dml_rate_limit_by_job_id(job_id, rate_limit)
588 .await
589 }
590
591 pub async fn update_sink_props_by_sink_id(
592 &self,
593 sink_id: SinkId,
594 props: BTreeMap<String, String>,
595 ) -> MetaResult<HashMap<String, String>> {
596 let new_props = self
597 .catalog_controller
598 .update_sink_props_by_sink_id(sink_id, props)
599 .await?;
600 Ok(new_props)
601 }
602
603 pub async fn update_iceberg_table_props_by_table_id(
604 &self,
605 table_id: TableId,
606 props: BTreeMap<String, String>,
607 alter_iceberg_table_props: Option<
608 risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
609 >,
610 ) -> MetaResult<(HashMap<String, String>, SinkId)> {
611 let (new_props, sink_id) = self
612 .catalog_controller
613 .update_iceberg_table_props_by_table_id(table_id, props, alter_iceberg_table_props)
614 .await?;
615 Ok((new_props, sink_id))
616 }
617
618 pub async fn update_fragment_rate_limit_by_fragment_id(
619 &self,
620 fragment_id: FragmentId,
621 rate_limit: Option<u32>,
622 ) -> MetaResult<()> {
623 self.catalog_controller
624 .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
625 .await
626 }
627
628 #[await_tree::instrument]
629 pub async fn update_fragment_splits(
630 &self,
631 split_assignment: &SplitAssignment,
632 ) -> MetaResult<()> {
633 let fragment_splits = split_assignment
634 .iter()
635 .map(|(fragment_id, splits)| {
636 (
637 *fragment_id as _,
638 splits.values().flatten().cloned().collect_vec(),
639 )
640 })
641 .collect();
642
643 let inner = self.catalog_controller.inner.write().await;
644
645 self.catalog_controller
646 .update_fragment_splits(&inner.db, &fragment_splits)
647 .await
648 }
649
650 pub async fn get_mv_depended_subscriptions(
651 &self,
652 database_id: Option<DatabaseId>,
653 ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
654 Ok(self
655 .catalog_controller
656 .get_mv_depended_subscriptions(database_id)
657 .await?
658 .into_iter()
659 .map(|(table_id, subscriptions)| {
660 (
661 table_id,
662 subscriptions
663 .into_iter()
664 .map(|(subscription_id, retention_time)| {
665 (subscription_id as SubscriptionId, retention_time)
666 })
667 .collect(),
668 )
669 })
670 .collect())
671 }
672
673 pub async fn get_job_max_parallelism(&self, job_id: JobId) -> MetaResult<usize> {
674 self.catalog_controller
675 .get_max_parallelism_by_id(job_id)
676 .await
677 }
678
679 pub async fn get_existing_job_resource_group(
680 &self,
681 streaming_job_id: JobId,
682 ) -> MetaResult<String> {
683 self.catalog_controller
684 .get_existing_job_resource_group(streaming_job_id)
685 .await
686 }
687
688 pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
689 self.catalog_controller
690 .get_database_resource_group(database_id)
691 .await
692 }
693
694 pub fn cluster_id(&self) -> &ClusterId {
695 self.cluster_controller.cluster_id()
696 }
697
698 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
699 let rate_limits = self.catalog_controller.list_rate_limits().await?;
700 Ok(rate_limits)
701 }
702
703 pub async fn get_job_backfill_scan_types(
704 &self,
705 job_id: JobId,
706 ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
707 let backfill_types = self
708 .catalog_controller
709 .get_job_fragment_backfill_scan_type(job_id)
710 .await?;
711 Ok(backfill_types)
712 }
713
714 pub async fn collect_unreschedulable_backfill_jobs(
715 &self,
716 job_ids: impl IntoIterator<Item = &JobId>,
717 ) -> MetaResult<HashSet<JobId>> {
718 let mut unreschedulable = HashSet::new();
719
720 for job_id in job_ids {
721 let scan_types = self
722 .catalog_controller
723 .get_job_fragment_backfill_scan_type(*job_id)
724 .await?;
725 if scan_types
726 .values()
727 .any(|scan_type| !scan_type.is_reschedulable())
728 {
729 unreschedulable.insert(*job_id);
730 }
731 }
732
733 Ok(unreschedulable)
734 }
735}
736
737impl MetadataManager {
738 #[await_tree::instrument]
741 pub async fn wait_streaming_job_finished(
742 &self,
743 database_id: DatabaseId,
744 id: JobId,
745 ) -> MetaResult<NotificationVersion> {
746 tracing::debug!("wait_streaming_job_finished: {id:?}");
747 let mut mgr = self.catalog_controller.get_inner_write_guard().await;
748 if mgr.streaming_job_is_finished(id).await? {
749 return Ok(self.catalog_controller.current_notification_version().await);
750 }
751 let (tx, rx) = oneshot::channel();
752
753 mgr.register_finish_notifier(database_id, id, tx);
754 drop(mgr);
755 rx.await
756 .map_err(|_| "no received reason".to_owned())
757 .and_then(|result| result)
758 .map_err(|reason| anyhow!("failed to wait streaming job finish: {}", reason).into())
759 }
760
761 pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
762 let mut mgr = self.catalog_controller.get_inner_write_guard().await;
763 mgr.notify_finish_failed(database_id, err);
764 }
765}