risingwave_meta/manager/metadata.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_common::id::JobId;
use risingwave_meta_model::refresh_job::{self, RefreshState};
use risingwave_meta_model::{SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamScanType};
use sea_orm::TransactionTrait;
use sea_orm::prelude::DateTime;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tracing::warn;

use crate::MetaResult;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::controller::scale::find_fragment_no_shuffle_dags_detailed;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{ActorId, ClusterId, Fragment, FragmentId, StreamJobFragments, SubscriptionId};
use crate::stream::SplitAssignment;
use crate::telemetry::MetaTelemetryJobDesc;

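/// A thin facade over the two SQL-backed metadata controllers: the cluster
/// controller for worker membership and the catalog controller for catalogs,
/// fragments and streaming jobs.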
#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

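/// A change to the set of active streaming compute nodes, as yielded by
/// [`ActiveStreamingWorkerNodes::changed`].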
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    Remove(WorkerNode),
    Update(WorkerNode),
}

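/// A locally maintained snapshot of the active streaming compute nodes, kept
/// up to date by consuming [`LocalNotification`]s from the cluster controller.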
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
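    /// Return an uninitialized placeholder to be swapped for a real snapshot
    /// later. Awaiting [`Self::changed`] on the placeholder panics, because its
    /// notification channel is created already closed.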
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }

    /// Take a snapshot of the currently active streaming compute nodes and
    /// subscribe to subsequent membership changes.
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes
                .into_iter()
                .filter_map(|node| {
                    let is_streaming = node.property.as_ref().is_some_and(|p| p.is_streaming);
                    is_streaming.then_some((node.id, node))
                })
                .collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

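    /// Wait for the next effective change to the set of active streaming compute
    /// nodes and return it. Notifications that do not concern streaming compute
    /// nodes, or that carry no actual change, are skipped.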
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            fn is_target_worker_node(worker: &WorkerNode) -> bool {
                worker.r#type == WorkerType::ComputeNode as i32
                    && worker.property.as_ref().unwrap().is_streaming
            }
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_target_worker_node = is_target_worker_node(&worker);
                    let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else {
                        if is_target_worker_node {
                            warn!(
                                ?worker,
                                "notified to delete a non-existent streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_target_worker_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker is no longer a streaming compute node"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = %worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if !is_target_worker_node(&worker) {
                        if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a tracked streaming worker has changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "activated worker is not in running state: {:?}",
                        worker
                    );
                    if let Some(prev_worker) = self.worker_nodes.insert(worker.id, worker.clone()) {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notified to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

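    /// Debug-only consistency check: re-list the active streaming compute nodes
    /// and warn if the local snapshot diverges from the global one, comparing
    /// only the id, type, host, property and resource fields.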
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "local snapshot differs from the global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list_active_streaming_compute_nodes to compare with the local snapshot");
            }
        }
    }
}
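
// A minimal sketch of how a caller might drive `ActiveStreamingWorkerNodes`
// (illustrative only; `meta_manager` is an assumed, already-constructed
// `MetadataManager`, and the match arms elide real handling):
//
// let mut active = ActiveStreamingWorkerNodes::new_snapshot(meta_manager).await?;
// loop {
//     match active.changed().await {
//         ActiveStreamingWorkerChange::Add(node) => { /* schedule onto the new node */ }
//         ActiveStreamingWorkerChange::Remove(node) => { /* migrate actors off the node */ }
//         ActiveStreamingWorkerChange::Update(node) => { /* refresh cached worker info */ }
//     }
// }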

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| database_id)
            .collect())
    }

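    /// Partition a fragment-keyed map by the database each fragment belongs to.
    /// Returns an error if the database of any fragment in the input is unknown.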
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| (fragment_id as FragmentId, database_id))
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<HashSet<JobId>> {
        self.catalog_controller
            .list_background_creating_jobs(false, None)
            .await
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

    /// Get and filter the "**root**" fragments of the specified relations.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// See also [`crate::controller::catalog::CatalogController::get_root_fragments`].
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<HashMap<JobId, Fragment>> {
        let upstream_root_fragments = self
            .catalog_controller
            .get_root_fragments(upstream_table_ids.iter().map(|id| id.as_job_id()).collect())
            .await?;

        Ok(upstream_root_fragments)
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
        self.catalog_controller.get_all_table_options().await
    }

    pub async fn get_table_name_type_mapping(
        &self,
    ) -> MetaResult<HashMap<TableId, (String, String)>> {
        self.catalog_controller.get_table_name_type_mapping().await
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.get_created_table_ids().await
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: TableId,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: &[TableId]) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.to_vec(), false)
            .await
    }

    pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
        self.catalog_controller.list_refresh_jobs().await
    }

    pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.list_refreshable_table_ids().await
    }

    pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
        self.catalog_controller.ensure_refresh_job(table_id).await
    }

    pub async fn update_refresh_job_status(
        &self,
        table_id: TableId,
        status: RefreshState,
        trigger_time: Option<DateTime>,
        is_success: bool,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_status(table_id, status, trigger_time, is_success)
            .await
    }

    pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
        self.catalog_controller
            .reset_all_refresh_jobs_to_idle()
            .await
    }

    pub async fn update_refresh_job_interval(
        &self,
        table_id: TableId,
        trigger_interval_secs: Option<i64>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_interval(table_id, trigger_interval_secs)
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        self.catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<Vec<(PbDispatcherType, Fragment)>> {
        self.catalog_controller
            .get_downstream_fragments(job_id)
            .await
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(
        &self,
    ) -> Option<Vec<(JobId, Vec<TableId>)>> {
        self.catalog_controller.get_job_internal_table_ids().await
    }

    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        Ok(self
            .catalog_controller
            .get_job_fragments_by_id(job_id)
            .await?
            .0)
    }

    pub fn get_running_actors_of_fragment(&self, id: FragmentId) -> MetaResult<HashSet<ActorId>> {
        self.catalog_controller
            .get_running_actors_of_fragment(id as _)
    }

    /// Returns pairs of `(backfill_actor_id, upstream_source_actor_id)`.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count()?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller.count_streaming_jobs().await
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
        self.catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await
    }

    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_backfill_rate_limit_by_job_id(job_id, rate_limit)
            .await
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id, rate_limit)
            .await
    }

    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_dml_rate_limit_by_job_id(job_id, rate_limit)
            .await
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let (new_props, sink_id) = self
            .catalog_controller
            .update_iceberg_table_props_by_table_id(table_id, props, alter_iceberg_table_props)
            .await?;
        Ok((new_props, sink_id))
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await
    }

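    /// Flatten the per-actor split assignment into one split list per fragment
    /// and persist it through the catalog controller.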
    #[await_tree::instrument]
    pub async fn update_fragment_splits(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        let fragment_splits = split_assignment
            .iter()
            .map(|(fragment_id, splits)| {
                (
                    *fragment_id as _,
                    splits.values().flatten().cloned().collect_vec(),
                )
            })
            .collect();

        let inner = self.catalog_controller.inner.write().await;

        self.catalog_controller
            .update_fragment_splits(&inner.db, &fragment_splits)
            .await
    }

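    /// For each materialized view (optionally restricted to one database), return
    /// the subscriptions that depend on it together with their retention time.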
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(table_id, subscriptions)| {
                (
                    table_id,
                    subscriptions
                        .into_iter()
                        .map(|(subscription_id, retention_time)| {
                            (subscription_id as SubscriptionId, retention_time)
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, job_id: JobId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(job_id)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: JobId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id)
            .await?;
        Ok(backfill_types)
    }

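    /// Return the subset of `job_ids` containing at least one backfill fragment
    /// whose scan type is not reschedulable in the given `is_online` mode.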
    pub async fn collect_unreschedulable_backfill_jobs(
        &self,
        job_ids: impl IntoIterator<Item = &JobId>,
        is_online: bool,
    ) -> MetaResult<HashSet<JobId>> {
        let mut unreschedulable = HashSet::new();

        for job_id in job_ids {
            let scan_types = self
                .catalog_controller
                .get_job_fragment_backfill_scan_type(*job_id)
                .await?;
            if scan_types
                .values()
                .any(|scan_type| !scan_type.is_reschedulable(is_online))
            {
                unreschedulable.insert(*job_id);
            }
        }

        Ok(unreschedulable)
    }

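    /// Within a single transaction, collect the fragments of the given creating
    /// jobs whose backfill scan type is not reschedulable, expand that set with
    /// their upstream fragments and the no-shuffle DAGs they participate in, and
    /// return the ids of all jobs owning a blocked fragment.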
    pub async fn collect_reschedule_blocked_jobs_for_creating_jobs(
        &self,
        creating_job_ids: impl IntoIterator<Item = &JobId>,
        is_online: bool,
    ) -> MetaResult<HashSet<JobId>> {
        let creating_job_ids: HashSet<_> = creating_job_ids.into_iter().copied().collect();
        if creating_job_ids.is_empty() {
            return Ok(HashSet::new());
        }

        let inner = self.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        let mut initial_fragment_ids = HashSet::new();
        for job_id in &creating_job_ids {
            let scan_types = self
                .catalog_controller
                .get_job_fragment_backfill_scan_type_in_txn(&txn, *job_id)
                .await?;
            initial_fragment_ids.extend(scan_types.into_iter().filter_map(
                |(fragment_id, scan_type)| {
                    (!scan_type.is_reschedulable(is_online)).then_some(fragment_id)
                },
            ));
        }

        if !initial_fragment_ids.is_empty() {
            let upstream_fragments = self
                .catalog_controller
                .upstream_fragments_in_txn(&txn, initial_fragment_ids.iter().copied())
                .await?;
            initial_fragment_ids.extend(upstream_fragments.into_values().flatten());
        }

        let mut blocked_fragment_ids = initial_fragment_ids.clone();
        if !initial_fragment_ids.is_empty() {
            let initial_fragment_ids = initial_fragment_ids.into_iter().collect_vec();
            let ensembles =
                find_fragment_no_shuffle_dags_detailed(&txn, &initial_fragment_ids).await?;
            for ensemble in ensembles {
                blocked_fragment_ids.extend(ensemble.fragments());
            }
        }

        let mut blocked_job_ids = HashSet::new();
        if !blocked_fragment_ids.is_empty() {
            let fragment_ids = blocked_fragment_ids.into_iter().collect_vec();
            let fragment_job_ids = self
                .catalog_controller
                .get_fragment_job_id_in_txn(&txn, fragment_ids)
                .await?;
            blocked_job_ids.extend(
                fragment_job_ids
                    .into_iter()
                    .map(|job_id| job_id.as_job_id()),
            );
        }

        txn.commit().await?;

        Ok(blocked_job_ids)
    }
}

impl MetadataManager {
    /// Wait for the job-finished notification issued in `TrackingJob::finish`.
    /// The progress is updated per barrier.
    #[await_tree::instrument]
    pub async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: JobId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.notify_frontend_trivial().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "no reason received".to_owned())
            .and_then(|result| result)
            .map_err(|reason| {
                anyhow!("failed to wait for streaming job to finish: {}", reason).into()
            })
    }

    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id, err);
    }
}