risingwave_meta/manager/metadata.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::pin::pin;
use std::time::Duration;

use anyhow::anyhow;
use futures::future::{Either, select};
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_meta_model::{ObjectId, SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamScanType};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tokio::time::{Instant, sleep};
use tracing::warn;

use crate::MetaResult;
use crate::barrier::Reschedule;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{
    ActorId, ClusterId, Fragment, FragmentId, StreamActor, StreamJobFragments, SubscriptionId,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::telemetry::MetaTelemetryJobDesc;

#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    #[expect(dead_code)]
    Remove(WorkerNode),
    Update(WorkerNode),
}

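/// A locally maintained snapshot of the active streaming compute nodes, kept
/// up to date by consuming `LocalNotification` events from the cluster
/// controller.
///
/// Illustrative usage sketch (hypothetical caller, not from the original
/// source):
/// ```ignore
/// let mut active = ActiveStreamingWorkerNodes::new_snapshot(meta_manager).await?;
/// loop {
///     let change = active.changed().await;
///     tracing::info!(?change, workers = active.current().len(), "worker set changed");
/// }
/// ```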
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: MetadataManager,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
    /// Return an uninitialized instance as a placeholder until a real snapshot
    /// is taken via [`Self::new_snapshot`].
    pub(crate) fn uninitialized(meta_manager: MetadataManager) -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager,
        }
    }

    /// Take a snapshot of the currently active streaming compute nodes and
    /// subscribe to subsequent cluster changes.
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes.into_iter().map(|node| (node.id as _, node)).collect(),
            rx,
            meta_manager,
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

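    /// Wait for the next change to the active worker set. While waiting,
    /// `verbose_fn` is invoked once per `verbose_interval`; if no change
    /// arrives within `verbose_timeout`, return `None`.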
    pub(crate) async fn wait_changed(
        &mut self,
        verbose_interval: Duration,
        verbose_timeout: Duration,
        verbose_fn: impl Fn(&Self),
    ) -> Option<ActiveStreamingWorkerChange> {
        let start = Instant::now();
        loop {
            if let Either::Left((change, _)) =
                select(pin!(self.changed()), pin!(sleep(verbose_interval))).await
            {
                break Some(change);
            }

            if start.elapsed() > verbose_timeout {
                break None;
            }

            verbose_fn(self)
        }
    }

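    /// Consume `LocalNotification`s until one changes the active streaming
    /// worker set, and return that change as an
    /// [`ActiveStreamingWorkerChange`]. Irrelevant notifications (including
    /// no-op updates) are skipped.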
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
                        && worker.property.as_ref().unwrap().is_streaming;
                    let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) else {
                        if is_streaming_compute_node {
                            warn!(
                                ?worker,
                                "notified to delete a non-existent streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_streaming_compute_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker has a different type than previously recorded"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if worker.r#type != WorkerType::ComputeNode as i32
                        || !worker.property.as_ref().unwrap().is_streaming
                    {
                        if let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker has changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "activated worker is not in running state: {:?}",
                        worker
                    );
                    if let Some(prev_worker) =
                        self.worker_nodes.insert(worker.id as _, worker.clone())
                    {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notified to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

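    /// Debug-only sanity check: fetch a fresh global list of active streaming
    /// compute nodes and warn if it diverges from the locally maintained
    /// snapshot.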
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        match self
            .meta_manager
            .list_active_streaming_compute_nodes()
            .await
        {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "local snapshot differs from the global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list_active_streaming_compute_nodes to compare with the local snapshot");
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| DatabaseId::new(database_id as _))
            .collect())
    }

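    /// Split a fragment-keyed map into per-database maps, using the catalog to
    /// resolve which database each fragment belongs to. Fails if any fragment
    /// id is unknown to the catalog.
    ///
    /// Illustrative sketch (hypothetical values, not from the original source):
    /// ```ignore
    /// let map: HashMap<FragmentId, &str> = [(1, "a"), (2, "b")].into_iter().collect();
    /// let by_db = metadata_manager.split_fragment_map_by_database(map).await?;
    /// // by_db: HashMap<DatabaseId, HashMap<FragmentId, &str>>
    /// ```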
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| {
                (fragment_id as FragmentId, DatabaseId::new(database_id as _))
            })
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<TableId>> {
        let tables = self
            .catalog_controller
            .list_background_creating_mviews(false)
            .await?;

        Ok(tables
            .into_iter()
            .map(|table| TableId::from(table.table_id as u32))
            .collect())
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        // Temporarily convert the u32 fragment ids to i32 for the model layer.
        let reschedules = reschedules.into_iter().map(|(k, v)| (k as _, v)).collect();

        self.catalog_controller
            .post_apply_reschedules(reschedules, post_updates)
            .await
    }

    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)
            .await?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

    /// Get and filter the "**root**" fragments of the specified relations.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be an `MView` or a `Source`.
    ///
    /// See also [`crate::controller::catalog::CatalogController::get_root_fragments`].
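    ///
    /// Illustrative sketch (hypothetical ids, not from the original source):
    /// ```ignore
    /// let upstream: HashSet<TableId> = [TableId::new(1)].into_iter().collect();
    /// let (root_fragments, actor_workers) =
    ///     metadata_manager.get_upstream_root_fragments(&upstream).await?;
    /// ```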
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(HashMap<TableId, Fragment>, HashMap<ActorId, WorkerId>)> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(
                upstream_table_ids
                    .iter()
                    .map(|id| id.table_id as _)
                    .collect(),
            )
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((
            upstream_root_fragments
                .into_iter()
                .map(|(id, fragment)| ((id as u32).into(), fragment))
                .collect(),
            actors,
        ))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<u32, TableOption>> {
        self.catalog_controller
            .get_all_table_options()
            .await
            .map(|tops| tops.into_iter().map(|(id, opt)| (id as u32, opt)).collect())
    }

    pub async fn get_table_name_type_mapping(&self) -> MetaResult<HashMap<u32, (String, String)>> {
        let mappings = self
            .catalog_controller
            .get_table_name_type_mapping()
            .await?;
        Ok(mappings
            .into_iter()
            .map(|(id, value)| (id as u32, value))
            .collect())
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<u32>> {
        let table_ids = self.catalog_controller.get_created_table_ids().await?;
        Ok(table_ids.into_iter().map(|id| id as u32).collect())
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: u32,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id as _)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: Vec<u32>) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.into_iter().map(|id| id as _).collect(), false)
            .await
    }

    pub async fn get_sink_catalog_by_ids(&self, ids: &[u32]) -> MetaResult<Vec<PbSink>> {
        self.catalog_controller
            .get_sink_by_ids(ids.iter().map(|id| *id as _).collect())
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        Ok(self
            .catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await?
            .into_iter()
            .map(|id| (id as u32).into())
            .collect())
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: u32,
    ) -> MetaResult<(
        Vec<(PbDispatchStrategy, Fragment)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id as _)
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((fragments, actors))
    }

    pub async fn get_worker_actor_ids(
        &self,
        job_ids: HashSet<TableId>,
    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
        let worker_actors = self
            .catalog_controller
            .get_worker_actor_ids(job_ids.into_iter().map(|id| id.table_id as _).collect())
            .await?;
        Ok(worker_actors
            .into_iter()
            .map(|(id, actors)| {
                (
                    id as WorkerId,
                    actors.into_iter().map(|id| id as ActorId).collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(&self) -> Option<Vec<(u32, Vec<u32>)>> {
        let job_internal_table_ids = self.catalog_controller.get_job_internal_table_ids().await;
        job_internal_table_ids.map(|ids| {
            ids.into_iter()
                .map(|(id, internal_ids)| {
                    (
                        id as u32,
                        internal_ids.into_iter().map(|id| id as u32).collect(),
                    )
                })
                .collect()
        })
    }

    pub async fn get_job_fragments_by_id(
        &self,
        job_id: &TableId,
    ) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id.table_id as _)
            .await
    }

    pub async fn get_running_actors_of_fragment(
        &self,
        id: FragmentId,
    ) -> MetaResult<HashSet<ActorId>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_of_fragment(id as _)
            .await?;
        Ok(actor_ids.into_iter().map(|id| id as ActorId).collect())
    }

    /// Return the set of running `(backfill_actor_id, upstream_source_actor_id)` pairs.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub async fn get_job_fragments_by_ids(
        &self,
        ids: &[TableId],
    ) -> MetaResult<Vec<StreamJobFragments>> {
        let mut table_fragments = vec![];
        for id in ids {
            table_fragments.push(
                self.catalog_controller
                    .get_job_fragments_by_id(id.table_id as _)
                    .await?,
            );
        }
        Ok(table_fragments)
    }

    pub async fn all_active_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let table_fragments = self.catalog_controller.table_fragments().await?;
        let mut actor_maps = HashMap::new();
        for (_, tf) in table_fragments {
            for actor in tf.active_actors() {
                actor_maps
                    .try_insert(actor.actor_id, actor)
                    .expect("non duplicate");
            }
        }
        Ok(actor_maps)
    }

    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count().await?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller
            .list_streaming_job_infos()
            .await
            .map(|x| x.len())
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_backfill_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_backfill_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_dml_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_dml_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_actor_splits_by_split_assignment(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_actor_splits(split_assignment)
            .await
    }

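    /// Return the subscriptions that depend on each materialized view, grouped
    /// as `database_id -> mv_table_id -> subscription_id -> retention_time`.
    /// If `database_id` is `Some`, only that database is loaded.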
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(loaded_database_id, mv_depended_subscriptions)| {
                if let Some(database_id) = database_id {
                    assert_eq!(loaded_database_id, database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    mv_depended_subscriptions
                        .into_iter()
                        .map(|(table_id, subscriptions)| {
                            (
                                TableId::new(table_id as _),
                                subscriptions
                                    .into_iter()
                                    .map(|(subscription_id, retention_time)| {
                                        (subscription_id as SubscriptionId, retention_time)
                                    })
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(table_id.table_id as _)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: ObjectId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: &TableId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id.table_id as _)
            .await?;
        Ok(backfill_types)
    }
}

impl MetadataManager {
    /// Wait for the notification that a streaming job has finished, as issued in
    /// `TrackingJob::finish`. The job's progress is updated per barrier.
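    ///
    /// Illustrative call site (hypothetical ids, not from the original source):
    /// ```ignore
    /// let version = metadata_manager
    ///     .wait_streaming_job_finished(database_id, job_id)
    ///     .await?;
    /// ```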
    pub(crate) async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: ObjectId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.current_notification_version().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id.database_id as _, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "notifier dropped without sending a reason".to_owned())
            .and_then(|result| result)
            .map_err(|reason| {
                anyhow!("failed to wait for streaming job to finish: {}", reason).into()
            })
    }

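    /// Notify all registered finish-waiters that job creation has failed with
    /// `err`, either for a single database or, if `database_id` is `None`, for
    /// all databases.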
    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id.map(|id| id.database_id as _), err);
    }
}