use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::pin::pin;
use std::time::Duration;

use anyhow::anyhow;
use futures::future::{Either, select};
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_meta_model::{ObjectId, SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamScanType};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tokio::time::{Instant, sleep};
use tracing::warn;

use crate::MetaResult;
use crate::barrier::Reschedule;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{
    ActorId, ClusterId, Fragment, FragmentId, StreamActor, StreamJobFragments, SubscriptionId,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::telemetry::MetaTelemetryJobDesc;

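/// Bundles the cluster controller (worker node metadata) and the catalog controller
/// (catalog, fragment, and streaming job metadata) behind a single cloneable handle.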
#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

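/// A change observed on the set of active streaming compute nodes: a worker was added,
/// removed, or had its metadata updated.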
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    #[expect(dead_code)]
    Remove(WorkerNode),
    Update(WorkerNode),
}

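/// A locally maintained snapshot of the active streaming compute nodes, kept up to date
/// by consuming [`LocalNotification`]s about worker activation and deletion.
///
/// A rough usage sketch (illustrative only, not compiled as a doc test; assumes a
/// `MetadataManager` handle named `meta_manager`):
///
/// ```ignore
/// let mut active = ActiveStreamingWorkerNodes::new_snapshot(meta_manager).await?;
/// // Inspect the current set of workers.
/// let workers = active.current();
/// // Await the next relevant change.
/// match active.changed().await {
///     ActiveStreamingWorkerChange::Add(node) => { /* a streaming worker joined */ }
///     ActiveStreamingWorkerChange::Remove(node) => { /* a streaming worker left */ }
///     ActiveStreamingWorkerChange::Update(node) => { /* worker metadata changed */ }
/// }
/// ```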
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }

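    /// Takes a snapshot of the currently active streaming compute nodes and subscribes
    /// to subsequent worker changes via local notifications.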
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes.into_iter().map(|node| (node.id as _, node)).collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

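    /// Waits for the next change to the active streaming worker set. While waiting,
    /// calls `verbose_fn` every `verbose_interval`; returns `None` if no change is
    /// observed within `verbose_timeout`.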
    pub(crate) async fn wait_changed(
        &mut self,
        verbose_interval: Duration,
        verbose_timeout: Duration,
        verbose_fn: impl Fn(&Self),
    ) -> Option<ActiveStreamingWorkerChange> {
        let start = Instant::now();
        loop {
            if let Either::Left((change, _)) =
                select(pin!(self.changed()), pin!(sleep(verbose_interval))).await
            {
                break Some(change);
            }

            if start.elapsed() > verbose_timeout {
                break None;
            }

            verbose_fn(self)
        }
    }

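    /// Consumes local notifications until one changes the tracked set of active
    /// streaming compute nodes, and returns the resulting change. Panics if the
    /// notification channel has been closed.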
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
                        && worker.property.as_ref().unwrap().is_streaming;
                    let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) else {
                        if is_streaming_compute_node {
                            warn!(
                                ?worker,
                                "notified to delete a non-existent streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_streaming_compute_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker has a different type than the tracked streaming compute node"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if worker.r#type != WorkerType::ComputeNode as i32
                        || !worker.property.as_ref().unwrap().is_streaming
                    {
                        if let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker has changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "worker added before reaching the running state: {:?}",
                        worker
                    );
                    if let Some(prev_worker) =
                        self.worker_nodes.insert(worker.id as _, worker.clone())
                    {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notified to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

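    /// Debug-build consistency check: compares the local worker snapshot against a
    /// freshly listed global snapshot and logs a warning if they diverge.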
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "local snapshot differs from the global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list_active_streaming_compute_nodes to compare with the local snapshot");
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| DatabaseId::new(database_id as _))
            .collect())
    }

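    /// Splits a per-fragment map into per-database maps, looking up the owning database
    /// of every fragment in the catalog.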
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| {
                (fragment_id as FragmentId, DatabaseId::new(database_id as _))
            })
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<TableId>> {
        let tables = self
            .catalog_controller
            .list_background_creating_mviews(false)
            .await?;

        Ok(tables
            .into_iter()
            .map(|table| TableId::from(table.table_id as u32))
            .collect())
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        let reschedules = reschedules.into_iter().map(|(k, v)| (k as _, v)).collect();

        self.catalog_controller
            .post_apply_reschedules(reschedules, post_updates)
            .await
    }

    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)
            .await?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(HashMap<TableId, Fragment>, HashMap<ActorId, WorkerId>)> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(
                upstream_table_ids
                    .iter()
                    .map(|id| id.table_id as _)
                    .collect(),
            )
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((
            upstream_root_fragments
                .into_iter()
                .map(|(id, fragment)| ((id as u32).into(), fragment))
                .collect(),
            actors,
        ))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<u32, TableOption>> {
        self.catalog_controller
            .get_all_table_options()
            .await
            .map(|tops| tops.into_iter().map(|(id, opt)| (id as u32, opt)).collect())
    }

    pub async fn get_table_name_type_mapping(&self) -> MetaResult<HashMap<u32, (String, String)>> {
        let mappings = self
            .catalog_controller
            .get_table_name_type_mapping()
            .await?;
        Ok(mappings
            .into_iter()
            .map(|(id, value)| (id as u32, value))
            .collect())
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<u32>> {
        let table_ids = self.catalog_controller.get_created_table_ids().await?;
        Ok(table_ids.into_iter().map(|id| id as u32).collect())
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: u32,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id as _)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: Vec<u32>) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.into_iter().map(|id| id as _).collect(), false)
            .await
    }

    pub async fn get_sink_catalog_by_ids(&self, ids: &[u32]) -> MetaResult<Vec<PbSink>> {
        self.catalog_controller
            .get_sink_by_ids(ids.iter().map(|id| *id as _).collect())
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        Ok(self
            .catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await?
            .into_iter()
            .map(|id| (id as u32).into())
            .collect())
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: u32,
    ) -> MetaResult<(
        Vec<(PbDispatcherType, Fragment)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id as _)
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((fragments, actors))
    }

    pub async fn get_worker_actor_ids(
        &self,
        job_ids: HashSet<TableId>,
    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
        let worker_actors = self
            .catalog_controller
            .get_worker_actor_ids(job_ids.into_iter().map(|id| id.table_id as _).collect())
            .await?;
        Ok(worker_actors
            .into_iter()
            .map(|(id, actors)| {
                (
                    id as WorkerId,
                    actors.into_iter().map(|id| id as ActorId).collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(&self) -> Option<Vec<(u32, Vec<u32>)>> {
        let job_internal_table_ids = self.catalog_controller.get_job_internal_table_ids().await;
        job_internal_table_ids.map(|ids| {
            ids.into_iter()
                .map(|(id, internal_ids)| {
                    (
                        id as u32,
                        internal_ids.into_iter().map(|id| id as u32).collect(),
                    )
                })
                .collect()
        })
    }

    pub async fn get_job_fragments_by_id(
        &self,
        job_id: &TableId,
    ) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id.table_id as _)
            .await
    }

    pub async fn get_running_actors_of_fragment(
        &self,
        id: FragmentId,
    ) -> MetaResult<HashSet<ActorId>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_of_fragment(id as _)
            .await?;
        Ok(actor_ids.into_iter().map(|id| id as ActorId).collect())
    }

    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub async fn get_job_fragments_by_ids(
        &self,
        ids: &[TableId],
    ) -> MetaResult<Vec<StreamJobFragments>> {
        let mut table_fragments = vec![];
        for id in ids {
            table_fragments.push(
                self.catalog_controller
                    .get_job_fragments_by_id(id.table_id as _)
                    .await?,
            );
        }
        Ok(table_fragments)
    }

    pub async fn all_active_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let table_fragments = self.catalog_controller.table_fragments().await?;
        let mut actor_maps = HashMap::new();
        for (_, tf) in table_fragments {
            for actor in tf.active_actors() {
                actor_maps
                    .try_insert(actor.actor_id, actor)
                    .expect("non duplicate");
            }
        }
        Ok(actor_maps)
    }

    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count().await?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller
            .list_streaming_job_infos()
            .await
            .map(|x| x.len())
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_backfill_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_backfill_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_dml_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_dml_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    #[await_tree::instrument]
    pub async fn update_actor_splits_by_split_assignment(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_actor_splits(split_assignment)
            .await
    }

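    /// Returns the subscriptions that depend on materialized views, grouped as
    /// `database -> table -> subscription -> retention time`. If `database_id` is
    /// given, only that database is loaded.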
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(loaded_database_id, mv_depended_subscriptions)| {
                if let Some(database_id) = database_id {
                    assert_eq!(loaded_database_id, database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    mv_depended_subscriptions
                        .into_iter()
                        .map(|(table_id, subscriptions)| {
                            (
                                TableId::new(table_id as _),
                                subscriptions
                                    .into_iter()
                                    .map(|(subscription_id, retention_time)| {
                                        (subscription_id as SubscriptionId, retention_time)
                                    })
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(table_id.table_id as _)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: ObjectId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: &TableId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id.table_id as _)
            .await?;
        Ok(backfill_types)
    }
}

impl MetadataManager {
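    /// Waits until the streaming job identified by `id` is reported finished, returning
    /// the notification version once it is. If the job has already finished, returns the
    /// current notification version immediately.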
    #[await_tree::instrument]
    pub(crate) async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: ObjectId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.current_notification_version().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id.database_id as _, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "no reason received".to_owned())
            .and_then(|result| result)
            .map_err(|reason| anyhow!("failed to wait for streaming job to finish: {}", reason).into())
    }

    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id.map(|id| id.database_id as _), err);
    }
}