risingwave_meta/manager/metadata.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_common::id::JobId;
use risingwave_meta_model::{SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode, PbStreamScanType};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tracing::warn;

use crate::MetaResult;
use crate::barrier::SharedFragmentInfo;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId};
use crate::stream::SplitAssignment;
use crate::telemetry::MetaTelemetryJobDesc;

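/// A lightweight, cloneable handle that unifies access to cluster metadata (worker
/// nodes) and catalog metadata (streaming jobs, fragments, tables, sources, sinks)
/// through the underlying controllers.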
#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

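/// A change to the set of active streaming compute nodes, yielded by
/// [`ActiveStreamingWorkerNodes::changed`].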
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    Remove(WorkerNode),
    Update(WorkerNode),
}

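/// A locally maintained snapshot of the active streaming compute nodes, kept up to
/// date by consuming [`LocalNotification`]s from the cluster controller.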
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
    /// Return an uninitialized instance as a placeholder, to be replaced by an
    /// initialized snapshot (see [`Self::new_snapshot`]) later.
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }

    /// Take a snapshot of the currently active streaming compute nodes and
    /// subscribe to subsequent worker changes.
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes.into_iter().map(|node| (node.id, node)).collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

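    /// Wait for the next relevant [`LocalNotification`] and apply it to the local
    /// snapshot, returning the resulting change. Notifications that do not concern
    /// active streaming compute nodes are skipped.
    ///
    /// A minimal watch-loop sketch (hypothetical caller; `meta_manager` is assumed
    /// to be an initialized [`MetadataManager`]):
    ///
    /// ```ignore
    /// let mut active = ActiveStreamingWorkerNodes::new_snapshot(meta_manager).await?;
    /// loop {
    ///     match active.changed().await {
    ///         ActiveStreamingWorkerChange::Add(node) => tracing::info!(?node, "added"),
    ///         ActiveStreamingWorkerChange::Remove(node) => tracing::info!(?node, "removed"),
    ///         ActiveStreamingWorkerChange::Update(node) => tracing::info!(?node, "updated"),
    ///     }
    ///     // `active.current()` already reflects the returned change here.
    /// }
    /// ```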
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
                        && worker.property.as_ref().unwrap().is_streaming;
                    let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else {
                        if is_streaming_compute_node {
                            warn!(
                                ?worker,
                                "notified to delete a non-existent streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_streaming_compute_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker's most recent type is not streaming compute"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = %worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if worker.r#type != WorkerType::ComputeNode as i32
                        || !worker.property.as_ref().unwrap().is_streaming
                    {
                        if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker has changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "non-running worker added: {:?}",
                        worker
                    );
                    if let Some(prev_worker) = self.worker_nodes.insert(worker.id, worker.clone()) {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notified to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

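    /// Debug-only sanity check: fetch the global list of active streaming compute
    /// nodes and warn if it diverges from the locally maintained snapshot.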
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "local snapshot differs from the global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list_active_streaming_compute_nodes to compare with the local snapshot");
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| database_id)
            .collect())
    }

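    /// Split a fragment-keyed map into per-database maps, looking up each fragment's
    /// database in the catalog. Fails if any fragment's database cannot be resolved.
    ///
    /// A minimal usage sketch (hypothetical values; `mgr` is assumed to be a
    /// `MetadataManager`):
    ///
    /// ```ignore
    /// let fragment_map: HashMap<FragmentId, usize> = HashMap::from([(1, 42), (2, 43)]);
    /// let by_db = mgr.split_fragment_map_by_database(fragment_map).await?;
    /// for (database_id, fragments) in by_db {
    ///     tracing::info!(?database_id, ?fragments, "fragments grouped by database");
    /// }
    /// ```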
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| (fragment_id as FragmentId, database_id))
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<JobId>> {
        let jobs = self
            .catalog_controller
            .list_background_creating_jobs(false, None)
            .await?;

        let jobs = jobs.into_iter().map(|(id, _, _)| id).collect();

        Ok(jobs)
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

    /// Get and filter the "**root**" fragments of the specified relations.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// See also [`crate::controller::catalog::CatalogController::get_root_fragments`].
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(
        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(upstream_table_ids.iter().map(|id| id.as_job_id()).collect())
            .await?;

        Ok((upstream_root_fragments, actors))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
        self.catalog_controller.get_all_table_options().await
    }

    pub async fn get_table_name_type_mapping(
        &self,
    ) -> MetaResult<HashMap<TableId, (String, String)>> {
        self.catalog_controller.get_table_name_type_mapping().await
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.get_created_table_ids().await
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: TableId,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: &[TableId]) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.to_vec(), false)
            .await
    }

    pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
        self.catalog_controller
            .get_table_incoming_sinks(table_id)
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        self.catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        Vec<(PbDispatcherType, SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id)
            .await?;

        Ok((fragments, actors))
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(
        &self,
    ) -> Option<Vec<(JobId, Vec<TableId>)>> {
        self.catalog_controller.get_job_internal_table_ids().await
    }

    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id)
            .await
    }

    pub fn get_running_actors_of_fragment(&self, id: FragmentId) -> MetaResult<HashSet<ActorId>> {
        self.catalog_controller
            .get_running_actors_of_fragment(id as _)
    }

    /// Returns pairs of `(backfill_actor_id, upstream_source_actor_id)`.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count()?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller
            .list_streaming_job_infos()
            .await
            .map(|x| x.len())
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

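    /// Update the rate limit of the given source. Like the other `update_*_rate_limit_*`
    /// helpers below, it returns the affected fragments together with their actor ids.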
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_backfill_rate_limit_by_job_id(job_id, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_dml_rate_limit_by_job_id(job_id, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let (new_props, sink_id) = self
            .catalog_controller
            .update_iceberg_table_props_by_table_id(table_id, props, alter_iceberg_table_props)
            .await?;
        Ok((new_props, sink_id))
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

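    /// Persist the latest split assignment: for each fragment, flatten the per-actor
    /// splits into a single list and write it through the catalog controller.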
    #[await_tree::instrument]
    pub async fn update_fragment_splits(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        let fragment_splits = split_assignment
            .iter()
            .map(|(fragment_id, splits)| {
                (
                    *fragment_id as _,
                    splits.values().flatten().cloned().collect_vec(),
                )
            })
            .collect();

        let inner = self.catalog_controller.inner.write().await;

        self.catalog_controller
            .update_fragment_splits(&inner.db, &fragment_splits)
            .await
    }

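    /// For each materialized view, return the subscriptions that depend on it,
    /// keyed by its table id, together with each subscription's retention time.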
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(table_id, subscriptions)| {
                (
                    table_id,
                    subscriptions
                        .into_iter()
                        .map(|(subscription_id, retention_time)| {
                            (subscription_id as SubscriptionId, retention_time)
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, job_id: JobId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(job_id)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: JobId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id)
            .await?;
        Ok(backfill_types)
    }
}

impl MetadataManager {
    /// Wait for the job-finished notification registered in `TrackingJob::finish`.
    /// The job's progress is updated per barrier.
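    ///
    /// A minimal caller sketch (hypothetical ids; `mgr` is assumed to be a
    /// `MetadataManager`):
    ///
    /// ```ignore
    /// // Blocks until the job finishes (or fails), then returns the notification version.
    /// let version = mgr.wait_streaming_job_finished(database_id, job_id).await?;
    /// ```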
    #[await_tree::instrument]
    pub async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: JobId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.current_notification_version().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "notifier dropped without sending a reason".to_owned())
            .and_then(|result| result)
            .map_err(|reason| {
                anyhow!("failed to wait for streaming job to finish: {}", reason).into()
            })
    }

    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id, err);
    }
}