risingwave_meta/manager/
metadata.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Formatter};
17
18use anyhow::anyhow;
19use itertools::Itertools;
20use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
21use risingwave_common::id::JobId;
22use risingwave_meta_model::refresh_job::{self, RefreshState};
23use risingwave_meta_model::{SinkId, SourceId, WorkerId};
24use risingwave_pb::catalog::{PbSource, PbTable};
25use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
26use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
27use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
28use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode, PbStreamScanType};
29use sea_orm::TransactionTrait;
30use sea_orm::prelude::DateTime;
31use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
32use tokio::sync::oneshot;
33use tracing::warn;
34
35use crate::MetaResult;
36use crate::barrier::SharedFragmentInfo;
37use crate::controller::catalog::CatalogControllerRef;
38use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
39use crate::controller::fragment::FragmentParallelismInfo;
40use crate::controller::scale::find_fragment_no_shuffle_dags_detailed;
41use crate::manager::{LocalNotification, NotificationVersion};
42use crate::model::{ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId};
43use crate::stream::SplitAssignment;
44use crate::telemetry::MetaTelemetryJobDesc;
45
/// Unified entry point to cluster- and catalog-level metadata, bundling the
/// two underlying controllers behind one cheaply cloneable handle.
#[derive(Clone)]
pub struct MetadataManager {
    /// Controller for worker-node / cluster membership metadata.
    pub cluster_controller: ClusterControllerRef,
    /// Controller for catalog metadata (tables, sources, fragments, jobs, ...).
    pub catalog_controller: CatalogControllerRef,
}
51
/// A single membership change observed on the set of active streaming
/// compute workers (produced by [`ActiveStreamingWorkerNodes::changed`]).
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    /// A worker became active and was added to the snapshot.
    Add(WorkerNode),
    /// A worker left (or stopped being a schedulable streaming compute node)
    /// and was removed from the snapshot.
    Remove(WorkerNode),
    /// An already-tracked worker's metadata changed.
    Update(WorkerNode),
}
58
/// A locally maintained snapshot of the active, schedulable streaming compute
/// nodes, incrementally updated from cluster notifications via `changed`.
pub struct ActiveStreamingWorkerNodes {
    /// Current active workers keyed by worker id.
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    /// Receiver of local cluster-membership notifications driving updates.
    rx: UnboundedReceiver<LocalNotification>,
    /// Back-reference used only in debug builds (`validate_change`) to compare
    /// this local snapshot against the global view; `None` for uninitialized
    /// or test instances.
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}
65
66impl Debug for ActiveStreamingWorkerNodes {
67    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("ActiveStreamingWorkerNodes")
69            .field("worker_nodes", &self.worker_nodes)
70            .finish()
71    }
72}
73
impl ActiveStreamingWorkerNodes {
    /// Return an uninitialized placeholder: an empty worker set and a channel
    /// whose sender half is immediately dropped — `changed` would panic if
    /// awaited on this instance, so it is only a stand-in until a real
    /// snapshot is created.
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    /// Build an instance with a fixed worker set for tests. A detached task
    /// keeps the sender alive forever so `changed` pends instead of panicking
    /// on a closed channel.
    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }

    /// Take a snapshot of the currently schedulable streaming compute nodes
    /// and subscribe to subsequent membership notifications, so the snapshot
    /// can be kept up to date via [`Self::changed`].
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes
                .into_iter()
                .filter_map(|node| node.is_streaming_schedulable().then_some((node.id, node)))
                .collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }

    /// Current view of the active workers, keyed by worker id.
    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

    /// Wait for the next relevant membership change, apply it to the local
    /// snapshot, and return it. Irrelevant notifications (non-streaming
    /// workers, no-op updates, other notification kinds) are skipped.
    ///
    /// # Panics
    /// Panics if the notification channel is closed, which happens on an
    /// [`Self::uninitialized`] instance.
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            // A worker is tracked here only if it is a streaming-enabled,
            // schedulable compute node.
            fn is_target_worker_node(worker: &WorkerNode) -> bool {
                worker.r#type == WorkerType::ComputeNode as i32
                    && worker.property.as_ref().unwrap().is_streaming
                    && worker.is_streaming_schedulable()
            }
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_target_worker_node = is_target_worker_node(&worker);
                    let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else {
                        // Deleting an untracked worker is only suspicious when
                        // it should have been tracked.
                        if is_target_worker_node {
                            warn!(
                                ?worker,
                                "notify to delete an non-existing streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_target_worker_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker has a different recent type"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = %worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if !is_target_worker_node(&worker) {
                        // The worker is no longer a schedulable streaming
                        // node; if we were tracking it, treat as a removal.
                        if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker is changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "not started worker added: {:?}",
                        worker
                    );
                    if let Some(prev_worker) = self.worker_nodes.insert(worker.id, worker.clone()) {
                        // Host and type are invariant for a given worker id.
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notify to update an existing active worker"
                        );
                        if prev_worker == worker {
                            // Identical re-activation: nothing to report.
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

    /// Debug-build sanity check: compare the local snapshot against a freshly
    /// listed global view (ignoring volatile fields) and log any divergence.
    /// Never fails hard — mismatches and listing errors only emit warnings.
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                // Only compare the fields that are expected to be stable.
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "different to global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot");
            }
        }
    }
}
244
impl MetadataManager {
    /// Assemble a manager from the two underlying controllers.
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    /// Look up a worker node by id, `None` if unknown.
    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    /// Count registered workers grouped by worker type.
    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    /// Fetch the extra (non-catalog) info attached to a worker, if any.
    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    /// Register a new worker node and return its assigned id.
    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    /// List worker nodes, optionally filtered by type and/or state.
    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    /// Snapshot the active streaming compute nodes and subscribe to
    /// subsequent membership notifications.
    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    /// List currently active streaming compute nodes.
    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    /// List currently active serving compute nodes.
    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    /// Collect the distinct database ids that own at least one fragment.
    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| database_id)
            .collect())
    }

    /// Regroup a fragment-keyed map by the database each fragment belongs to.
    ///
    /// Errors if any fragment id in the input has no known database.
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| (fragment_id as FragmentId, database_id))
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                // Input keys are unique, so inserts cannot collide.
                .expect("non duplicate");
        }
        Ok(ret)
    }

    /// List ids of streaming jobs still being created in the background.
    pub async fn list_background_creating_jobs(&self) -> MetaResult<HashSet<JobId>> {
        self.catalog_controller
            .list_background_creating_jobs(false, None)
            .await
    }

    /// List all sources in the catalog.
    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    /// Parallelism info of running fragments, optionally restricted to the
    /// given fragment ids.
    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

    /// Get and filter the "**root**" fragments of the specified relations.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// See also [`crate::controller::catalog::CatalogController::get_root_fragments`].
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(
        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(upstream_table_ids.iter().map(|id| id.as_job_id()).collect())
            .await?;

        Ok((upstream_root_fragments, actors))
    }

    /// Cluster-level info needed for streaming scheduling.
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    /// Options of all tables, keyed by table id.
    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
        self.catalog_controller.get_all_table_options().await
    }

    /// Mapping from table id to its (name, type) pair.
    pub async fn get_table_name_type_mapping(
        &self,
    ) -> MetaResult<HashMap<TableId, (String, String)>> {
        self.catalog_controller.get_table_name_type_mapping().await
    }

    /// Ids of tables whose creation has completed.
    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.get_created_table_ids().await
    }

    /// The source id associated with a table (e.g. for tables with a
    /// connector), `None` if there is none.
    pub async fn get_table_associated_source_id(
        &self,
        table_id: TableId,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id)
            .await
    }

    /// Fetch table catalogs by id (missing ids are skipped, not errors).
    pub async fn get_table_catalog_by_ids(&self, ids: &[TableId]) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.to_vec(), false)
            .await
    }

    /// List all refresh-job records.
    pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
        self.catalog_controller.list_refresh_jobs().await
    }

    /// Ids of tables that support refresh.
    pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.list_refreshable_table_ids().await
    }

    /// Ensure a refresh-job record exists for the given table.
    pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
        self.catalog_controller.ensure_refresh_job(table_id).await
    }

    /// Update the status (and optionally trigger time / success flag) of a
    /// table's refresh job.
    pub async fn update_refresh_job_status(
        &self,
        table_id: TableId,
        status: RefreshState,
        trigger_time: Option<DateTime>,
        is_success: bool,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_status(table_id, status, trigger_time, is_success)
            .await
    }

    /// Reset every refresh job back to the idle state (e.g. on recovery).
    pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
        self.catalog_controller
            .reset_all_refresh_jobs_to_idle()
            .await
    }

    /// Set or clear the periodic trigger interval of a table's refresh job.
    pub async fn update_refresh_job_interval(
        &self,
        table_id: TableId,
        trigger_interval_secs: Option<i64>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_interval(table_id, trigger_interval_secs)
            .await
    }

    /// State-table ids backing the given sink.
    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        self.catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await
    }

    /// Fetch table catalogs matching a CDC table id.
    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    /// Downstream fragments of a job, with the dispatcher type towards each,
    /// plus the actor-to-worker placement map.
    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        Vec<(PbDispatcherType, SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id)
            .await?;

        Ok((fragments, actors))
    }

    /// Mapping from each job id to its internal table ids, `None` on failure.
    pub async fn get_job_id_to_internal_table_ids_mapping(
        &self,
    ) -> Option<Vec<(JobId, Vec<TableId>)>> {
        self.catalog_controller.get_job_internal_table_ids().await
    }

    /// Fetch all fragments of a streaming job.
    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id)
            .await
    }

    /// Ids of the running actors of a fragment.
    pub fn get_running_actors_of_fragment(&self, id: FragmentId) -> MetaResult<HashSet<ActorId>> {
        self.catalog_controller
            .get_running_actors_of_fragment(id as _)
    }

    /// Running actor pairs for a source-backfill fragment; each entry is
    /// `(backfill_actor_id, upstream_source_actor_id)`.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    /// Number of actors per worker.
    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count()?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    /// Total number of streaming jobs.
    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller.count_streaming_jobs().await
    }

    /// Streaming-job descriptions for telemetry reporting.
    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

    /// Update (or clear, with `None`) the rate limit of a source; returns the
    /// affected job ids and fragment ids.
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
        self.catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await
    }

    /// Update the backfill rate limit of a job; returns affected fragments.
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_backfill_rate_limit_by_job_id(job_id, rate_limit)
            .await
    }

    /// Update the rate limit of a sink; returns affected fragments.
    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id, rate_limit)
            .await
    }

    /// Update the DML rate limit of a job; returns affected fragments.
    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_dml_rate_limit_by_job_id(job_id, rate_limit)
            .await
    }

    /// Apply new connector properties to a sink; returns the resulting props.
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    /// Apply new properties to an iceberg table; returns the resulting props
    /// together with the id of the sink backing the table.
    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let (new_props, sink_id) = self
            .catalog_controller
            .update_iceberg_table_props_by_table_id(table_id, props, alter_iceberg_table_props)
            .await?;
        Ok((new_props, sink_id))
    }

    /// Update the rate limit of a single fragment.
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await
    }

    /// Persist the split assignment per fragment (actor-level grouping is
    /// flattened away — only the set of splits per fragment is stored).
    #[await_tree::instrument]
    pub async fn update_fragment_splits(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        let fragment_splits = split_assignment
            .iter()
            .map(|(fragment_id, splits)| {
                (
                    *fragment_id as _,
                    splits.values().flatten().cloned().collect_vec(),
                )
            })
            .collect();

        // Hold the write guard so the update is not interleaved with other
        // catalog mutations.
        let inner = self.catalog_controller.inner.write().await;

        self.catalog_controller
            .update_fragment_splits(&inner.db, &fragment_splits)
            .await
    }

    /// Subscriptions depending on each materialized view (optionally limited
    /// to one database), as `table_id -> {subscription_id -> retention}`.
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(table_id, subscriptions)| {
                (
                    table_id,
                    subscriptions
                        .into_iter()
                        .map(|(subscription_id, retention_time)| {
                            (subscription_id as SubscriptionId, retention_time)
                        })
                        .collect(),
                )
            })
            .collect())
    }

    /// Maximum parallelism configured for a job.
    pub async fn get_job_max_parallelism(&self, job_id: JobId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(job_id)
            .await
    }

    /// Resource group of an existing streaming job.
    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: JobId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    /// Resource group of a database.
    pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    /// Id of this cluster.
    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    /// List all configured rate limits.
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    /// Backfill scan type of each fragment of a job.
    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id)
            .await?;
        Ok(backfill_types)
    }

    /// Among the given jobs, return those containing at least one backfill
    /// fragment whose scan type cannot be rescheduled (for the given
    /// online/offline mode).
    pub async fn collect_unreschedulable_backfill_jobs(
        &self,
        job_ids: impl IntoIterator<Item = &JobId>,
        is_online: bool,
    ) -> MetaResult<HashSet<JobId>> {
        let mut unreschedulable = HashSet::new();

        for job_id in job_ids {
            let scan_types = self
                .catalog_controller
                .get_job_fragment_backfill_scan_type(*job_id)
                .await?;
            if scan_types
                .values()
                .any(|scan_type| !scan_type.is_reschedulable(is_online))
            {
                unreschedulable.insert(*job_id);
            }
        }

        Ok(unreschedulable)
    }

    /// Compute the set of jobs whose rescheduling is blocked by the given
    /// creating jobs. Starting from non-reschedulable backfill fragments of
    /// the creating jobs, the blockage is propagated to their upstream
    /// fragments and across no-shuffle DAGs, then mapped back to job ids.
    /// All reads happen inside one transaction for a consistent view.
    pub async fn collect_reschedule_blocked_jobs_for_creating_jobs(
        &self,
        creating_job_ids: impl IntoIterator<Item = &JobId>,
        is_online: bool,
    ) -> MetaResult<HashSet<JobId>> {
        let creating_job_ids: HashSet<_> = creating_job_ids.into_iter().copied().collect();
        if creating_job_ids.is_empty() {
            return Ok(HashSet::new());
        }

        let inner = self.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        // Seed: fragments of the creating jobs that cannot be rescheduled.
        let mut initial_fragment_ids = HashSet::new();
        for job_id in &creating_job_ids {
            let scan_types = self
                .catalog_controller
                .get_job_fragment_backfill_scan_type_in_txn(&txn, *job_id)
                .await?;
            initial_fragment_ids.extend(scan_types.into_iter().filter_map(
                |(fragment_id, scan_type)| {
                    (!scan_type.is_reschedulable(is_online)).then_some(fragment_id)
                },
            ));
        }

        // Blocked fragments also block their direct upstream fragments.
        if !initial_fragment_ids.is_empty() {
            let upstream_fragments = self
                .catalog_controller
                .upstream_fragments_in_txn(&txn, initial_fragment_ids.iter().copied())
                .await?;
            initial_fragment_ids.extend(upstream_fragments.into_values().flatten());
        }

        // No-shuffle-connected fragments must be rescheduled together, so the
        // whole ensemble of each seed fragment is blocked too.
        let mut blocked_fragment_ids = initial_fragment_ids.clone();
        if !initial_fragment_ids.is_empty() {
            let initial_fragment_ids = initial_fragment_ids.into_iter().collect_vec();
            let ensembles =
                find_fragment_no_shuffle_dags_detailed(&txn, &initial_fragment_ids).await?;
            for ensemble in ensembles {
                blocked_fragment_ids.extend(ensemble.fragments());
            }
        }

        // Finally map blocked fragments back to their owning jobs.
        let mut blocked_job_ids = HashSet::new();
        if !blocked_fragment_ids.is_empty() {
            let fragment_ids = blocked_fragment_ids.into_iter().collect_vec();
            let fragment_job_ids = self
                .catalog_controller
                .get_fragment_job_id_in_txn(&txn, fragment_ids)
                .await?;
            blocked_job_ids.extend(
                fragment_job_ids
                    .into_iter()
                    .map(|job_id| job_id.as_job_id()),
            );
        }

        txn.commit().await?;

        Ok(blocked_job_ids)
    }
}
802
impl MetadataManager {
    /// Wait for job finishing notification in `TrackingJob::finish`.
    /// The progress is updated per barrier.
    ///
    /// If the job has already finished, a trivial frontend notification is
    /// issued immediately; otherwise a oneshot notifier is registered and
    /// awaited. The write guard is held across the finished-check and the
    /// registration so the notification cannot be missed in between.
    #[await_tree::instrument]
    pub async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: JobId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.notify_frontend_trivial().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id, id, tx);
        // Release the guard before awaiting, so other catalog operations
        // (including the one that will fire the notifier) can proceed.
        drop(mgr);
        rx.await
            .map_err(|_| "no received reason".to_owned())
            .and_then(|result| result)
            .map_err(|reason| anyhow!("failed to wait streaming job finish: {}", reason).into())
    }

    /// Propagate a failure to everyone waiting on job-finish notifications,
    /// optionally scoped to a single database.
    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id, err);
    }
}
831}