use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_common::id::JobId;
use risingwave_meta_model::refresh_job::{self, RefreshState};
use risingwave_meta_model::{SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode, PbStreamScanType};
use sea_orm::TransactionTrait;
use sea_orm::prelude::DateTime;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tracing::warn;

use crate::MetaResult;
use crate::barrier::SharedFragmentInfo;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::controller::scale::find_fragment_no_shuffle_dags_detailed;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId};
use crate::stream::SplitAssignment;
use crate::telemetry::MetaTelemetryJobDesc;

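/// Central access point for cluster and catalog metadata in the meta service,
/// delegating to the underlying [`ClusterControllerRef`] and
/// [`CatalogControllerRef`].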
#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

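/// A change to the set of active streaming compute nodes, yielded by
/// [`ActiveStreamingWorkerNodes::changed`].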
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    Remove(WorkerNode),
    Update(WorkerNode),
}

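/// A locally maintained view of the active streaming compute nodes, kept in sync
/// by consuming [`LocalNotification`]s from the cluster controller.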
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
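    /// Creates an empty placeholder whose notification channel is already closed.
    /// Awaiting [`Self::changed`] on the result will panic, so this is only
    /// suitable as a value to be replaced before use.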
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }

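    /// Takes a snapshot of the currently schedulable streaming compute nodes and
    /// subscribes to subsequent cluster notifications.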
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes
                .into_iter()
                .filter_map(|node| node.is_streaming_schedulable().then_some((node.id, node)))
                .collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

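    /// Waits for the next relevant change to the active streaming compute nodes,
    /// applies it to the local snapshot, and returns it. Notifications about
    /// non-streaming or unschedulable workers are skipped. A minimal sketch of a
    /// caller loop, assuming a `meta_manager: MetadataManager` in scope (not
    /// actual caller code):
    ///
    /// ```ignore
    /// let mut nodes = ActiveStreamingWorkerNodes::new_snapshot(meta_manager).await?;
    /// loop {
    ///     match nodes.changed().await {
    ///         ActiveStreamingWorkerChange::Add(node) => { /* schedule onto the new node */ }
    ///         ActiveStreamingWorkerChange::Remove(node) => { /* migrate its actors away */ }
    ///         ActiveStreamingWorkerChange::Update(node) => { /* refresh cached node info */ }
    ///     }
    /// }
    /// ```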
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            fn is_target_worker_node(worker: &WorkerNode) -> bool {
                worker.r#type == WorkerType::ComputeNode as i32
                    && worker.property.as_ref().unwrap().is_streaming
                    && worker.is_streaming_schedulable()
            }
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_target_worker_node = is_target_worker_node(&worker);
                    let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else {
                        if is_target_worker_node {
                            warn!(
                                ?worker,
                                "notified to delete a non-existent streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_target_worker_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker's latest type differs from the tracked one"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = %worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if !is_target_worker_node(&worker) {
                        if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker has changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "activated worker is not in Running state: {:?}",
                        worker
                    );
                    if let Some(prev_worker) = self.worker_nodes.insert(worker.id, worker.clone()) {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notified to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

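    /// Debug-only consistency check: re-lists the active streaming compute nodes
    /// from the cluster controller and warns if the local snapshot has diverged.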
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "local snapshot differs from the global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list active streaming compute nodes to compare with the local snapshot");
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| database_id)
            .collect())
    }

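    /// Regroups a fragment-keyed map into per-database maps by looking up each
    /// fragment's owning database in the catalog. Fails if any fragment's database
    /// is unknown. A sketch of the shape transformation (`fragment_map` and the
    /// ids are hypothetical):
    ///
    /// ```ignore
    /// // With f1, f2 in db1 and f3 in db2:
    /// //   {f1: a, f2: b, f3: c}  ==>  {db1: {f1: a, f2: b}, db2: {f3: c}}
    /// let by_db = metadata_manager.split_fragment_map_by_database(fragment_map).await?;
    /// ```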
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| (fragment_id as FragmentId, database_id))
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<HashSet<JobId>> {
        self.catalog_controller
            .list_background_creating_jobs(false, None)
            .await
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

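    /// Returns the root fragment of each job backing the given upstream tables
    /// (the fragment that downstream creating jobs attach to), together with the
    /// actor-to-worker assignment of the involved actors.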
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(
        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(upstream_table_ids.iter().map(|id| id.as_job_id()).collect())
            .await?;

        Ok((upstream_root_fragments, actors))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
        self.catalog_controller.get_all_table_options().await
    }

    pub async fn get_table_name_type_mapping(
        &self,
    ) -> MetaResult<HashMap<TableId, (String, String)>> {
        self.catalog_controller.get_table_name_type_mapping().await
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.get_created_table_ids().await
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: TableId,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: &[TableId]) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.to_vec(), false)
            .await
    }

    pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
        self.catalog_controller.list_refresh_jobs().await
    }

    pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.list_refreshable_table_ids().await
    }

    pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
        self.catalog_controller.ensure_refresh_job(table_id).await
    }

    pub async fn update_refresh_job_status(
        &self,
        table_id: TableId,
        status: RefreshState,
        trigger_time: Option<DateTime>,
        is_success: bool,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_status(table_id, status, trigger_time, is_success)
            .await
    }

    pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
        self.catalog_controller
            .reset_all_refresh_jobs_to_idle()
            .await
    }

    pub async fn update_refresh_job_interval(
        &self,
        table_id: TableId,
        trigger_interval_secs: Option<i64>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_interval(table_id, trigger_interval_secs)
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        self.catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        Vec<(PbDispatcherType, SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id)
            .await?;

        Ok((fragments, actors))
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(
        &self,
    ) -> Option<Vec<(JobId, Vec<TableId>)>> {
        self.catalog_controller.get_job_internal_table_ids().await
    }

    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id)
            .await
    }

    pub fn get_running_actors_of_fragment(&self, id: FragmentId) -> MetaResult<HashSet<ActorId>> {
        self.catalog_controller
            .get_running_actors_of_fragment(id as _)
    }

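    /// Returns the running (source backfill actor, upstream source actor) pairs
    /// for the given source-backfill fragment and its corresponding source
    /// fragment.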
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count()?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller.count_streaming_jobs().await
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
        self.catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await
    }

    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_backfill_rate_limit_by_job_id(job_id, rate_limit)
            .await
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id, rate_limit)
            .await
    }

    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_dml_rate_limit_by_job_id(job_id, rate_limit)
            .await
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let (new_props, sink_id) = self
            .catalog_controller
            .update_iceberg_table_props_by_table_id(table_id, props, alter_iceberg_table_props)
            .await?;
        Ok((new_props, sink_id))
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await
    }

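    /// Flattens a [`SplitAssignment`] (fragment -> actor -> splits) into a
    /// per-fragment split list and persists it through the catalog controller.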
    #[await_tree::instrument]
    pub async fn update_fragment_splits(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        let fragment_splits = split_assignment
            .iter()
            .map(|(fragment_id, splits)| {
                (
                    *fragment_id as _,
                    splits.values().flatten().cloned().collect_vec(),
                )
            })
            .collect();

        let inner = self.catalog_controller.inner.write().await;

        self.catalog_controller
            .update_fragment_splits(&inner.db, &fragment_splits)
            .await
    }

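    /// Returns, for each materialized view, the subscriptions that depend on it
    /// and each subscription's retention time, optionally filtered by database.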
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(table_id, subscriptions)| {
                (
                    table_id,
                    subscriptions
                        .into_iter()
                        .map(|(subscription_id, retention_time)| {
                            (subscription_id as SubscriptionId, retention_time)
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, job_id: JobId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(job_id)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: JobId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id)
            .await?;
        Ok(backfill_types)
    }

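    /// Returns the subset of `job_ids` that contain at least one backfill
    /// fragment whose scan type cannot be rescheduled under the given
    /// `is_online` mode.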
    pub async fn collect_unreschedulable_backfill_jobs(
        &self,
        job_ids: impl IntoIterator<Item = &JobId>,
        is_online: bool,
    ) -> MetaResult<HashSet<JobId>> {
        let mut unreschedulable = HashSet::new();

        for job_id in job_ids {
            let scan_types = self
                .catalog_controller
                .get_job_fragment_backfill_scan_type(*job_id)
                .await?;
            if scan_types
                .values()
                .any(|scan_type| !scan_type.is_reschedulable(is_online))
            {
                unreschedulable.insert(*job_id);
            }
        }

        Ok(unreschedulable)
    }

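    /// Computes the set of jobs whose rescheduling is blocked by the given
    /// creating jobs. Starting from the creating jobs' unreschedulable backfill
    /// fragments, it expands through their upstream fragments and the no-shuffle
    /// DAGs they belong to, then maps the resulting fragments back to job ids.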
    pub async fn collect_reschedule_blocked_jobs_for_creating_jobs(
        &self,
        creating_job_ids: impl IntoIterator<Item = &JobId>,
        is_online: bool,
    ) -> MetaResult<HashSet<JobId>> {
        let creating_job_ids: HashSet<_> = creating_job_ids.into_iter().copied().collect();
        if creating_job_ids.is_empty() {
            return Ok(HashSet::new());
        }

        let inner = self.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        // Seed with the creating jobs' fragments whose backfill scan type is not
        // reschedulable under the current mode.
        let mut initial_fragment_ids = HashSet::new();
        for job_id in &creating_job_ids {
            let scan_types = self
                .catalog_controller
                .get_job_fragment_backfill_scan_type_in_txn(&txn, *job_id)
                .await?;
            initial_fragment_ids.extend(scan_types.into_iter().filter_map(
                |(fragment_id, scan_type)| {
                    (!scan_type.is_reschedulable(is_online)).then_some(fragment_id)
                },
            ));
        }

        // Expand to the upstream fragments of the seed set.
        if !initial_fragment_ids.is_empty() {
            let upstream_fragments = self
                .catalog_controller
                .upstream_fragments_in_txn(&txn, initial_fragment_ids.iter().copied())
                .await?;
            initial_fragment_ids.extend(upstream_fragments.into_values().flatten());
        }

        // Expand further to every fragment in the no-shuffle DAGs that the seed
        // fragments participate in.
        let mut blocked_fragment_ids = initial_fragment_ids.clone();
        if !initial_fragment_ids.is_empty() {
            let initial_fragment_ids = initial_fragment_ids.into_iter().collect_vec();
            let ensembles =
                find_fragment_no_shuffle_dags_detailed(&txn, &initial_fragment_ids).await?;
            for ensemble in ensembles {
                blocked_fragment_ids.extend(ensemble.fragments());
            }
        }

        // Map the blocked fragments back to the jobs that own them.
        let mut blocked_job_ids = HashSet::new();
        if !blocked_fragment_ids.is_empty() {
            let fragment_ids = blocked_fragment_ids.into_iter().collect_vec();
            let fragment_job_ids = self
                .catalog_controller
                .get_fragment_job_id_in_txn(&txn, fragment_ids)
                .await?;
            blocked_job_ids.extend(
                fragment_job_ids
                    .into_iter()
                    .map(|job_id| job_id.as_job_id()),
            );
        }

        txn.commit().await?;

        Ok(blocked_job_ids)
    }
}

impl MetadataManager {
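    /// Waits until the streaming job identified by `id` has finished creating.
    /// If the job is already finished, notifies the frontend and returns
    /// immediately; otherwise registers a finish notifier and awaits the result.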
    #[await_tree::instrument]
    pub async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: JobId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.notify_frontend_trivial().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "no reason received".to_owned())
            .and_then(|result| result)
            .map_err(|reason| {
                anyhow!("failed to wait for streaming job to finish: {}", reason).into()
            })
    }

    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id, err);
    }
}