risingwave_meta/manager/metadata.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_meta_model::{ObjectId, SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode, PbStreamScanType};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tracing::warn;

use crate::MetaResult;
use crate::barrier::SharedFragmentInfo;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId};
use crate::stream::SplitAssignment;
use crate::telemetry::MetaTelemetryJobDesc;
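
/// Manager of cluster and catalog metadata on the meta node: a thin, cloneable wrapper that
/// bundles the cluster controller and the catalog controller and mostly delegates to them.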
#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}
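
/// A change to the set of active streaming compute workers, as returned by
/// [`ActiveStreamingWorkerNodes::changed`].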
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    Remove(WorkerNode),
    Update(WorkerNode),
}
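
/// A locally maintained snapshot of the active streaming compute workers, kept up to date by
/// consuming worker activation/deletion notifications.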
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
    /// Return an uninitialized one as a placeholder, to be replaced by an initialized
    /// snapshot later.
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }
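
    /// Take a snapshot of the currently active streaming compute nodes and subscribe to
    /// subsequent worker changes.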
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes.into_iter().map(|node| (node.id as _, node)).collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }
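
    /// The current set of active streaming compute nodes, keyed by worker id.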
    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }
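
    /// Wait for the next relevant worker notification, apply it to the local snapshot and
    /// return the resulting change. Notifications that do not affect the snapshot are skipped.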
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
                        && worker.property.as_ref().unwrap().is_streaming;
                    let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) else {
                        if is_streaming_compute_node {
                            warn!(
                                ?worker,
                                "notified to delete a non-existent streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_streaming_compute_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker has a different recent type"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if worker.r#type != WorkerType::ComputeNode as i32
                        || !worker.property.as_ref().unwrap().is_streaming
                    {
                        if let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker has changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "not started worker added: {:?}",
                        worker
                    );
                    if let Some(prev_worker) =
                        self.worker_nodes.insert(worker.id as _, worker.clone())
                    {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notified to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }
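
    /// Debug-only sanity check: compare the local snapshot against a freshly listed global
    /// snapshot of active streaming compute nodes and log a warning if they differ.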
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "differs from the global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list_active_streaming_compute_nodes to compare with the local snapshot");
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| DatabaseId::new(database_id as _))
            .collect())
    }
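
    /// Regroup a fragment-keyed map into per-database maps by looking up the database that
    /// each fragment belongs to.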
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| {
                (fragment_id as FragmentId, DatabaseId::new(database_id as _))
            })
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<ObjectId>> {
        let jobs = self
            .catalog_controller
            .list_background_creating_jobs(false, None)
            .await?;

        let jobs = jobs.into_iter().map(|(id, _, _)| id).collect();

        Ok(jobs)
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

    /// Get and filter the "**root**" fragments of the specified relations.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// See also [`crate::controller::catalog::CatalogController::get_root_fragments`].
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(
        HashMap<TableId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(
                upstream_table_ids
                    .iter()
                    .map(|id| id.table_id as _)
                    .collect(),
            )
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((
            upstream_root_fragments
                .into_iter()
                .map(|(id, fragment)| ((id as u32).into(), fragment))
                .collect(),
            actors,
        ))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<u32, TableOption>> {
        self.catalog_controller
            .get_all_table_options()
            .await
            .map(|tops| tops.into_iter().map(|(id, opt)| (id as u32, opt)).collect())
    }

    pub async fn get_table_name_type_mapping(&self) -> MetaResult<HashMap<u32, (String, String)>> {
        let mappings = self
            .catalog_controller
            .get_table_name_type_mapping()
            .await?;
        Ok(mappings
            .into_iter()
            .map(|(id, value)| (id as u32, value))
            .collect())
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<u32>> {
        let table_ids = self.catalog_controller.get_created_table_ids().await?;
        Ok(table_ids.into_iter().map(|id| id as u32).collect())
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: u32,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id as _)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: &[u32]) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.iter().map(|id| *id as _).collect(), false)
            .await
    }

    pub async fn get_table_incoming_sinks(&self, table_id: u32) -> MetaResult<Vec<PbSink>> {
        self.catalog_controller
            .get_table_incoming_sinks(table_id as _)
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        Ok(self
            .catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await?
            .into_iter()
            .map(|id| (id as u32).into())
            .collect())
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: u32,
    ) -> MetaResult<(
        Vec<(PbDispatcherType, SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id as _)
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((fragments, actors))
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(&self) -> Option<Vec<(u32, Vec<u32>)>> {
        let job_internal_table_ids = self.catalog_controller.get_job_internal_table_ids().await;
        job_internal_table_ids.map(|ids| {
            ids.into_iter()
                .map(|(id, internal_ids)| {
                    (
                        id as u32,
                        internal_ids.into_iter().map(|id| id as u32).collect(),
                    )
                })
                .collect()
        })
    }

    pub async fn get_job_fragments_by_id(
        &self,
        job_id: &TableId,
    ) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id.table_id as _)
            .await
    }

    pub fn get_running_actors_of_fragment(&self, id: FragmentId) -> MetaResult<HashSet<ActorId>> {
        self.catalog_controller
            .get_running_actors_of_fragment(id as _)
    }

    // (backfill_actor_id, upstream_source_actor_id)
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count()?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller
            .list_streaming_job_infos()
            .await
            .map(|x| x.len())
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }
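
    /// Update the rate limit of the given source and return the affected fragments together
    /// with their actors.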
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_backfill_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_backfill_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_dml_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_dml_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, u32)> {
        let (new_props, sink_id) = self
            .catalog_controller
            .update_iceberg_table_props_by_table_id(
                table_id.table_id as _,
                props,
                alter_iceberg_table_props,
            )
            .await?;
        Ok((new_props, sink_id))
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }
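
    /// Persist the given per-fragment split assignment through the catalog controller.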
    #[await_tree::instrument]
    pub async fn update_fragment_splits(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        let fragment_splits = split_assignment
            .iter()
            .map(|(fragment_id, splits)| {
                (
                    *fragment_id as _,
                    splits.values().flatten().cloned().collect_vec(),
                )
            })
            .collect();

        let inner = self.catalog_controller.inner.write().await;

        self.catalog_controller
            .update_fragment_splits(&inner.db, &fragment_splits)
            .await
    }
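
    /// Get the subscriptions depending on each materialized view, optionally restricted to a
    /// single database, keyed by table id together with each subscription's retention time.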
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(table_id, subscriptions)| {
                (
                    TableId::new(table_id as _),
                    subscriptions
                        .into_iter()
                        .map(|(subscription_id, retention_time)| {
                            (subscription_id as SubscriptionId, retention_time)
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(table_id.table_id as _)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: ObjectId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id)
            .await?;
        Ok(backfill_types)
    }
}
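
// Notification helpers: waiting for streaming jobs to finish and notifying registered
// waiters about failure or cancellation.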
impl MetadataManager {
    /// Wait for job finishing notification in `TrackingJob::finish`.
    /// The progress is updated per barrier.
    #[await_tree::instrument]
    pub(crate) async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: ObjectId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.current_notification_version().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id.database_id as _, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "no reason received".to_owned())
            .and_then(|result| result)
            .map_err(|reason| anyhow!("failed to wait streaming job finish: {}", reason).into())
    }

    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id.map(|id| id.database_id as _), err);
    }

    pub(crate) async fn notify_cancelled(&self, database_id: DatabaseId, id: ObjectId) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_cancelled(database_id.database_id as _, id);
    }
}