use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::pin::pin;
use std::time::Duration;

use anyhow::anyhow;
use futures::future::{Either, select};
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::{ObjectId, SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamScanType};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tokio::time::{Instant, sleep};
use tracing::warn;

use crate::MetaResult;
use crate::barrier::Reschedule;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{
    ActorId, ClusterId, Fragment, FragmentId, StreamActor, StreamJobFragments, SubscriptionId,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::telemetry::MetaTelemetryJobDesc;

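/// `MetadataManager` bundles the two metadata controllers running on the meta node: the
/// cluster controller (worker membership and liveness) and the catalog controller (streaming
/// jobs, fragments, actors and catalogs). Most methods on it are thin wrappers that delegate to
/// one of the controllers and convert between raw model ids and the typed ids used elsewhere.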
#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

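/// A single observed change to the set of active streaming compute workers.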
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    #[expect(dead_code)]
    Remove(WorkerNode),
    Update(WorkerNode),
}

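/// A locally maintained snapshot of the active streaming compute workers, kept up to date by
/// consuming [`LocalNotification`]s from the cluster controller.
///
/// A minimal usage sketch (hypothetical caller code, not part of this module):
///
/// ```ignore
/// let mut nodes = ActiveStreamingWorkerNodes::new_snapshot(meta_manager).await?;
/// loop {
///     match nodes.changed().await {
///         ActiveStreamingWorkerChange::Add(node) => { /* schedule onto the new worker */ }
///         ActiveStreamingWorkerChange::Remove(node) => { /* migrate actors away */ }
///         ActiveStreamingWorkerChange::Update(node) => { /* refresh cached worker info */ }
///     }
/// }
/// ```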
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }

    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes.into_iter().map(|node| (node.id as _, node)).collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

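    /// Waits for the next change to the active worker set. While waiting, `verbose_fn` is
    /// invoked roughly every `verbose_internal`; if no change arrives within `verbose_timeout`,
    /// `None` is returned.
    ///
    /// A hypothetical call (durations and log message are illustrative only):
    ///
    /// ```ignore
    /// let change = nodes
    ///     .wait_changed(
    ///         Duration::from_secs(5),
    ///         Duration::from_secs(30),
    ///         |nodes| tracing::info!(workers = nodes.current().len(), "still waiting"),
    ///     )
    ///     .await;
    /// ```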
    pub(crate) async fn wait_changed(
        &mut self,
        verbose_internal: Duration,
        verbose_timeout: Duration,
        verbose_fn: impl Fn(&Self),
    ) -> Option<ActiveStreamingWorkerChange> {
        let start = Instant::now();
        loop {
            if let Either::Left((change, _)) =
                select(pin!(self.changed()), pin!(sleep(verbose_internal))).await
            {
                break Some(change);
            }

            if start.elapsed() > verbose_timeout {
                break None;
            }

            verbose_fn(self)
        }
    }

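    /// Consumes local notifications until one of them translates into a change of the active
    /// streaming compute worker set, and returns that change. Notifications that do not affect
    /// the set (e.g. for non-streaming or unknown workers) are skipped.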
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
                        && worker.property.as_ref().unwrap().is_streaming;
                    let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) else {
                        if is_streaming_compute_node {
                            warn!(
                                ?worker,
                                "notify to delete a non-existing streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_streaming_compute_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker has a different recent type"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if worker.r#type != WorkerType::ComputeNode as i32
                        || !worker.property.as_ref().unwrap().is_streaming
                    {
                        if let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker is changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "not started worker added: {:?}",
                        worker
                    );
                    if let Some(prev_worker) =
                        self.worker_nodes.insert(worker.id as _, worker.clone())
                    {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notify to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

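    /// Debug-only consistency check: re-lists the active streaming compute nodes from the
    /// cluster controller and warns if they diverge from the locally maintained snapshot.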
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "different from global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list_active_streaming_compute_nodes to compare with local snapshot");
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| DatabaseId::new(database_id as _))
            .collect())
    }

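    /// Regroups a fragment-keyed map into per-database maps by looking up each fragment's
    /// database id in the catalog. Fails if any fragment id cannot be resolved.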
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| {
                (fragment_id as FragmentId, DatabaseId::new(database_id as _))
            })
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<TableId>> {
        let jobs = self
            .catalog_controller
            .list_background_creating_jobs(false)
            .await?;

        Ok(jobs
            .into_iter()
            .map(|(id, _, _)| TableId::from(id as u32))
            .collect())
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        let reschedules = reschedules.into_iter().map(|(k, v)| (k as _, v)).collect();

        self.catalog_controller
            .post_apply_reschedules(reschedules, post_updates)
            .await
    }

    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)
            .await?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

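    /// Returns the root fragments of the given upstream tables together with an actor-to-worker
    /// mapping for their actors, converting the raw catalog ids into the model types.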
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(HashMap<TableId, Fragment>, HashMap<ActorId, WorkerId>)> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(
                upstream_table_ids
                    .iter()
                    .map(|id| id.table_id as _)
                    .collect(),
            )
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((
            upstream_root_fragments
                .into_iter()
                .map(|(id, fragment)| ((id as u32).into(), fragment))
                .collect(),
            actors,
        ))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<u32, TableOption>> {
        self.catalog_controller
            .get_all_table_options()
            .await
            .map(|tops| tops.into_iter().map(|(id, opt)| (id as u32, opt)).collect())
    }

    pub async fn get_table_name_type_mapping(&self) -> MetaResult<HashMap<u32, (String, String)>> {
        let mappings = self
            .catalog_controller
            .get_table_name_type_mapping()
            .await?;
        Ok(mappings
            .into_iter()
            .map(|(id, value)| (id as u32, value))
            .collect())
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<u32>> {
        let table_ids = self.catalog_controller.get_created_table_ids().await?;
        Ok(table_ids.into_iter().map(|id| id as u32).collect())
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: u32,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id as _)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: &[u32]) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.iter().map(|id| *id as _).collect(), false)
            .await
    }

    pub async fn get_table_incoming_sinks(&self, table_id: u32) -> MetaResult<Vec<PbSink>> {
        self.catalog_controller
            .get_table_incoming_sinks(table_id as _)
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        Ok(self
            .catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await?
            .into_iter()
            .map(|id| (id as u32).into())
            .collect())
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: u32,
    ) -> MetaResult<(
        Vec<(PbDispatcherType, Fragment)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id as _)
            .await?;

        let actors = actors
            .into_iter()
            .map(|(actor, worker)| (actor as u32, worker))
            .collect();

        Ok((fragments, actors))
    }

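    /// Returns, for each worker, the ids of the actors that belong to the given streaming jobs.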
    pub async fn get_worker_actor_ids(
        &self,
        job_ids: HashSet<TableId>,
    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
        let worker_actors = self
            .catalog_controller
            .get_worker_actor_ids(job_ids.into_iter().map(|id| id.table_id as _).collect())
            .await?;
        Ok(worker_actors
            .into_iter()
            .map(|(id, actors)| {
                (
                    id as WorkerId,
                    actors.into_iter().map(|id| id as ActorId).collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(&self) -> Option<Vec<(u32, Vec<u32>)>> {
        let job_internal_table_ids = self.catalog_controller.get_job_internal_table_ids().await;
        job_internal_table_ids.map(|ids| {
            ids.into_iter()
                .map(|(id, internal_ids)| {
                    (
                        id as u32,
                        internal_ids.into_iter().map(|id| id as u32).collect(),
                    )
                })
                .collect()
        })
    }

    pub async fn get_job_fragments_by_id(
        &self,
        job_id: &TableId,
    ) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id.table_id as _)
            .await
    }

    pub async fn get_running_actors_of_fragment(
        &self,
        id: FragmentId,
    ) -> MetaResult<HashSet<ActorId>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_of_fragment(id as _)
            .await?;
        Ok(actor_ids.into_iter().map(|id| id as ActorId).collect())
    }

    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub async fn get_job_fragments_by_ids(
        &self,
        ids: &[TableId],
    ) -> MetaResult<Vec<StreamJobFragments>> {
        let mut table_fragments = vec![];
        for id in ids {
            table_fragments.push(
                self.catalog_controller
                    .get_job_fragments_by_id(id.table_id as _)
                    .await?,
            );
        }
        Ok(table_fragments)
    }

    pub async fn all_active_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let table_fragments = self.catalog_controller.table_fragments().await?;
        let mut actor_maps = HashMap::new();
        for (_, tf) in table_fragments {
            for actor in tf.active_actors() {
                actor_maps
                    .try_insert(actor.actor_id, actor)
                    .expect("non duplicate");
            }
        }
        Ok(actor_maps)
    }

    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count().await?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller
            .list_streaming_job_infos()
            .await
            .map(|x| x.len())
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

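    /// Updates the rate limit for the given source and returns the affected fragments together
    /// with their actor ids. The sibling `update_*_rate_limit_*` methods below follow the same
    /// pattern for backfill, sink, DML and per-fragment rate limits.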
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_backfill_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_backfill_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_dml_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_dml_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, u32)> {
        let (new_props, sink_id) = self
            .catalog_controller
            .update_iceberg_table_props_by_table_id(
                table_id.table_id as _,
                props,
                alter_iceberg_table_props,
            )
            .await?;
        Ok((new_props, sink_id))
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    #[await_tree::instrument]
    pub async fn update_actor_splits_by_split_assignment(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_actor_splits(split_assignment)
            .await
    }

    #[await_tree::instrument]
    pub async fn update_source_splits(
        &self,
        source_splits: &HashMap<SourceId, Vec<SplitImpl>>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_source_splits(source_splits)
            .await
    }

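    /// Returns, per database and per materialized view, the subscriptions that depend on it,
    /// each mapped to its retention value. If `database_id` is given, the result is restricted
    /// to (and asserted to come from) that database.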
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<SubscriptionId, u64>>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(loaded_database_id, mv_depended_subscriptions)| {
                if let Some(database_id) = database_id {
                    assert_eq!(loaded_database_id, database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    mv_depended_subscriptions
                        .into_iter()
                        .map(|(table_id, subscriptions)| {
                            (
                                TableId::new(table_id as _),
                                subscriptions
                                    .into_iter()
                                    .map(|(subscription_id, retention_time)| {
                                        (subscription_id as SubscriptionId, retention_time)
                                    })
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(table_id.table_id as _)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: ObjectId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: &TableId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id.table_id as _)
            .await?;
        Ok(backfill_types)
    }
}

impl MetadataManager {
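    /// Waits until the given streaming job is reported as finished and returns the notification
    /// version at that point. If the job has already finished, this returns immediately;
    /// otherwise a oneshot notifier is registered and awaited.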
    #[await_tree::instrument]
    pub(crate) async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: ObjectId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.current_notification_version().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id.database_id as _, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "no reason received".to_owned())
            .and_then(|result| result)
            .map_err(|reason| anyhow!("failed to wait streaming job finish: {}", reason).into())
    }

    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id.map(|id| id.database_id as _), err);
    }

    pub(crate) async fn notify_cancelled(&self, database_id: DatabaseId, id: ObjectId) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_cancelled(database_id.database_id as _, id);
    }
}