use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_common::id::JobId;
use risingwave_meta_model::refresh_job::{self, RefreshState};
use risingwave_meta_model::{SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode, PbStreamScanType};
use sea_orm::prelude::DateTime;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tracing::warn;

use crate::MetaResult;
use crate::barrier::SharedFragmentInfo;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId};
use crate::stream::SplitAssignment;
use crate::telemetry::MetaTelemetryJobDesc;

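/// Entry point for all metadata operations in the meta node: a cheap-to-clone handle that bundles
/// the cluster controller and the catalog controller.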
#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

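/// A change event on the set of active streaming compute worker nodes.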
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    Remove(WorkerNode),
    Update(WorkerNode),
}

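/// A locally maintained snapshot of the active streaming compute nodes, kept up to date by
/// consuming `LocalNotification`s from the cluster controller.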
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            // The sender side is dropped immediately, so awaiting `changed()` on an
            // uninitialized instance will panic with "notification stopped or uninitialized".
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        // Keep the sender alive forever so that the receiver never observes channel closure.
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }

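    /// Takes a snapshot of the currently active streaming compute nodes and subscribes to
    /// subsequent worker-node notifications.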
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes.into_iter().map(|node| (node.id, node)).collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

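    /// Waits for the next change to the set of active streaming compute nodes and applies it to
    /// the local snapshot. Irrelevant or redundant notifications are skipped.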
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
                        && worker.property.as_ref().unwrap().is_streaming;
                    let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else {
                        if is_streaming_compute_node {
                            warn!(
                                ?worker,
                                "notified to delete a non-existing streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_streaming_compute_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker has a different recent type"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = %worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if worker.r#type != WorkerType::ComputeNode as i32
                        || !worker.property.as_ref().unwrap().is_streaming
                    {
                        if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker is changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "not started worker added: {:?}",
                        worker
                    );
                    if let Some(prev_worker) = self.worker_nodes.insert(worker.id, worker.clone()) {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notified to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

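    /// Debug-only sanity check: compares the local snapshot with the cluster controller's global
    /// view of active streaming compute nodes and logs a warning if they diverge.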
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "local snapshot differs from the global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list_active_streaming_compute_nodes to compare with local snapshot");
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| database_id)
            .collect())
    }

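    /// Regroups a fragment-keyed map into per-database maps by looking up the database each
    /// fragment belongs to. Fails if any fragment's database cannot be resolved.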
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| (fragment_id as FragmentId, database_id))
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<JobId>> {
        let jobs = self
            .catalog_controller
            .list_background_creating_jobs(false, None)
            .await?;

        let jobs = jobs.into_iter().map(|(id, _, _)| id).collect();

        Ok(jobs)
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

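    /// Returns the root fragment of each job referenced by the given upstream table ids, along
    /// with the worker location of every actor in those fragments.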
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(
        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(upstream_table_ids.iter().map(|id| id.as_job_id()).collect())
            .await?;

        Ok((upstream_root_fragments, actors))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
        self.catalog_controller.get_all_table_options().await
    }

    pub async fn get_table_name_type_mapping(
        &self,
    ) -> MetaResult<HashMap<TableId, (String, String)>> {
        self.catalog_controller.get_table_name_type_mapping().await
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.get_created_table_ids().await
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: TableId,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: &[TableId]) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.to_vec(), false)
            .await
    }

    pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
        self.catalog_controller
            .get_table_incoming_sinks(table_id)
            .await
    }

    pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
        self.catalog_controller.list_refresh_jobs().await
    }

    pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.list_refreshable_table_ids().await
    }

    pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
        self.catalog_controller.ensure_refresh_job(table_id).await
    }

    pub async fn update_refresh_job_status(
        &self,
        table_id: TableId,
        status: RefreshState,
        trigger_time: Option<DateTime>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_status(table_id, status, trigger_time)
            .await
    }

    pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
        self.catalog_controller
            .reset_all_refresh_jobs_to_idle()
            .await
    }

    pub async fn update_refresh_job_interval(
        &self,
        table_id: TableId,
        trigger_interval_secs: Option<i64>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_interval(table_id, trigger_interval_secs)
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        self.catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

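    /// Returns the downstream fragments of the given job, each paired with the dispatcher type
    /// used to reach it, together with the worker location of every actor in those fragments.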
    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        Vec<(PbDispatcherType, SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id)
            .await?;

        Ok((fragments, actors))
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(
        &self,
    ) -> Option<Vec<(JobId, Vec<TableId>)>> {
        self.catalog_controller.get_job_internal_table_ids().await
    }

    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id)
            .await
    }

    pub fn get_running_actors_of_fragment(&self, id: FragmentId) -> MetaResult<HashSet<ActorId>> {
        self.catalog_controller
            .get_running_actors_of_fragment(id as _)
    }

    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count()?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller
            .list_streaming_job_infos()
            .await
            .map(|x| x.len())
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_backfill_rate_limit_by_job_id(job_id, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_dml_rate_limit_by_job_id(job_id, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let (new_props, sink_id) = self
            .catalog_controller
            .update_iceberg_table_props_by_table_id(table_id, props, alter_iceberg_table_props)
            .await?;
        Ok((new_props, sink_id))
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

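    /// Persists the given split assignment into the catalog, flattening the splits of all actors
    /// of each fragment into one per-fragment list.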
    #[await_tree::instrument]
    pub async fn update_fragment_splits(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        let fragment_splits = split_assignment
            .iter()
            .map(|(fragment_id, splits)| {
                (
                    *fragment_id as _,
                    splits.values().flatten().cloned().collect_vec(),
                )
            })
            .collect();

        let inner = self.catalog_controller.inner.write().await;

        self.catalog_controller
            .update_fragment_splits(&inner.db, &fragment_splits)
            .await
    }

    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(table_id, subscriptions)| {
                (
                    table_id,
                    subscriptions
                        .into_iter()
                        .map(|(subscription_id, retention_time)| {
                            (subscription_id as SubscriptionId, retention_time)
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, job_id: JobId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(job_id)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: JobId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id)
            .await?;
        Ok(backfill_types)
    }
}

impl MetadataManager {
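    /// Waits until the streaming job with the given id has finished creating, then returns the
    /// notification version at that point. Returns an error if the job fails or the finish
    /// notifier is dropped before a result arrives.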
    #[await_tree::instrument]
    pub async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: JobId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.current_notification_version().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "no reason received".to_owned())
            .and_then(|result| result)
            .map_err(|reason| anyhow!("failed to wait for streaming job to finish: {}", reason).into())
    }

    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id, err);
    }
}