risingwave_meta/manager/metadata.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_common::id::JobId;
use risingwave_meta_model::refresh_job::{self, RefreshState};
use risingwave_meta_model::{SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode, PbStreamScanType};
use sea_orm::prelude::DateTime;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tracing::warn;

use crate::MetaResult;
use crate::barrier::SharedFragmentInfo;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId};
use crate::stream::SplitAssignment;
use crate::telemetry::MetaTelemetryJobDesc;

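/// Unified entry point for metadata access on the meta node, combining the cluster controller
/// (worker-node metadata) and the catalog controller (catalog and fragment metadata) behind a
/// single cloneable handle.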
#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

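/// A change to the set of active streaming compute nodes, as observed by
/// [`ActiveStreamingWorkerNodes::changed`].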
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    Remove(WorkerNode),
    Update(WorkerNode),
}

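/// A locally maintained view of the active streaming compute nodes, kept in sync by consuming
/// [`LocalNotification`]s from the cluster controller.
///
/// A minimal usage sketch (assuming an already constructed `MetadataManager` named
/// `metadata_manager`; error handling elided):
///
/// ```ignore
/// let mut nodes = ActiveStreamingWorkerNodes::new_snapshot(metadata_manager).await?;
/// loop {
///     match nodes.changed().await {
///         ActiveStreamingWorkerChange::Add(node) => { /* schedule work onto the new node */ }
///         ActiveStreamingWorkerChange::Remove(node) => { /* migrate actors off the node */ }
///         ActiveStreamingWorkerChange::Update(node) => { /* refresh cached node properties */ }
///     }
/// }
/// ```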
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
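    /// Return an uninitialized placeholder, to be replaced later by an initialized snapshot from
    /// [`Self::new_snapshot`].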
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }

    /// Take a snapshot of the currently active streaming compute nodes and subscribe to subsequent changes.
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes.into_iter().map(|node| (node.id, node)).collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

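    /// Wait for the next change to the set of active streaming compute nodes, apply it to the local
    /// snapshot, and return it. Notifications that do not affect the snapshot are skipped.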
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
                        && worker.property.as_ref().unwrap().is_streaming;
                    let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else {
                        if is_streaming_compute_node {
                            warn!(
                                ?worker,
                                "notified to delete a non-existent streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_streaming_compute_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker has a different recent type"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = %worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if worker.r#type != WorkerType::ComputeNode as i32
                        || !worker.property.as_ref().unwrap().is_streaming
                    {
                        if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker has changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "not started worker added: {:?}",
                        worker
                    );
                    if let Some(prev_worker) = self.worker_nodes.insert(worker.id, worker.clone()) {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notified to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

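    /// Debug-only consistency check: compare the local snapshot against the globally listed active
    /// streaming compute nodes and log a warning if they differ.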
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "differs from the global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list_active_streaming_compute_nodes to compare with the local snapshot");
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| database_id)
            .collect())
    }

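    /// Partition a fragment-keyed map by the database each fragment belongs to.
    /// Returns an error if the database of any fragment cannot be resolved.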
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| (fragment_id as FragmentId, database_id))
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

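    /// List the ids of streaming jobs that are still being created in the background.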
    pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<JobId>> {
        let jobs = self
            .catalog_controller
            .list_background_creating_jobs(false, None)
            .await?;

        let jobs = jobs.into_iter().map(|(id, _, _)| id).collect();

        Ok(jobs)
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

    /// Get and filter the "**root**" fragments of the specified relations.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// See also [`crate::controller::catalog::CatalogController::get_root_fragments`].
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(
        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(upstream_table_ids.iter().map(|id| id.as_job_id()).collect())
            .await?;

        Ok((upstream_root_fragments, actors))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
        self.catalog_controller.get_all_table_options().await
    }

    pub async fn get_table_name_type_mapping(
        &self,
    ) -> MetaResult<HashMap<TableId, (String, String)>> {
        self.catalog_controller.get_table_name_type_mapping().await
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.get_created_table_ids().await
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: TableId,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: &[TableId]) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.to_vec(), false)
            .await
    }

    pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
        self.catalog_controller
            .get_table_incoming_sinks(table_id)
            .await
    }

    pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
        self.catalog_controller.list_refresh_jobs().await
    }

    pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.list_refreshable_table_ids().await
    }

    pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
        self.catalog_controller.ensure_refresh_job(table_id).await
    }

    pub async fn update_refresh_job_status(
        &self,
        table_id: TableId,
        status: RefreshState,
        trigger_time: Option<DateTime>,
        is_success: bool,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_status(table_id, status, trigger_time, is_success)
            .await
    }

    pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
        self.catalog_controller
            .reset_all_refresh_jobs_to_idle()
            .await
    }

    pub async fn update_refresh_job_interval(
        &self,
        table_id: TableId,
        trigger_interval_secs: Option<i64>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_interval(table_id, trigger_interval_secs)
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        self.catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

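    /// Get the fragments downstream of the given job, each paired with the dispatcher type used to
    /// reach it and its stream node, together with a mapping from actor id to the worker it runs on.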
    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        Vec<(PbDispatcherType, SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id)
            .await?;

        Ok((fragments, actors))
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(
        &self,
    ) -> Option<Vec<(JobId, Vec<TableId>)>> {
        self.catalog_controller.get_job_internal_table_ids().await
    }

    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id)
            .await
    }

    pub fn get_running_actors_of_fragment(&self, id: FragmentId) -> MetaResult<HashSet<ActorId>> {
        self.catalog_controller
            .get_running_actors_of_fragment(id as _)
    }

    /// Returns pairs of `(backfill_actor_id, upstream_source_actor_id)` for the running actors of
    /// the given source backfill fragment.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count()?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller
            .list_streaming_job_infos()
            .await
            .map(|x| x.len())
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

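    /// Update the rate limit of the given source, returning the affected fragments and their actors.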
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_backfill_rate_limit_by_job_id(job_id, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_dml_rate_limit_by_job_id(job_id, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let (new_props, sink_id) = self
            .catalog_controller
            .update_iceberg_table_props_by_table_id(table_id, props, alter_iceberg_table_props)
            .await?;
        Ok((new_props, sink_id))
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

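    /// Persist the given split assignment, flattening the per-actor splits of each fragment into a
    /// single list per fragment.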
    #[await_tree::instrument]
    pub async fn update_fragment_splits(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        let fragment_splits = split_assignment
            .iter()
            .map(|(fragment_id, splits)| {
                (
                    *fragment_id as _,
                    splits.values().flatten().cloned().collect_vec(),
                )
            })
            .collect();

        let inner = self.catalog_controller.inner.write().await;

        self.catalog_controller
            .update_fragment_splits(&inner.db, &fragment_splits)
            .await
    }

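    /// For each materialized view (optionally restricted to one database), return the subscriptions
    /// depending on it together with their retention times.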
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(table_id, subscriptions)| {
                (
                    table_id,
                    subscriptions
                        .into_iter()
                        .map(|(subscription_id, retention_time)| {
                            (subscription_id as SubscriptionId, retention_time)
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, job_id: JobId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(job_id)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: JobId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id)
            .await?;
        Ok(backfill_types)
    }
}

impl MetadataManager {
    /// Wait for job finishing notification in `TrackingJob::finish`.
    /// The progress is updated per barrier.
    #[await_tree::instrument]
    pub async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: JobId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.current_notification_version().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "no reason received".to_owned())
            .and_then(|result| result)
            .map_err(|reason| anyhow!("failed to wait for streaming job to finish: {}", reason).into())
    }

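    /// Notify all registered finish waiters (optionally scoped to a single database) that their
    /// streaming jobs failed with the given error.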
    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id, err);
    }
}