risingwave_meta/manager/metadata.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::pin::pin;
use std::time::Duration;

use anyhow::anyhow;
use futures::future::{Either, select};
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::{ObjectId, SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamScanType};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tokio::time::{Instant, sleep};
use tracing::warn;

use crate::MetaResult;
use crate::barrier::Reschedule;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{
    ActorId, ClusterId, Fragment, FragmentId, StreamActor, StreamJobFragments, SubscriptionId,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::telemetry::MetaTelemetryJobDesc;

#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    #[expect(dead_code)]
    Remove(WorkerNode),
    Update(WorkerNode),
}

pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
    /// Return an uninitialized set as a placeholder, to be replaced by an initialized snapshot later.
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }

    /// Take a snapshot of the currently active streaming compute nodes and
    /// subscribe to subsequent worker changes.
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes.into_iter().map(|node| (node.id as _, node)).collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

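    /// Wait for the next change to the active streaming worker set, calling
    /// `verbose_fn` once per `verbose_interval` while waiting and returning
    /// `None` once `verbose_timeout` has elapsed without a change.
    ///
    /// A minimal usage sketch (not a doctest; the logging closure and the
    /// surrounding context are illustrative assumptions, not code from this
    /// crate):
    ///
    /// ```ignore
    /// let mut nodes = ActiveStreamingWorkerNodes::new_snapshot(metadata_manager.clone()).await?;
    /// match nodes
    ///     .wait_changed(
    ///         Duration::from_secs(30),  // report progress every 30 seconds
    ///         Duration::from_secs(300), // stop waiting after 5 minutes
    ///         |nodes| tracing::info!(workers = nodes.current().len(), "still waiting for change"),
    ///     )
    ///     .await
    /// {
    ///     Some(change) => tracing::info!(?change, "worker set changed"),
    ///     None => tracing::info!("no worker change within the timeout"),
    /// }
    /// ```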
    pub(crate) async fn wait_changed(
        &mut self,
        verbose_interval: Duration,
        verbose_timeout: Duration,
        verbose_fn: impl Fn(&Self),
    ) -> Option<ActiveStreamingWorkerChange> {
        let start = Instant::now();
        loop {
            if let Either::Left((change, _)) =
                select(pin!(self.changed()), pin!(sleep(verbose_interval))).await
            {
                break Some(change);
            }

            if start.elapsed() > verbose_timeout {
                break None;
            }

            verbose_fn(self)
        }
    }

    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
                        && worker.property.as_ref().unwrap().is_streaming;
                    let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) else {
                        if is_streaming_compute_node {
                            warn!(
                                ?worker,
                                "notify to delete a non-existing streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_streaming_compute_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker has a different recent type"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if worker.r#type != WorkerType::ComputeNode as i32
                        || !worker.property.as_ref().unwrap().is_streaming
                    {
                        if let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker has changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "not started worker added: {:?}",
                        worker
                    );
                    if let Some(prev_worker) =
                        self.worker_nodes.insert(worker.id as _, worker.clone())
                    {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notify to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "different from global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list_active_streaming_compute_nodes to compare with local snapshot");
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| DatabaseId::new(database_id as _))
            .collect())
    }

    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| {
                (fragment_id as FragmentId, DatabaseId::new(database_id as _))
            })
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<TableId>> {
        let jobs = self
            .catalog_controller
            .list_background_creating_jobs(false)
            .await?;

        Ok(jobs
            .into_iter()
            .map(|(id, _, _)| TableId::from(id as u32))
            .collect())
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        // Temporarily convert the `u32` fragment ids to the metadata model's `i32` ids.
        let reschedules = reschedules.into_iter().map(|(k, v)| (k as _, v)).collect();

        self.catalog_controller
            .post_apply_reschedules(reschedules, post_updates)
            .await
    }

    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)
            .await?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

    /// Get and filter the "**root**" fragments of the specified relations.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// See also [`crate::controller::catalog::CatalogController::get_root_fragments`].
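    ///
    /// A minimal usage sketch (not a doctest; the chosen table ids and the logging
    /// are illustrative assumptions, not code from this crate):
    ///
    /// ```ignore
    /// let upstream_table_ids: HashSet<TableId> = [TableId::new(1), TableId::new(2)].into();
    /// let (root_fragments, actor_locations) = metadata_manager
    ///     .get_upstream_root_fragments(&upstream_table_ids)
    ///     .await?;
    /// for (table_id, fragment) in &root_fragments {
    ///     tracing::debug!(%table_id, fragment_id = fragment.fragment_id, "upstream root fragment");
    /// }
    /// // `actor_locations` maps each actor of those root fragments to the worker running it.
    /// ```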
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(HashMap<TableId, Fragment>, HashMap<ActorId, WorkerId>)> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(
                upstream_table_ids
                    .iter()
                    .map(|id| id.table_id as _)
                    .collect(),
            )
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((
            upstream_root_fragments
                .into_iter()
                .map(|(id, fragment)| ((id as u32).into(), fragment))
                .collect(),
            actors,
        ))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<u32, TableOption>> {
        self.catalog_controller
            .get_all_table_options()
            .await
            .map(|tops| tops.into_iter().map(|(id, opt)| (id as u32, opt)).collect())
    }

    pub async fn get_table_name_type_mapping(&self) -> MetaResult<HashMap<u32, (String, String)>> {
        let mappings = self
            .catalog_controller
            .get_table_name_type_mapping()
            .await?;
        Ok(mappings
            .into_iter()
            .map(|(id, value)| (id as u32, value))
            .collect())
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<u32>> {
        let table_ids = self.catalog_controller.get_created_table_ids().await?;
        Ok(table_ids.into_iter().map(|id| id as u32).collect())
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: u32,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id as _)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: &[u32]) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.iter().map(|id| *id as _).collect(), false)
            .await
    }

    pub async fn get_table_incoming_sinks(&self, table_id: u32) -> MetaResult<Vec<PbSink>> {
        self.catalog_controller
            .get_table_incoming_sinks(table_id as _)
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        Ok(self
            .catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await?
            .into_iter()
            .map(|id| (id as u32).into())
            .collect())
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: u32,
    ) -> MetaResult<(
        Vec<(PbDispatcherType, Fragment)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id as _)
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((fragments, actors))
    }

    pub async fn get_worker_actor_ids(
        &self,
        job_ids: HashSet<TableId>,
    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
        let worker_actors = self
            .catalog_controller
            .get_worker_actor_ids(job_ids.into_iter().map(|id| id.table_id as _).collect())
            .await?;
        Ok(worker_actors
            .into_iter()
            .map(|(id, actors)| {
                (
                    id as WorkerId,
                    actors.into_iter().map(|id| id as ActorId).collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(&self) -> Option<Vec<(u32, Vec<u32>)>> {
        let job_internal_table_ids = self.catalog_controller.get_job_internal_table_ids().await;
        job_internal_table_ids.map(|ids| {
            ids.into_iter()
                .map(|(id, internal_ids)| {
                    (
                        id as u32,
                        internal_ids.into_iter().map(|id| id as u32).collect(),
                    )
                })
                .collect()
        })
    }

    pub async fn get_job_fragments_by_id(
        &self,
        job_id: &TableId,
    ) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id.table_id as _)
            .await
    }

    pub async fn get_running_actors_of_fragment(
        &self,
        id: FragmentId,
    ) -> MetaResult<HashSet<ActorId>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_of_fragment(id as _)
            .await?;
        Ok(actor_ids.into_iter().map(|id| id as ActorId).collect())
    }

    /// Returns pairs of `(backfill_actor_id, upstream_source_actor_id)` for the
    /// running actors of the given source-backfill fragment.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub async fn get_job_fragments_by_ids(
        &self,
        ids: &[TableId],
    ) -> MetaResult<Vec<StreamJobFragments>> {
        let mut table_fragments = vec![];
        for id in ids {
            table_fragments.push(
                self.catalog_controller
                    .get_job_fragments_by_id(id.table_id as _)
                    .await?,
            );
        }
        Ok(table_fragments)
    }

    pub async fn all_active_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let table_fragments = self.catalog_controller.table_fragments().await?;
        let mut actor_maps = HashMap::new();
        for (_, tf) in table_fragments {
            for actor in tf.active_actors() {
                actor_maps
                    .try_insert(actor.actor_id, actor)
                    .expect("non duplicate");
            }
        }
        Ok(actor_maps)
    }

    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count().await?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller
            .list_streaming_job_infos()
            .await
            .map(|x| x.len())
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_backfill_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_backfill_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_dml_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_dml_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, u32)> {
        let (new_props, sink_id) = self
            .catalog_controller
            .update_iceberg_table_props_by_table_id(
                table_id.table_id as _,
                props,
                alter_iceberg_table_props,
            )
            .await?;
        Ok((new_props, sink_id))
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    #[await_tree::instrument]
    pub async fn update_actor_splits_by_split_assignment(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_actor_splits(split_assignment)
            .await
    }

    #[await_tree::instrument]
    pub async fn update_source_splits(
        &self,
        source_splits: &HashMap<SourceId, Vec<SplitImpl>>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_source_splits(source_splits)
            .await
    }

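    /// Get the subscriptions depending on each materialized view, grouped by database.
    /// The nesting of the returned map is `database_id -> mv table_id -> subscription_id
    /// -> retention_time`.
    ///
    /// A minimal sketch of walking the nested result (not a doctest; the logging is an
    /// illustrative assumption, not code from this crate):
    ///
    /// ```ignore
    /// let subscriptions = metadata_manager.get_mv_depended_subscriptions(None).await?;
    /// for (database_id, by_table) in &subscriptions {
    ///     for (table_id, by_subscription) in by_table {
    ///         for (&subscription_id, &retention_time) in by_subscription {
    ///             tracing::debug!(
    ///                 %database_id, %table_id, subscription_id, retention_time,
    ///                 "mv depended subscription"
    ///             );
    ///         }
    ///     }
    /// }
    /// ```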
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(loaded_database_id, mv_depended_subscriptions)| {
                if let Some(database_id) = database_id {
                    assert_eq!(loaded_database_id, database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    mv_depended_subscriptions
                        .into_iter()
                        .map(|(table_id, subscriptions)| {
                            (
                                TableId::new(table_id as _),
                                subscriptions
                                    .into_iter()
                                    .map(|(subscription_id, retention_time)| {
                                        (subscription_id as SubscriptionId, retention_time)
                                    })
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(table_id.table_id as _)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: ObjectId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: &TableId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id.table_id as _)
            .await?;
        Ok(backfill_types)
    }
}

impl MetadataManager {
    /// Wait for job finishing notification in `TrackingJob::finish`.
    /// The progress is updated per barrier.
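    ///
    /// A minimal usage sketch (not a doctest; the surrounding job-creation flow and the
    /// `job_id` value are illustrative assumptions, not code from this crate):
    ///
    /// ```ignore
    /// // After submitting a streaming job, block until it is reported as finished
    /// // (or fail with the propagated reason).
    /// let version = metadata_manager
    ///     .wait_streaming_job_finished(database_id, job_id)
    ///     .await?;
    /// tracing::debug!(version, "streaming job finished");
    /// ```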
    #[await_tree::instrument]
    pub(crate) async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: ObjectId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.current_notification_version().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id.database_id as _, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "no reason received (notifier dropped)".to_owned())
            .and_then(|result| result)
            .map_err(|reason| anyhow!("failed to wait for streaming job to finish: {}", reason).into())
    }

    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id.map(|id| id.database_id as _), err);
    }

    pub(crate) async fn notify_cancelled(&self, database_id: DatabaseId, id: ObjectId) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_cancelled(database_id.database_id as _, id);
    }
}