risingwave_meta/manager/metadata.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_common::id::JobId;
use risingwave_meta_model::refresh_job::{self, RefreshState};
use risingwave_meta_model::{SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode, PbStreamScanType};
use sea_orm::prelude::DateTime;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tracing::warn;

use crate::MetaResult;
use crate::barrier::SharedFragmentInfo;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId};
use crate::stream::SplitAssignment;
use crate::telemetry::MetaTelemetryJobDesc;

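/// Entry point for cluster and catalog metadata on the meta node, delegating to the
/// cluster controller and the catalog controller.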
#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

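/// A change to the set of active streaming compute workers, yielded by
/// [`ActiveStreamingWorkerNodes::changed`].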
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    Remove(WorkerNode),
    Update(WorkerNode),
}

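/// A locally maintained snapshot of the active streaming compute workers, kept up to date
/// by consuming [`LocalNotification`]s.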
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
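    /// Return an uninitialized instance as a placeholder, to be replaced by an initialized
    /// snapshot from [`Self::new_snapshot`] later.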
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }

    /// Take a snapshot of the currently schedulable streaming compute nodes and subscribe to
    /// subsequent worker changes.
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes
                .into_iter()
                .filter_map(|node| node.is_streaming_schedulable().then_some((node.id, node)))
                .collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

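    /// Wait for the next relevant change to the set of active streaming compute workers,
    /// apply it to the local snapshot, and return it. Irrelevant notifications are skipped.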
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            fn is_target_worker_node(worker: &WorkerNode) -> bool {
                worker.r#type == WorkerType::ComputeNode as i32
                    && worker.property.as_ref().unwrap().is_streaming
                    && worker.is_streaming_schedulable()
            }
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_target_worker_node = is_target_worker_node(&worker);
                    let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else {
                        if is_target_worker_node {
                            warn!(
                                ?worker,
                                "notify to delete a non-existent streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_target_worker_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker has a different recent type"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = %worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if !is_target_worker_node(&worker) {
                        if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker has changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "not started worker added: {:?}",
                        worker
                    );
                    if let Some(prev_worker) = self.worker_nodes.insert(worker.id, worker.clone()) {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notify to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

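    /// Debug-only consistency check: compare the local snapshot against the global list of
    /// active streaming compute nodes and log a warning if they differ.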
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "local snapshot differs from the global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list_active_streaming_compute_nodes to compare with the local snapshot");
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| database_id)
            .collect())
    }

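    /// Group a fragment-keyed map into per-database maps, looking up the database that each
    /// fragment belongs to. Fails if any fragment has no known database.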
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| (fragment_id as FragmentId, database_id))
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<HashSet<JobId>> {
        self.catalog_controller
            .list_background_creating_jobs(false, None)
            .await
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

    /// Get and filter the "**root**" fragments of the specified relations.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// See also [`crate::controller::catalog::CatalogController::get_root_fragments`].
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(
        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(upstream_table_ids.iter().map(|id| id.as_job_id()).collect())
            .await?;

        Ok((upstream_root_fragments, actors))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
        self.catalog_controller.get_all_table_options().await
    }

    pub async fn get_table_name_type_mapping(
        &self,
    ) -> MetaResult<HashMap<TableId, (String, String)>> {
        self.catalog_controller.get_table_name_type_mapping().await
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.get_created_table_ids().await
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: TableId,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: &[TableId]) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.to_vec(), false)
            .await
    }

    pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
        self.catalog_controller.list_refresh_jobs().await
    }

    pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.list_refreshable_table_ids().await
    }

    pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
        self.catalog_controller.ensure_refresh_job(table_id).await
    }

    pub async fn update_refresh_job_status(
        &self,
        table_id: TableId,
        status: RefreshState,
        trigger_time: Option<DateTime>,
        is_success: bool,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_status(table_id, status, trigger_time, is_success)
            .await
    }

    pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
        self.catalog_controller
            .reset_all_refresh_jobs_to_idle()
            .await
    }

    pub async fn update_refresh_job_interval(
        &self,
        table_id: TableId,
        trigger_interval_secs: Option<i64>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_interval(table_id, trigger_interval_secs)
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        self.catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        Vec<(PbDispatcherType, SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id)
            .await?;

        Ok((fragments, actors))
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(
        &self,
    ) -> Option<Vec<(JobId, Vec<TableId>)>> {
        self.catalog_controller.get_job_internal_table_ids().await
    }

    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id)
            .await
    }

    pub fn get_running_actors_of_fragment(&self, id: FragmentId) -> MetaResult<HashSet<ActorId>> {
        self.catalog_controller
            .get_running_actors_of_fragment(id as _)
    }

    /// Returns pairs of `(backfill_actor_id, upstream_source_actor_id)`.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count()?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller.count_streaming_jobs().await
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
        self.catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await
    }

    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_backfill_rate_limit_by_job_id(job_id, rate_limit)
            .await
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id, rate_limit)
            .await
    }

    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_dml_rate_limit_by_job_id(job_id, rate_limit)
            .await
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let (new_props, sink_id) = self
            .catalog_controller
            .update_iceberg_table_props_by_table_id(table_id, props, alter_iceberg_table_props)
            .await?;
        Ok((new_props, sink_id))
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await
    }

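    /// Persist the latest split assignment: per-actor splits are flattened into one split list
    /// per fragment before being written through the catalog controller.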
    #[await_tree::instrument]
    pub async fn update_fragment_splits(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        let fragment_splits = split_assignment
            .iter()
            .map(|(fragment_id, splits)| {
                (
                    *fragment_id as _,
                    splits.values().flatten().cloned().collect_vec(),
                )
            })
            .collect();

        let inner = self.catalog_controller.inner.write().await;

        self.catalog_controller
            .update_fragment_splits(&inner.db, &fragment_splits)
            .await
    }

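    /// For each materialized view (optionally restricted to one database), return the
    /// subscriptions that depend on it, keyed by subscription id with their retention time.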
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(table_id, subscriptions)| {
                (
                    table_id,
                    subscriptions
                        .into_iter()
                        .map(|(subscription_id, retention_time)| {
                            (subscription_id as SubscriptionId, retention_time)
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, job_id: JobId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(job_id)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: JobId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id)
            .await?;
        Ok(backfill_types)
    }

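    /// Return the subset of the given jobs that have at least one backfill fragment whose
    /// scan type is not reschedulable.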
    pub async fn collect_unreschedulable_backfill_jobs(
        &self,
        job_ids: impl IntoIterator<Item = &JobId>,
    ) -> MetaResult<HashSet<JobId>> {
        let mut unreschedulable = HashSet::new();

        for job_id in job_ids {
            let scan_types = self
                .catalog_controller
                .get_job_fragment_backfill_scan_type(*job_id)
                .await?;
            if scan_types
                .values()
                .any(|scan_type| !scan_type.is_reschedulable())
            {
                unreschedulable.insert(*job_id);
            }
        }

        Ok(unreschedulable)
    }
}

impl MetadataManager {
    /// Wait for the notification that the streaming job has finished, which is sent in
    /// `TrackingJob::finish`. The job's progress is updated per barrier.
    #[await_tree::instrument]
    pub async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: JobId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.current_notification_version().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "no reason received".to_owned())
            .and_then(|result| result)
            .map_err(|reason| anyhow!("failed to wait streaming job finish: {}", reason).into())
    }

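    /// Notify registered finish waiters (see [`Self::wait_streaming_job_finished`]) that
    /// finishing has failed with the given error, optionally scoped to a single database.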
    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id, err);
    }
}