use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::pin::pin;
use std::time::Duration;

use anyhow::anyhow;
use futures::future::{Either, select};
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_meta_model::{ObjectId, SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamScanType};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tokio::time::{Instant, sleep};
use tracing::warn;

use crate::MetaResult;
use crate::barrier::Reschedule;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{
    ActorId, ClusterId, Fragment, FragmentId, StreamActor, StreamJobFragments, SubscriptionId,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::telemetry::MetaTelemetryJobDesc;

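/// Central entry point to cluster (worker) metadata and catalog (streaming
/// job) metadata, backed by the cluster and catalog controllers. Cloning is
/// cheap and shares the same underlying controllers.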
#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

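/// A change to the set of active streaming compute workers, derived from
/// cluster notifications.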
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    #[expect(dead_code)]
    Remove(WorkerNode),
    Update(WorkerNode),
}

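/// An incrementally maintained view of the active streaming compute nodes.
/// The local snapshot is kept up to date by consuming `LocalNotification`s
/// received from the cluster controller.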
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: MetadataManager,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
    pub(crate) fn uninitialized(meta_manager: MetadataManager) -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager,
        }
    }

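    /// Takes a snapshot of the currently active streaming compute nodes and
    /// subscribes to subsequent changes. Snapshot and subscription come from a
    /// single controller call, so the snapshot and the notification stream
    /// stay consistent with each other.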
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes.into_iter().map(|node| (node.id as _, node)).collect(),
            rx,
            meta_manager,
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

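    /// Waits for the next worker change, logging progress while waiting.
    /// Every `verbose_interval`, `verbose_fn` is invoked with the current
    /// snapshot; once `verbose_timeout` has elapsed without a change, returns
    /// `None`.
    ///
    /// Illustrative call site (a sketch; the `nodes` binding and the tracing
    /// message are assumptions, not code from this crate):
    ///
    /// ```ignore
    /// let change = nodes
    ///     .wait_changed(
    ///         Duration::from_secs(30),
    ///         Duration::from_secs(300),
    ///         |n| tracing::info!(current = ?n, "still waiting for worker change"),
    ///     )
    ///     .await;
    /// ```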
    pub(crate) async fn wait_changed(
        &mut self,
        verbose_interval: Duration,
        verbose_timeout: Duration,
        verbose_fn: impl Fn(&Self),
    ) -> Option<ActiveStreamingWorkerChange> {
        let start = Instant::now();
        loop {
            if let Either::Left((change, _)) =
                select(pin!(self.changed()), pin!(sleep(verbose_interval))).await
            {
                break Some(change);
            }

            if start.elapsed() > verbose_timeout {
                break None;
            }

            verbose_fn(self)
        }
    }

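    /// Consumes cluster notifications until one of them affects the set of
    /// active streaming compute workers, updates the local snapshot, and
    /// returns the corresponding change. Notifications that leave the set
    /// unchanged (e.g. re-activation with identical metadata) are skipped.
    ///
    /// Panics if the notification channel has been closed, e.g. on an
    /// `uninitialized` instance.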
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
                        && worker.property.as_ref().unwrap().is_streaming;
                    let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) else {
                        if is_streaming_compute_node {
                            warn!(
                                ?worker,
                                "notified to delete a non-existing streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_streaming_compute_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker is no longer a streaming compute node"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if worker.r#type != WorkerType::ComputeNode as i32
                        || !worker.property.as_ref().unwrap().is_streaming
                    {
                        if let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a tracked streaming worker has changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "worker activated before it is running: {:?}",
                        worker
                    );
                    if let Some(prev_worker) =
                        self.worker_nodes.insert(worker.id as _, worker.clone())
                    {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notified to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

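    /// Debug-only sanity check: re-lists the active streaming compute nodes
    /// from the global metadata and warns if the incrementally maintained
    /// local snapshot has drifted from it (only `id`, `type`, `host`,
    /// `property` and `resource` are compared).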
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        match self
            .meta_manager
            .list_active_streaming_compute_nodes()
            .await
        {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "local snapshot differs from global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(
                    e = ?e.as_report(),
                    "failed to list_active_streaming_compute_nodes to compare with local snapshot"
                );
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| DatabaseId::new(database_id as _))
            .collect())
    }

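    /// Regroups a per-fragment map by the database each fragment belongs to.
    /// Fails if any fragment in the input has no known database.
    ///
    /// Illustrative sketch (`metadata_manager` and the fragment IDs and values
    /// are made up for the example):
    ///
    /// ```ignore
    /// let mut map = HashMap::new();
    /// map.insert(1 as FragmentId, "a");
    /// map.insert(2 as FragmentId, "b");
    /// // If fragments 1 and 2 live in different databases, the result has two
    /// // entries, each holding a single-fragment inner map.
    /// let by_db = metadata_manager.split_fragment_map_by_database(map).await?;
    /// ```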
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| {
                (fragment_id as FragmentId, DatabaseId::new(database_id as _))
            })
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<TableId>> {
        let tables = self
            .catalog_controller
            .list_background_creating_mviews(false)
            .await?;

        Ok(tables
            .into_iter()
            .map(|table| TableId::from(table.table_id as u32))
            .collect())
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        let reschedules = reschedules.into_iter().map(|(k, v)| (k as _, v)).collect();

        self.catalog_controller
            .post_apply_reschedules(reschedules, post_updates)
            .await
    }

    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)
            .await?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

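    /// Returns the root fragment of each given upstream table (e.g. the
    /// `Materialize` fragment of an upstream materialized view, or the
    /// `Source` fragment of an upstream source), together with the worker
    /// each of their actors currently runs on.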
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(HashMap<TableId, Fragment>, HashMap<ActorId, WorkerId>)> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(
                upstream_table_ids
                    .iter()
                    .map(|id| id.table_id as _)
                    .collect(),
            )
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((
            upstream_root_fragments
                .into_iter()
                .map(|(id, fragment)| ((id as u32).into(), fragment))
                .collect(),
            actors,
        ))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<u32, TableOption>> {
        self.catalog_controller
            .get_all_table_options()
            .await
            .map(|tops| tops.into_iter().map(|(id, opt)| (id as u32, opt)).collect())
    }

    pub async fn get_table_name_type_mapping(&self) -> MetaResult<HashMap<u32, (String, String)>> {
        let mappings = self
            .catalog_controller
            .get_table_name_type_mapping()
            .await?;
        Ok(mappings
            .into_iter()
            .map(|(id, value)| (id as u32, value))
            .collect())
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<u32>> {
        let table_ids = self.catalog_controller.get_created_table_ids().await?;
        Ok(table_ids.into_iter().map(|id| id as u32).collect())
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: u32,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id as _)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: Vec<u32>) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.into_iter().map(|id| id as _).collect(), false)
            .await
    }

    pub async fn get_sink_catalog_by_ids(&self, ids: &[u32]) -> MetaResult<Vec<PbSink>> {
        self.catalog_controller
            .get_sink_by_ids(ids.iter().map(|id| *id as _).collect())
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        Ok(self
            .catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await?
            .into_iter()
            .map(|id| (id as u32).into())
            .collect())
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

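    /// Returns the fragments that consume the output of streaming job
    /// `job_id`, each paired with the dispatch strategy used to reach it,
    /// plus the worker each downstream actor currently runs on.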
    pub async fn get_downstream_fragments(
        &self,
        job_id: u32,
    ) -> MetaResult<(
        Vec<(PbDispatchStrategy, Fragment)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id as _)
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((fragments, actors))
    }

    pub async fn get_worker_actor_ids(
        &self,
        job_ids: HashSet<TableId>,
    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
        let worker_actors = self
            .catalog_controller
            .get_worker_actor_ids(job_ids.into_iter().map(|id| id.table_id as _).collect())
            .await?;
        Ok(worker_actors
            .into_iter()
            .map(|(id, actors)| {
                (
                    id as WorkerId,
                    actors.into_iter().map(|id| id as ActorId).collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(&self) -> Option<Vec<(u32, Vec<u32>)>> {
        let job_internal_table_ids = self.catalog_controller.get_job_internal_table_ids().await;
        job_internal_table_ids.map(|ids| {
            ids.into_iter()
                .map(|(id, internal_ids)| {
                    (
                        id as u32,
                        internal_ids.into_iter().map(|id| id as u32).collect(),
                    )
                })
                .collect()
        })
    }

    pub async fn get_job_fragments_by_id(
        &self,
        job_id: &TableId,
    ) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id.table_id as _)
            .await
    }

    pub async fn get_running_actors_of_fragment(
        &self,
        id: FragmentId,
    ) -> MetaResult<HashSet<ActorId>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_of_fragment(id as _)
            .await?;
        Ok(actor_ids.into_iter().map(|id| id as ActorId).collect())
    }

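    /// Returns pairs of `(source backfill actor, upstream source actor)` for
    /// the given source-backfill fragment and the source fragment it reads
    /// from.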
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub async fn get_job_fragments_by_ids(
        &self,
        ids: &[TableId],
    ) -> MetaResult<Vec<StreamJobFragments>> {
        let mut table_fragments = vec![];
        for id in ids {
            table_fragments.push(
                self.catalog_controller
                    .get_job_fragments_by_id(id.table_id as _)
                    .await?,
            );
        }
        Ok(table_fragments)
    }

    pub async fn all_active_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let table_fragments = self.catalog_controller.table_fragments().await?;
        let mut actor_maps = HashMap::new();
        for (_, tf) in table_fragments {
            for actor in tf.active_actors() {
                actor_maps
                    .try_insert(actor.actor_id, actor)
                    .expect("non duplicate");
            }
        }
        Ok(actor_maps)
    }

    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count().await?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller
            .list_streaming_job_infos()
            .await
            .map(|x| x.len())
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

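    /// Updates the rate limit of the given source and returns the affected
    /// fragments with their actors, so the caller can propagate the new limit
    /// to the running actors. The sibling `update_*_rate_limit_*` methods
    /// below follow the same contract.
    ///
    /// Illustrative follow-up (a sketch; `apply_rate_limit_to_actors` is a
    /// made-up name for whatever mechanism delivers the change):
    ///
    /// ```ignore
    /// let fragment_actors = metadata_manager
    ///     .update_source_rate_limit_by_source_id(source_id, Some(1000))
    ///     .await?;
    /// for (fragment_id, actors) in fragment_actors {
    ///     apply_rate_limit_to_actors(fragment_id, &actors, Some(1000)).await?;
    /// }
    /// ```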
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_backfill_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_backfill_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_dml_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_dml_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_actor_splits_by_split_assignment(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_actor_splits(split_assignment)
            .await
    }

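    /// Returns, per database and per materialized view, the retention time of
    /// every subscription that depends on that materialized view, keyed as
    /// `database -> mv table -> subscription -> retention`. When `database_id`
    /// is given, only that database is loaded.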
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(loaded_database_id, mv_depended_subscriptions)| {
                if let Some(database_id) = database_id {
                    assert_eq!(loaded_database_id, database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    mv_depended_subscriptions
                        .into_iter()
                        .map(|(table_id, subscriptions)| {
                            (
                                TableId::new(table_id as _),
                                subscriptions
                                    .into_iter()
                                    .map(|(subscription_id, retention_time)| {
                                        (subscription_id as SubscriptionId, retention_time)
                                    })
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(table_id.table_id as _)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: ObjectId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: &TableId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id.table_id as _)
            .await?;
        Ok(backfill_types)
    }
}

impl MetadataManager {
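    /// Waits until the streaming job identified by `id` has finished creating
    /// (or fails), returning the notification version at completion. If the
    /// job is already finished, returns immediately without registering a
    /// notifier.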
    pub(crate) async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: ObjectId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.current_notification_version().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id.database_id as _, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "notifier dropped without reporting a reason".to_owned())
            .and_then(|result| result)
            .map_err(|reason| {
                anyhow!("failed to wait for streaming job to finish: {}", reason).into()
            })
    }

    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id.map(|id| id.database_id as _), err);
    }
}