1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Formatter};
17
18use anyhow::anyhow;
19use itertools::Itertools;
20use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
21use risingwave_meta_model::{ObjectId, SinkId, SourceId, WorkerId};
22use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
23use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
24use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
25use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
26use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode, PbStreamScanType};
27use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
28use tokio::sync::oneshot;
29use tracing::warn;
30
31use crate::MetaResult;
32use crate::barrier::SharedFragmentInfo;
33use crate::controller::catalog::CatalogControllerRef;
34use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
35use crate::controller::fragment::FragmentParallelismInfo;
36use crate::manager::{LocalNotification, NotificationVersion};
37use crate::model::{ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId};
38use crate::stream::SplitAssignment;
39use crate::telemetry::MetaTelemetryJobDesc;
40
41#[derive(Clone)]
42pub struct MetadataManager {
43 pub cluster_controller: ClusterControllerRef,
44 pub catalog_controller: CatalogControllerRef,
45}
46
47#[derive(Debug)]
48pub(crate) enum ActiveStreamingWorkerChange {
49 Add(WorkerNode),
50 Remove(WorkerNode),
51 Update(WorkerNode),
52}
53
54pub struct ActiveStreamingWorkerNodes {
55 worker_nodes: HashMap<WorkerId, WorkerNode>,
56 rx: UnboundedReceiver<LocalNotification>,
57 #[cfg_attr(not(debug_assertions), expect(dead_code))]
58 meta_manager: Option<MetadataManager>,
59}
60
61impl Debug for ActiveStreamingWorkerNodes {
62 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63 f.debug_struct("ActiveStreamingWorkerNodes")
64 .field("worker_nodes", &self.worker_nodes)
65 .finish()
66 }
67}
68
69impl ActiveStreamingWorkerNodes {
70 pub(crate) fn uninitialized() -> Self {
71 Self {
72 worker_nodes: Default::default(),
73 rx: unbounded_channel().1,
74 meta_manager: None,
75 }
76 }
77
78 #[cfg(test)]
79 pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
80 let (tx, rx) = unbounded_channel();
81 let _join_handle = tokio::spawn(async move {
82 let _tx = tx;
83 std::future::pending::<()>().await
84 });
85 Self {
86 worker_nodes,
87 rx,
88 meta_manager: None,
89 }
90 }
91
92 pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
94 let (nodes, rx) = meta_manager
95 .subscribe_active_streaming_compute_nodes()
96 .await?;
97 Ok(Self {
98 worker_nodes: nodes.into_iter().map(|node| (node.id as _, node)).collect(),
99 rx,
100 meta_manager: Some(meta_manager),
101 })
102 }
103
104 pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
105 &self.worker_nodes
106 }
107
108 pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
109 loop {
110 let notification = self
111 .rx
112 .recv()
113 .await
114 .expect("notification stopped or uninitialized");
115 match notification {
116 LocalNotification::WorkerNodeDeleted(worker) => {
117 let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
118 && worker.property.as_ref().unwrap().is_streaming;
119 let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) else {
120 if is_streaming_compute_node {
121 warn!(
122 ?worker,
123 "notify to delete an non-existing streaming compute worker"
124 );
125 }
126 continue;
127 };
128 if !is_streaming_compute_node {
129 warn!(
130 ?worker,
131 ?prev_worker,
132 "deleted worker has a different recent type"
133 );
134 }
135 if worker.state == State::Starting as i32 {
136 warn!(
137 id = worker.id,
138 host = ?worker.host,
139 state = worker.state,
140 "a starting streaming worker is deleted"
141 );
142 }
143 break ActiveStreamingWorkerChange::Remove(prev_worker);
144 }
145 LocalNotification::WorkerNodeActivated(worker) => {
146 if worker.r#type != WorkerType::ComputeNode as i32
147 || !worker.property.as_ref().unwrap().is_streaming
148 {
149 if let Some(prev_worker) = self.worker_nodes.remove(&(worker.id as _)) {
150 warn!(
151 ?worker,
152 ?prev_worker,
153 "the type of a streaming worker is changed"
154 );
155 break ActiveStreamingWorkerChange::Remove(prev_worker);
156 } else {
157 continue;
158 }
159 }
160 assert_eq!(
161 worker.state,
162 State::Running as i32,
163 "not started worker added: {:?}",
164 worker
165 );
166 if let Some(prev_worker) =
167 self.worker_nodes.insert(worker.id as _, worker.clone())
168 {
169 assert_eq!(prev_worker.host, worker.host);
170 assert_eq!(prev_worker.r#type, worker.r#type);
171 warn!(
172 ?prev_worker,
173 ?worker,
174 eq = prev_worker == worker,
175 "notify to update an existing active worker"
176 );
177 if prev_worker == worker {
178 continue;
179 } else {
180 break ActiveStreamingWorkerChange::Update(worker);
181 }
182 } else {
183 break ActiveStreamingWorkerChange::Add(worker);
184 }
185 }
186 _ => {
187 continue;
188 }
189 }
190 }
191 }
192
193 #[cfg(debug_assertions)]
194 pub(crate) async fn validate_change(&self) {
195 use risingwave_pb::common::WorkerNode;
196 use thiserror_ext::AsReport;
197 let Some(meta_manager) = &self.meta_manager else {
198 return;
199 };
200 match meta_manager.list_active_streaming_compute_nodes().await {
201 Ok(worker_nodes) => {
202 let ignore_irrelevant_info = |node: &WorkerNode| {
203 (
204 node.id,
205 WorkerNode {
206 id: node.id,
207 r#type: node.r#type,
208 host: node.host.clone(),
209 property: node.property.clone(),
210 resource: node.resource.clone(),
211 ..Default::default()
212 },
213 )
214 };
215 let worker_nodes: HashMap<_, _> =
216 worker_nodes.iter().map(ignore_irrelevant_info).collect();
217 let curr_worker_nodes: HashMap<_, _> = self
218 .current()
219 .values()
220 .map(ignore_irrelevant_info)
221 .collect();
222 if worker_nodes != curr_worker_nodes {
223 warn!(
224 ?worker_nodes,
225 ?curr_worker_nodes,
226 "different to global snapshot"
227 );
228 }
229 }
230 Err(e) => {
231 warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot");
232 }
233 }
234 }
235}
236
237impl MetadataManager {
238 pub fn new(
239 cluster_controller: ClusterControllerRef,
240 catalog_controller: CatalogControllerRef,
241 ) -> Self {
242 Self {
243 cluster_controller,
244 catalog_controller,
245 }
246 }
247
248 pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
249 self.cluster_controller.get_worker_by_id(worker_id).await
250 }
251
252 pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
253 let node_map = self.cluster_controller.count_worker_by_type().await?;
254 Ok(node_map
255 .into_iter()
256 .map(|(ty, cnt)| (ty.into(), cnt as u64))
257 .collect())
258 }
259
260 pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
261 self.cluster_controller
262 .get_worker_info_by_id(worker_id as _)
263 .await
264 }
265
266 pub async fn add_worker_node(
267 &self,
268 r#type: PbWorkerType,
269 host_address: HostAddress,
270 property: AddNodeProperty,
271 resource: PbResource,
272 ) -> MetaResult<WorkerId> {
273 self.cluster_controller
274 .add_worker(r#type, host_address, property, resource)
275 .await
276 .map(|id| id as WorkerId)
277 }
278
279 pub async fn list_worker_node(
280 &self,
281 worker_type: Option<WorkerType>,
282 worker_state: Option<State>,
283 ) -> MetaResult<Vec<PbWorkerNode>> {
284 self.cluster_controller
285 .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
286 .await
287 }
288
289 pub async fn subscribe_active_streaming_compute_nodes(
290 &self,
291 ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
292 self.cluster_controller
293 .subscribe_active_streaming_compute_nodes()
294 .await
295 }
296
297 pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
298 self.cluster_controller
299 .list_active_streaming_workers()
300 .await
301 }
302
303 pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
304 self.cluster_controller.list_active_serving_workers().await
305 }
306
307 pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
308 Ok(self
309 .catalog_controller
310 .list_fragment_database_ids(None)
311 .await?
312 .into_iter()
313 .map(|(_, database_id)| DatabaseId::new(database_id as _))
314 .collect())
315 }
316
317 pub async fn split_fragment_map_by_database<T: Debug>(
318 &self,
319 fragment_map: HashMap<FragmentId, T>,
320 ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
321 let fragment_to_database_map: HashMap<_, _> = self
322 .catalog_controller
323 .list_fragment_database_ids(Some(
324 fragment_map
325 .keys()
326 .map(|fragment_id| *fragment_id as _)
327 .collect(),
328 ))
329 .await?
330 .into_iter()
331 .map(|(fragment_id, database_id)| {
332 (fragment_id as FragmentId, DatabaseId::new(database_id as _))
333 })
334 .collect();
335 let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
336 for (fragment_id, value) in fragment_map {
337 let database_id = *fragment_to_database_map
338 .get(&fragment_id)
339 .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
340 ret.entry(database_id)
341 .or_default()
342 .try_insert(fragment_id, value)
343 .expect("non duplicate");
344 }
345 Ok(ret)
346 }
347
348 pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<ObjectId>> {
349 let jobs = self
350 .catalog_controller
351 .list_background_creating_jobs(false, None)
352 .await?;
353
354 let jobs = jobs.into_iter().map(|(id, _, _)| id).collect();
355
356 Ok(jobs)
357 }
358
359 pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
360 self.catalog_controller.list_sources().await
361 }
362
363 pub fn running_fragment_parallelisms(
364 &self,
365 id_filter: Option<HashSet<FragmentId>>,
366 ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
367 let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
368 Ok(self
369 .catalog_controller
370 .running_fragment_parallelisms(id_filter)?
371 .into_iter()
372 .map(|(k, v)| (k as FragmentId, v))
373 .collect())
374 }
375
376 pub async fn get_upstream_root_fragments(
381 &self,
382 upstream_table_ids: &HashSet<TableId>,
383 ) -> MetaResult<(
384 HashMap<TableId, (SharedFragmentInfo, PbStreamNode)>,
385 HashMap<ActorId, WorkerId>,
386 )> {
387 let (upstream_root_fragments, actors) = self
388 .catalog_controller
389 .get_root_fragments(
390 upstream_table_ids
391 .iter()
392 .map(|id| id.table_id as _)
393 .collect(),
394 )
395 .await?;
396
397 let actors = actors
398 .into_iter()
399 .map(|(actor, worker)| (actor as u32, worker))
400 .collect();
401
402 Ok((
403 upstream_root_fragments
404 .into_iter()
405 .map(|(id, fragment)| ((id as u32).into(), fragment))
406 .collect(),
407 actors,
408 ))
409 }
410
411 pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
412 self.cluster_controller.get_streaming_cluster_info().await
413 }
414
415 pub async fn get_all_table_options(&self) -> MetaResult<HashMap<u32, TableOption>> {
416 self.catalog_controller
417 .get_all_table_options()
418 .await
419 .map(|tops| tops.into_iter().map(|(id, opt)| (id as u32, opt)).collect())
420 }
421
422 pub async fn get_table_name_type_mapping(&self) -> MetaResult<HashMap<u32, (String, String)>> {
423 let mappings = self
424 .catalog_controller
425 .get_table_name_type_mapping()
426 .await?;
427 Ok(mappings
428 .into_iter()
429 .map(|(id, value)| (id as u32, value))
430 .collect())
431 }
432
433 pub async fn get_created_table_ids(&self) -> MetaResult<Vec<u32>> {
434 let table_ids = self.catalog_controller.get_created_table_ids().await?;
435 Ok(table_ids.into_iter().map(|id| id as u32).collect())
436 }
437
438 pub async fn get_table_associated_source_id(
439 &self,
440 table_id: u32,
441 ) -> MetaResult<Option<SourceId>> {
442 self.catalog_controller
443 .get_table_associated_source_id(table_id as _)
444 .await
445 }
446
447 pub async fn get_table_catalog_by_ids(&self, ids: &[u32]) -> MetaResult<Vec<PbTable>> {
448 self.catalog_controller
449 .get_table_by_ids(ids.iter().map(|id| *id as _).collect(), false)
450 .await
451 }
452
453 pub async fn get_table_incoming_sinks(&self, table_id: u32) -> MetaResult<Vec<PbSink>> {
454 self.catalog_controller
455 .get_table_incoming_sinks(table_id as _)
456 .await
457 }
458
459 pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
460 Ok(self
461 .catalog_controller
462 .get_sink_state_table_ids(sink_id)
463 .await?
464 .into_iter()
465 .map(|id| (id as u32).into())
466 .collect())
467 }
468
469 pub async fn get_table_catalog_by_cdc_table_id(
470 &self,
471 cdc_table_id: &String,
472 ) -> MetaResult<Vec<PbTable>> {
473 self.catalog_controller
474 .get_table_by_cdc_table_id(cdc_table_id)
475 .await
476 }
477
478 pub async fn get_downstream_fragments(
479 &self,
480 job_id: u32,
481 ) -> MetaResult<(
482 Vec<(PbDispatcherType, SharedFragmentInfo, PbStreamNode)>,
483 HashMap<ActorId, WorkerId>,
484 )> {
485 let (fragments, actors) = self
486 .catalog_controller
487 .get_downstream_fragments(job_id as _)
488 .await?;
489
490 let actors = actors
491 .into_iter()
492 .map(|(actor, worker)| (actor as u32, worker))
493 .collect();
494
495 Ok((fragments, actors))
496 }
497
498 pub async fn get_job_id_to_internal_table_ids_mapping(&self) -> Option<Vec<(u32, Vec<u32>)>> {
499 let job_internal_table_ids = self.catalog_controller.get_job_internal_table_ids().await;
500 job_internal_table_ids.map(|ids| {
501 ids.into_iter()
502 .map(|(id, internal_ids)| {
503 (
504 id as u32,
505 internal_ids.into_iter().map(|id| id as u32).collect(),
506 )
507 })
508 .collect()
509 })
510 }
511
512 pub async fn get_job_fragments_by_id(
513 &self,
514 job_id: &TableId,
515 ) -> MetaResult<StreamJobFragments> {
516 self.catalog_controller
517 .get_job_fragments_by_id(job_id.table_id as _)
518 .await
519 }
520
521 pub fn get_running_actors_of_fragment(&self, id: FragmentId) -> MetaResult<HashSet<ActorId>> {
522 self.catalog_controller
523 .get_running_actors_of_fragment(id as _)
524 }
525
526 pub async fn get_running_actors_for_source_backfill(
528 &self,
529 source_backfill_fragment_id: FragmentId,
530 source_fragment_id: FragmentId,
531 ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
532 let actor_ids = self
533 .catalog_controller
534 .get_running_actors_for_source_backfill(
535 source_backfill_fragment_id as _,
536 source_fragment_id as _,
537 )
538 .await?;
539 Ok(actor_ids
540 .into_iter()
541 .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
542 .collect())
543 }
544
545 pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
546 let actor_cnt = self.catalog_controller.worker_actor_count()?;
547 Ok(actor_cnt
548 .into_iter()
549 .map(|(id, cnt)| (id as WorkerId, cnt))
550 .collect())
551 }
552
553 pub async fn count_streaming_job(&self) -> MetaResult<usize> {
554 self.catalog_controller
555 .list_streaming_job_infos()
556 .await
557 .map(|x| x.len())
558 }
559
560 pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
561 self.catalog_controller
562 .list_stream_job_desc_for_telemetry()
563 .await
564 }
565
566 pub async fn update_source_rate_limit_by_source_id(
567 &self,
568 source_id: SourceId,
569 rate_limit: Option<u32>,
570 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
571 let fragment_actors = self
572 .catalog_controller
573 .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
574 .await?;
575 Ok(fragment_actors
576 .into_iter()
577 .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
578 .collect())
579 }
580
581 pub async fn update_backfill_rate_limit_by_table_id(
582 &self,
583 table_id: TableId,
584 rate_limit: Option<u32>,
585 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
586 let fragment_actors = self
587 .catalog_controller
588 .update_backfill_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
589 .await?;
590 Ok(fragment_actors
591 .into_iter()
592 .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
593 .collect())
594 }
595
596 pub async fn update_sink_rate_limit_by_sink_id(
597 &self,
598 sink_id: SinkId,
599 rate_limit: Option<u32>,
600 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
601 let fragment_actors = self
602 .catalog_controller
603 .update_sink_rate_limit_by_job_id(sink_id as _, rate_limit)
604 .await?;
605 Ok(fragment_actors
606 .into_iter()
607 .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
608 .collect())
609 }
610
611 pub async fn update_dml_rate_limit_by_table_id(
612 &self,
613 table_id: TableId,
614 rate_limit: Option<u32>,
615 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
616 let fragment_actors = self
617 .catalog_controller
618 .update_dml_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
619 .await?;
620 Ok(fragment_actors
621 .into_iter()
622 .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
623 .collect())
624 }
625
626 pub async fn update_sink_props_by_sink_id(
627 &self,
628 sink_id: SinkId,
629 props: BTreeMap<String, String>,
630 ) -> MetaResult<HashMap<String, String>> {
631 let new_props = self
632 .catalog_controller
633 .update_sink_props_by_sink_id(sink_id, props)
634 .await?;
635 Ok(new_props)
636 }
637
638 pub async fn update_iceberg_table_props_by_table_id(
639 &self,
640 table_id: TableId,
641 props: BTreeMap<String, String>,
642 alter_iceberg_table_props: Option<
643 risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
644 >,
645 ) -> MetaResult<(HashMap<String, String>, u32)> {
646 let (new_props, sink_id) = self
647 .catalog_controller
648 .update_iceberg_table_props_by_table_id(
649 table_id.table_id as _,
650 props,
651 alter_iceberg_table_props,
652 )
653 .await?;
654 Ok((new_props, sink_id))
655 }
656
657 pub async fn update_fragment_rate_limit_by_fragment_id(
658 &self,
659 fragment_id: FragmentId,
660 rate_limit: Option<u32>,
661 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
662 let fragment_actors = self
663 .catalog_controller
664 .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
665 .await?;
666 Ok(fragment_actors
667 .into_iter()
668 .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
669 .collect())
670 }
671
672 #[await_tree::instrument]
673 pub async fn update_fragment_splits(
674 &self,
675 split_assignment: &SplitAssignment,
676 ) -> MetaResult<()> {
677 let fragment_splits = split_assignment
678 .iter()
679 .map(|(fragment_id, splits)| {
680 (
681 *fragment_id as _,
682 splits.values().flatten().cloned().collect_vec(),
683 )
684 })
685 .collect();
686
687 let inner = self.catalog_controller.inner.write().await;
688
689 self.catalog_controller
690 .update_fragment_splits(&inner.db, &fragment_splits)
691 .await
692 }
693
694 pub async fn get_mv_depended_subscriptions(
695 &self,
696 database_id: Option<DatabaseId>,
697 ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
698 let database_id = database_id.map(|database_id| database_id.database_id as _);
699 Ok(self
700 .catalog_controller
701 .get_mv_depended_subscriptions(database_id)
702 .await?
703 .into_iter()
704 .map(|(table_id, subscriptions)| {
705 (
706 TableId::new(table_id as _),
707 subscriptions
708 .into_iter()
709 .map(|(subscription_id, retention_time)| {
710 (subscription_id as SubscriptionId, retention_time)
711 })
712 .collect(),
713 )
714 })
715 .collect())
716 }
717
718 pub async fn get_job_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> {
719 self.catalog_controller
720 .get_max_parallelism_by_id(table_id.table_id as _)
721 .await
722 }
723
724 pub async fn get_existing_job_resource_group(
725 &self,
726 streaming_job_id: ObjectId,
727 ) -> MetaResult<String> {
728 self.catalog_controller
729 .get_existing_job_resource_group(streaming_job_id)
730 .await
731 }
732
733 pub async fn get_database_resource_group(&self, database_id: ObjectId) -> MetaResult<String> {
734 self.catalog_controller
735 .get_database_resource_group(database_id)
736 .await
737 }
738
739 pub fn cluster_id(&self) -> &ClusterId {
740 self.cluster_controller.cluster_id()
741 }
742
743 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
744 let rate_limits = self.catalog_controller.list_rate_limits().await?;
745 Ok(rate_limits)
746 }
747
748 pub async fn get_job_backfill_scan_types(
749 &self,
750 job_id: ObjectId,
751 ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
752 let backfill_types = self
753 .catalog_controller
754 .get_job_fragment_backfill_scan_type(job_id)
755 .await?;
756 Ok(backfill_types)
757 }
758}
759
760impl MetadataManager {
761 #[await_tree::instrument]
764 pub(crate) async fn wait_streaming_job_finished(
765 &self,
766 database_id: DatabaseId,
767 id: ObjectId,
768 ) -> MetaResult<NotificationVersion> {
769 tracing::debug!("wait_streaming_job_finished: {id:?}");
770 let mut mgr = self.catalog_controller.get_inner_write_guard().await;
771 if mgr.streaming_job_is_finished(id).await? {
772 return Ok(self.catalog_controller.current_notification_version().await);
773 }
774 let (tx, rx) = oneshot::channel();
775
776 mgr.register_finish_notifier(database_id.database_id as _, id, tx);
777 drop(mgr);
778 rx.await
779 .map_err(|_| "no received reason".to_owned())
780 .and_then(|result| result)
781 .map_err(|reason| anyhow!("failed to wait streaming job finish: {}", reason).into())
782 }
783
784 pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
785 let mut mgr = self.catalog_controller.get_inner_write_guard().await;
786 mgr.notify_finish_failed(database_id.map(|id| id.database_id as _), err);
787 }
788
789 pub(crate) async fn notify_cancelled(&self, database_id: DatabaseId, id: ObjectId) {
790 let mut mgr = self.catalog_controller.get_inner_write_guard().await;
791 mgr.notify_cancelled(database_id.database_id as _, id);
792 }
793}