risingwave_meta/manager/metadata.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::pin::pin;
use std::time::Duration;

use anyhow::anyhow;
use futures::future::{Either, select};
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_meta_model::{ObjectId, SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamScanType};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tokio::time::{Instant, sleep};
use tracing::warn;

use crate::MetaResult;
use crate::barrier::Reschedule;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{
    ActorId, ClusterId, Fragment, FragmentId, StreamActor, StreamJobFragments, SubscriptionId,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::telemetry::MetaTelemetryJobDesc;

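/// Manager of cluster and catalog metadata on the meta node.
///
/// A thin, cloneable handle bundling the cluster controller and the catalog controller;
/// most methods delegate to one of the two and convert between the id types used by
/// callers and those used by the controllers.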
#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    #[expect(dead_code)]
    Remove(WorkerNode),
    Update(WorkerNode),
}

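/// A locally maintained snapshot of the active streaming compute nodes, kept up to date
/// by consuming `LocalNotification`s from the cluster controller on `rx`.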
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
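    /// Return an uninitialized instance as a placeholder, to be replaced by a real
    /// snapshot later.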
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }

    /// Take a snapshot of the currently active streaming compute nodes and subscribe to
    /// subsequent cluster membership changes.
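    ///
    /// A minimal usage sketch (illustrative only, not compiled as a doc test; it assumes a
    /// `MetadataManager` value named `metadata_manager` is in scope):
    ///
    /// ```ignore
    /// let mut nodes = ActiveStreamingWorkerNodes::new_snapshot(metadata_manager).await?;
    /// loop {
    ///     // React to membership changes as they arrive.
    ///     match nodes.changed().await {
    ///         ActiveStreamingWorkerChange::Add(node) => tracing::info!(?node, "worker added"),
    ///         ActiveStreamingWorkerChange::Update(node) => tracing::info!(?node, "worker updated"),
    ///         ActiveStreamingWorkerChange::Remove(node) => tracing::info!(?node, "worker removed"),
    ///     }
    /// }
    /// ```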
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes.into_iter().map(|node| (node.id as _, node)).collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

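    /// Wait for the next change to the active streaming worker set, giving up and returning
    /// `None` after `verbose_timeout`. While waiting, `verbose_fn` is invoked roughly every
    /// `verbose_interval`, typically to log that the wait is still in progress.
    ///
    /// A minimal sketch of a call site (illustrative only):
    ///
    /// ```ignore
    /// let change = nodes
    ///     .wait_changed(
    ///         Duration::from_secs(5),
    ///         Duration::from_secs(30),
    ///         |nodes| tracing::info!(current = ?nodes.current(), "still waiting for a worker change"),
    ///     )
    ///     .await;
    /// ```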
    pub(crate) async fn wait_changed(
        &mut self,
        verbose_interval: Duration,
        verbose_timeout: Duration,
        verbose_fn: impl Fn(&Self),
    ) -> Option<ActiveStreamingWorkerChange> {
        let start = Instant::now();
        loop {
            if let Either::Left((change, _)) =
                select(pin!(self.changed()), pin!(sleep(verbose_interval))).await
            {
                break Some(change);
            }

            if start.elapsed() > verbose_timeout {
                break None;
            }

            verbose_fn(self);
        }
    }

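    /// Block until the set of active streaming compute workers changes, consuming local
    /// notifications until one of them translates into an add, update, or removal.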
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
                        && worker.property.as_ref().unwrap().is_streaming;
                    let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) else {
                        if is_streaming_compute_node {
                            warn!(
                                ?worker,
                                "notified to delete a non-existent streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_streaming_compute_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker has a different recent type"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if worker.r#type != WorkerType::ComputeNode as i32
                        || !worker.property.as_ref().unwrap().is_streaming
                    {
                        if let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker is changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "not started worker added: {:?}",
                        worker
                    );
                    if let Some(prev_worker) =
                        self.worker_nodes.insert(worker.id as _, worker.clone())
                    {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notify to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

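    /// Debug-only sanity check: re-list the active streaming compute nodes from the cluster
    /// controller and warn if the locally tracked snapshot has diverged from it.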
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "inconsistent with the global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list_active_streaming_compute_nodes to compare with the local snapshot");
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| DatabaseId::new(database_id as _))
            .collect())
    }

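    /// Split a map keyed by fragment id into one map per database, by looking up the owning
    /// database of every fragment in the catalog. Returns an error if the database of any
    /// fragment cannot be found.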
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| {
                (fragment_id as FragmentId, DatabaseId::new(database_id as _))
            })
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<TableId>> {
        let tables = self
            .catalog_controller
            .list_background_creating_mviews(false)
            .await?;

        Ok(tables
            .into_iter()
            .map(|table| TableId::from(table.table_id as u32))
            .collect())
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        // Temporary conversion: u32 fragment ids to the i32 ids used by the catalog controller.
        let reschedules = reschedules.into_iter().map(|(k, v)| (k as _, v)).collect();

        self.catalog_controller
            .post_apply_reschedules(reschedules, post_updates)
            .await
    }

    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)
            .await?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

    /// Get and filter the "**root**" fragments of the specified relations.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// See also [`crate::controller::catalog::CatalogController::get_root_fragments`].
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(HashMap<TableId, Fragment>, HashMap<ActorId, WorkerId>)> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(
                upstream_table_ids
                    .iter()
                    .map(|id| id.table_id as _)
                    .collect(),
            )
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((
            upstream_root_fragments
                .into_iter()
                .map(|(id, fragment)| ((id as u32).into(), fragment))
                .collect(),
            actors,
        ))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<u32, TableOption>> {
        self.catalog_controller
            .get_all_table_options()
            .await
            .map(|tops| tops.into_iter().map(|(id, opt)| (id as u32, opt)).collect())
    }

    pub async fn get_table_name_type_mapping(&self) -> MetaResult<HashMap<u32, (String, String)>> {
        let mappings = self
            .catalog_controller
            .get_table_name_type_mapping()
            .await?;
        Ok(mappings
            .into_iter()
            .map(|(id, value)| (id as u32, value))
            .collect())
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<u32>> {
        let table_ids = self.catalog_controller.get_created_table_ids().await?;
        Ok(table_ids.into_iter().map(|id| id as u32).collect())
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: u32,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id as _)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: Vec<u32>) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.into_iter().map(|id| id as _).collect(), false)
            .await
    }

    pub async fn get_sink_catalog_by_ids(&self, ids: &[u32]) -> MetaResult<Vec<PbSink>> {
        self.catalog_controller
            .get_sink_by_ids(ids.iter().map(|id| *id as _).collect())
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        Ok(self
            .catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await?
            .into_iter()
            .map(|id| (id as u32).into())
            .collect())
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: u32,
    ) -> MetaResult<(
        Vec<(PbDispatcherType, Fragment)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id as _)
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((fragments, actors))
    }

    pub async fn get_worker_actor_ids(
        &self,
        job_ids: HashSet<TableId>,
    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
        let worker_actors = self
            .catalog_controller
            .get_worker_actor_ids(job_ids.into_iter().map(|id| id.table_id as _).collect())
            .await?;
        Ok(worker_actors
            .into_iter()
            .map(|(id, actors)| {
                (
                    id as WorkerId,
                    actors.into_iter().map(|id| id as ActorId).collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(&self) -> Option<Vec<(u32, Vec<u32>)>> {
        let job_internal_table_ids = self.catalog_controller.get_job_internal_table_ids().await;
        job_internal_table_ids.map(|ids| {
            ids.into_iter()
                .map(|(id, internal_ids)| {
                    (
                        id as u32,
                        internal_ids.into_iter().map(|id| id as u32).collect(),
                    )
                })
                .collect()
        })
    }

    pub async fn get_job_fragments_by_id(
        &self,
        job_id: &TableId,
    ) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id.table_id as _)
            .await
    }

    pub async fn get_running_actors_of_fragment(
        &self,
        id: FragmentId,
    ) -> MetaResult<HashSet<ActorId>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_of_fragment(id as _)
            .await?;
        Ok(actor_ids.into_iter().map(|id| id as ActorId).collect())
    }

    /// Return the running `(backfill_actor_id, upstream_source_actor_id)` pairs for the
    /// given source backfill fragment and its upstream source fragment.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub async fn get_job_fragments_by_ids(
        &self,
        ids: &[TableId],
    ) -> MetaResult<Vec<StreamJobFragments>> {
        let mut table_fragments = vec![];
        for id in ids {
            table_fragments.push(
                self.catalog_controller
                    .get_job_fragments_by_id(id.table_id as _)
                    .await?,
            );
        }
        Ok(table_fragments)
    }

    pub async fn all_active_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let table_fragments = self.catalog_controller.table_fragments().await?;
        let mut actor_maps = HashMap::new();
        for (_, tf) in table_fragments {
            for actor in tf.active_actors() {
                actor_maps
                    .try_insert(actor.actor_id, actor)
                    .expect("non duplicate");
            }
        }
        Ok(actor_maps)
    }

    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count().await?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller
            .list_streaming_job_infos()
            .await
            .map(|x| x.len())
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_backfill_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_backfill_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_dml_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_dml_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    #[await_tree::instrument]
    pub async fn update_actor_splits_by_split_assignment(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_actor_splits(split_assignment)
            .await
    }

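    /// Return the subscriptions depending on each materialized view, grouped as
    /// `database_id -> table_id -> subscription_id -> retention time`, optionally restricted
    /// to a single database.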
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(loaded_database_id, mv_depended_subscriptions)| {
                if let Some(database_id) = database_id {
                    assert_eq!(loaded_database_id, database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    mv_depended_subscriptions
                        .into_iter()
                        .map(|(table_id, subscriptions)| {
                            (
                                TableId::new(table_id as _),
                                subscriptions
                                    .into_iter()
                                    .map(|(subscription_id, retention_time)| {
                                        (subscription_id as SubscriptionId, retention_time)
                                    })
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(table_id.table_id as _)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: ObjectId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: &TableId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id.table_id as _)
            .await?;
        Ok(backfill_types)
    }
}

impl MetadataManager {
    /// Wait for the notification that the streaming job has finished, sent in
    /// `TrackingJob::finish`. The job's progress is updated per barrier.
    #[await_tree::instrument]
    pub(crate) async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: ObjectId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
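        // The finished check below and the notifier registration happen under the same inner
        // write guard, so a job that finishes in between cannot be missed.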
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.current_notification_version().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id.database_id as _, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "no reason received".to_owned())
            .and_then(|result| result)
            .map_err(|reason| anyhow!("failed to wait for streaming job to finish: {}", reason).into())
    }

    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id.map(|id| id.database_id as _), err);
    }
}