1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Formatter};
17
18use anyhow::anyhow;
19use itertools::Itertools;
20use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
21use risingwave_common::id::JobId;
22use risingwave_meta_model::refresh_job::{self, RefreshState};
23use risingwave_meta_model::{SinkId, SourceId, WorkerId};
24use risingwave_pb::catalog::{PbSource, PbTable};
25use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
26use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
27use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
28use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamScanType};
29use sea_orm::TransactionTrait;
30use sea_orm::prelude::DateTime;
31use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
32use tokio::sync::oneshot;
33use tracing::warn;
34
35use crate::MetaResult;
36use crate::controller::catalog::CatalogControllerRef;
37use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
38use crate::controller::fragment::FragmentParallelismInfo;
39use crate::controller::scale::find_fragment_no_shuffle_dags_detailed;
40use crate::manager::{LocalNotification, NotificationVersion};
41use crate::model::{ActorId, ClusterId, Fragment, FragmentId, StreamJobFragments, SubscriptionId};
42use crate::stream::SplitAssignment;
43use crate::telemetry::MetaTelemetryJobDesc;
44
/// Entry point for metadata access in the meta service.
///
/// Bundles the two underlying controllers — cluster (worker nodes) and
/// catalog (jobs, fragments, tables, sources, sinks) — behind one cheaply
/// clonable handle (both fields are `Ref` types, presumably `Arc`-backed —
/// TODO confirm against their definitions).
#[derive(Clone)]
pub struct MetadataManager {
    // Worker-node membership, liveness and streaming-cluster topology.
    pub cluster_controller: ClusterControllerRef,
    // Catalog objects and fragment/actor metadata.
    pub catalog_controller: CatalogControllerRef,
}
50
/// A single membership change observed by [`ActiveStreamingWorkerNodes::changed`].
///
/// Each variant carries the affected worker: `Add`/`Update` carry the new
/// state, `Remove` carries the previously known state.
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    Remove(WorkerNode),
    Update(WorkerNode),
}
57
/// A locally maintained snapshot of the active streaming compute nodes,
/// kept up to date by consuming `LocalNotification`s from `rx`.
pub struct ActiveStreamingWorkerNodes {
    // Current snapshot, keyed by worker id.
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    // Notification stream that drives incremental updates; a closed/empty
    // receiver (see `uninitialized`) makes `changed` panic on recv.
    rx: UnboundedReceiver<LocalNotification>,
    // Kept only so debug builds can re-list nodes and cross-check the local
    // snapshot (`validate_change`); dead code in release builds.
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}
64
65impl Debug for ActiveStreamingWorkerNodes {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 f.debug_struct("ActiveStreamingWorkerNodes")
68 .field("worker_nodes", &self.worker_nodes)
69 .finish()
70 }
71}
72
73impl ActiveStreamingWorkerNodes {
74 pub(crate) fn uninitialized() -> Self {
75 Self {
76 worker_nodes: Default::default(),
77 rx: unbounded_channel().1,
78 meta_manager: None,
79 }
80 }
81
82 #[cfg(test)]
83 pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
84 let (tx, rx) = unbounded_channel();
85 let _join_handle = tokio::spawn(async move {
86 let _tx = tx;
87 std::future::pending::<()>().await
88 });
89 Self {
90 worker_nodes,
91 rx,
92 meta_manager: None,
93 }
94 }
95
96 pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
98 let (nodes, rx) = meta_manager
99 .subscribe_active_streaming_compute_nodes()
100 .await?;
101 Ok(Self {
102 worker_nodes: nodes
103 .into_iter()
104 .filter_map(|node| {
105 let is_streaming = node.property.as_ref().is_some_and(|p| p.is_streaming);
106 is_streaming.then_some((node.id, node))
107 })
108 .collect(),
109 rx,
110 meta_manager: Some(meta_manager),
111 })
112 }
113
114 pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
115 &self.worker_nodes
116 }
117
118 pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
119 loop {
120 let notification = self
121 .rx
122 .recv()
123 .await
124 .expect("notification stopped or uninitialized");
125 fn is_target_worker_node(worker: &WorkerNode) -> bool {
126 worker.r#type == WorkerType::ComputeNode as i32
127 && worker.property.as_ref().unwrap().is_streaming
128 }
129 match notification {
130 LocalNotification::WorkerNodeDeleted(worker) => {
131 let is_target_worker_node = is_target_worker_node(&worker);
132 let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else {
133 if is_target_worker_node {
134 warn!(
135 ?worker,
136 "notify to delete an non-existing streaming compute worker"
137 );
138 }
139 continue;
140 };
141 if !is_target_worker_node {
142 warn!(
143 ?worker,
144 ?prev_worker,
145 "deleted worker has a different recent type"
146 );
147 }
148 if worker.state == State::Starting as i32 {
149 warn!(
150 id = %worker.id,
151 host = ?worker.host,
152 state = worker.state,
153 "a starting streaming worker is deleted"
154 );
155 }
156 break ActiveStreamingWorkerChange::Remove(prev_worker);
157 }
158 LocalNotification::WorkerNodeActivated(worker) => {
159 if !is_target_worker_node(&worker) {
160 if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) {
161 warn!(
162 ?worker,
163 ?prev_worker,
164 "the type of a streaming worker is changed"
165 );
166 break ActiveStreamingWorkerChange::Remove(prev_worker);
167 } else {
168 continue;
169 }
170 }
171 assert_eq!(
172 worker.state,
173 State::Running as i32,
174 "not started worker added: {:?}",
175 worker
176 );
177 if let Some(prev_worker) = self.worker_nodes.insert(worker.id, worker.clone()) {
178 assert_eq!(prev_worker.host, worker.host);
179 assert_eq!(prev_worker.r#type, worker.r#type);
180 warn!(
181 ?prev_worker,
182 ?worker,
183 eq = prev_worker == worker,
184 "notify to update an existing active worker"
185 );
186 if prev_worker == worker {
187 continue;
188 } else {
189 break ActiveStreamingWorkerChange::Update(worker);
190 }
191 } else {
192 break ActiveStreamingWorkerChange::Add(worker);
193 }
194 }
195 _ => {
196 continue;
197 }
198 }
199 }
200 }
201
202 #[cfg(debug_assertions)]
203 pub(crate) async fn validate_change(&self) {
204 use risingwave_pb::common::WorkerNode;
205 use thiserror_ext::AsReport;
206 let Some(meta_manager) = &self.meta_manager else {
207 return;
208 };
209 match meta_manager.list_active_streaming_compute_nodes().await {
210 Ok(worker_nodes) => {
211 let ignore_irrelevant_info = |node: &WorkerNode| {
212 (
213 node.id,
214 WorkerNode {
215 id: node.id,
216 r#type: node.r#type,
217 host: node.host.clone(),
218 property: node.property.clone(),
219 resource: node.resource.clone(),
220 ..Default::default()
221 },
222 )
223 };
224 let worker_nodes: HashMap<_, _> =
225 worker_nodes.iter().map(ignore_irrelevant_info).collect();
226 let curr_worker_nodes: HashMap<_, _> = self
227 .current()
228 .values()
229 .map(ignore_irrelevant_info)
230 .collect();
231 if worker_nodes != curr_worker_nodes {
232 warn!(
233 ?worker_nodes,
234 ?curr_worker_nodes,
235 "different to global snapshot"
236 );
237 }
238 }
239 Err(e) => {
240 warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot");
241 }
242 }
243 }
244}
245
impl MetadataManager {
    /// Assemble the manager from its two controllers.
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    /// Look up a worker node by id; `Ok(None)` if it does not exist.
    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    /// Count workers grouped by worker type.
    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            // Controller counts are signed; presumably always non-negative —
            // TODO confirm before relying on the `as u64` cast.
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    /// Extra (non-catalog) info for a worker, if any.
    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    /// Register a new worker node and return its assigned id.
    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    /// List worker nodes, optionally filtered by type and/or state.
    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    /// Atomically snapshot the active streaming compute nodes and subscribe
    /// to subsequent local membership notifications.
    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    /// List the currently active streaming compute nodes.
    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    /// List the currently active serving compute nodes.
    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    /// Database ids that currently own at least one fragment.
    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| database_id)
            .collect())
    }

    /// Re-key a fragment-indexed map by the database each fragment belongs to.
    ///
    /// # Errors
    /// Fails if any fragment id in the input has no known database.
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| (fragment_id as FragmentId, database_id))
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                // Input keys come from a HashMap, so duplicates are impossible.
                .expect("non duplicate");
        }
        Ok(ret)
    }

    /// Ids of streaming jobs still being created in the background.
    pub async fn list_background_creating_jobs(&self) -> MetaResult<HashSet<JobId>> {
        self.catalog_controller
            .list_background_creating_jobs(false, None)
            .await
    }

    /// All sources in the catalog.
    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    /// Parallelism info of running fragments, optionally restricted to
    /// `id_filter`.
    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

    /// Root (i.e. most downstream) fragment of each given upstream table's
    /// streaming job, keyed by job id.
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<HashMap<JobId, Fragment>> {
        let upstream_root_fragments = self
            .catalog_controller
            .get_root_fragments(upstream_table_ids.iter().map(|id| id.as_job_id()).collect())
            .await?;

        Ok(upstream_root_fragments)
    }

    /// Topology info used for scheduling streaming jobs.
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    /// `TableOption` for every table in the catalog.
    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
        self.catalog_controller.get_all_table_options().await
    }

    /// Map of table id to its (name, type) pair.
    pub async fn get_table_name_type_mapping(
        &self,
    ) -> MetaResult<HashMap<TableId, (String, String)>> {
        self.catalog_controller.get_table_name_type_mapping().await
    }

    /// Ids of tables whose creation has completed.
    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.get_created_table_ids().await
    }

    /// The source associated with a table (e.g. for tables with a connector),
    /// if any.
    pub async fn get_table_associated_source_id(
        &self,
        table_id: TableId,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id)
            .await
    }

    /// Fetch table catalogs by id (the `false` flag is passed through to the
    /// controller; see `get_table_by_ids` for its meaning).
    pub async fn get_table_catalog_by_ids(&self, ids: &[TableId]) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.to_vec(), false)
            .await
    }

    /// All refresh-job records.
    pub async fn list_refresh_jobs(&self) -> MetaResult<Vec<refresh_job::Model>> {
        self.catalog_controller.list_refresh_jobs().await
    }

    /// Ids of tables that support refresh.
    pub async fn list_refreshable_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.list_refreshable_table_ids().await
    }

    /// Ensure a refresh-job record exists for the table.
    pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
        self.catalog_controller.ensure_refresh_job(table_id).await
    }

    /// Update the state of a table's refresh job.
    pub async fn update_refresh_job_status(
        &self,
        table_id: TableId,
        status: RefreshState,
        trigger_time: Option<DateTime>,
        is_success: bool,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_status(table_id, status, trigger_time, is_success)
            .await
    }

    /// Reset every refresh job back to the idle state (e.g. on recovery).
    pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
        self.catalog_controller
            .reset_all_refresh_jobs_to_idle()
            .await
    }

    /// Change (or clear, with `None`) a refresh job's trigger interval.
    pub async fn update_refresh_job_interval(
        &self,
        table_id: TableId,
        trigger_interval_secs: Option<i64>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_refresh_job_interval(table_id, trigger_interval_secs)
            .await
    }

    /// State-table ids belonging to a sink.
    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        self.catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await
    }

    /// Tables created from the given CDC table id.
    // NOTE(review): `&String` should ideally be `&str`; call sites coerce
    // either way — candidate cleanup, kept as-is here.
    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    /// Downstream fragments of a job, each paired with the dispatcher type
    /// used to reach it.
    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<Vec<(PbDispatcherType, Fragment)>> {
        self.catalog_controller
            .get_downstream_fragments(job_id)
            .await
    }

    /// Map of job id to its internal table ids; `None` is the controller's
    /// failure signal here (no `MetaResult` on this path).
    pub async fn get_job_id_to_internal_table_ids_mapping(
        &self,
    ) -> Option<Vec<(JobId, Vec<TableId>)>> {
        self.catalog_controller.get_job_internal_table_ids().await
    }

    /// The fragment graph of a streaming job (discarding the second tuple
    /// element returned by the controller).
    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        Ok(self
            .catalog_controller
            .get_job_fragments_by_id(job_id)
            .await?
            .0)
    }

    /// Ids of the running actors of a fragment.
    pub fn get_running_actors_of_fragment(&self, id: FragmentId) -> MetaResult<HashSet<ActorId>> {
        self.catalog_controller
            .get_running_actors_of_fragment(id as _)
    }

    /// Running (backfill-actor, upstream-source-actor) pairs for a
    /// source-backfill fragment.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    /// Number of actors hosted on each worker.
    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count()?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    /// Total number of streaming jobs.
    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller.count_streaming_jobs().await
    }

    /// Job descriptions collected for telemetry reporting.
    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

    /// Set (or clear, with `None`) the rate limit of a source, returning the
    /// affected job ids and fragment ids.
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
        self.catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await
    }

    /// Set (or clear) the backfill rate limit of a job, returning the
    /// affected fragment ids.
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_backfill_rate_limit_by_job_id(job_id, rate_limit)
            .await
    }

    /// Set (or clear) the rate limit of a sink, returning the affected
    /// fragment ids.
    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id, rate_limit)
            .await
    }

    /// Set (or clear) the DML rate limit of a job, returning the affected
    /// fragment ids.
    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        self.catalog_controller
            .update_dml_rate_limit_by_job_id(job_id, rate_limit)
            .await
    }

    /// Update a sink's connector properties, returning the resulting full
    /// property map.
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    /// Update an iceberg table's connector properties, returning the new
    /// property map and the id of the table's backing sink.
    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let (new_props, sink_id) = self
            .catalog_controller
            .update_iceberg_table_props_by_table_id(table_id, props, alter_iceberg_table_props)
            .await?;
        Ok((new_props, sink_id))
    }

    /// Set (or clear) the rate limit of a single fragment.
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<()> {
        self.catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await
    }

    /// Persist split assignments per fragment, flattening each fragment's
    /// per-actor split lists into one list.
    #[await_tree::instrument]
    pub async fn update_fragment_splits(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        let fragment_splits = split_assignment
            .iter()
            .map(|(fragment_id, splits)| {
                (
                    *fragment_id as _,
                    splits.values().flatten().cloned().collect_vec(),
                )
            })
            .collect();

        // Hold the catalog write lock for the duration of the DB update.
        let inner = self.catalog_controller.inner.write().await;

        self.catalog_controller
            .update_fragment_splits(&inner.db, &fragment_splits)
            .await
    }

    /// Subscriptions depending on each materialized view, with their
    /// retention times, optionally restricted to one database.
    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(table_id, subscriptions)| {
                (
                    table_id,
                    subscriptions
                        .into_iter()
                        .map(|(subscription_id, retention_time)| {
                            (subscription_id as SubscriptionId, retention_time)
                        })
                        .collect(),
                )
            })
            .collect())
    }

    /// The maximum parallelism a job may be scaled to.
    pub async fn get_job_max_parallelism(&self, job_id: JobId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(job_id)
            .await
    }

    /// Resource group an existing streaming job belongs to.
    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: JobId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    /// Resource group of a database.
    pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    /// Identifier of this cluster.
    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    /// All configured rate limits.
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    /// Backfill scan type of each fragment of a job.
    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id)
            .await?;
        Ok(backfill_types)
    }

    /// Among the given jobs, those having at least one fragment whose
    /// backfill scan type is not reschedulable (in the given online/offline
    /// mode).
    pub async fn collect_unreschedulable_backfill_jobs(
        &self,
        job_ids: impl IntoIterator<Item = &JobId>,
        is_online: bool,
    ) -> MetaResult<HashSet<JobId>> {
        let mut unreschedulable = HashSet::new();

        for job_id in job_ids {
            let scan_types = self
                .catalog_controller
                .get_job_fragment_backfill_scan_type(*job_id)
                .await?;
            if scan_types
                .values()
                .any(|scan_type| !scan_type.is_reschedulable(is_online))
            {
                unreschedulable.insert(*job_id);
            }
        }

        Ok(unreschedulable)
    }

    /// For a set of creating jobs, find every job that must not be
    /// rescheduled: starting from their unreschedulable backfill fragments,
    /// expand the blocked set via upstream fragments and no-shuffle DAG
    /// ensembles, then map the blocked fragments back to their owning jobs.
    ///
    /// Runs all lookups in one read transaction for a consistent view.
    pub async fn collect_reschedule_blocked_jobs_for_creating_jobs(
        &self,
        creating_job_ids: impl IntoIterator<Item = &JobId>,
        is_online: bool,
    ) -> MetaResult<HashSet<JobId>> {
        let creating_job_ids: HashSet<_> = creating_job_ids.into_iter().copied().collect();
        if creating_job_ids.is_empty() {
            return Ok(HashSet::new());
        }

        let inner = self.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        // Seed: fragments with a non-reschedulable backfill scan type.
        let mut initial_fragment_ids = HashSet::new();
        for job_id in &creating_job_ids {
            let scan_types = self
                .catalog_controller
                .get_job_fragment_backfill_scan_type_in_txn(&txn, *job_id)
                .await?;
            initial_fragment_ids.extend(scan_types.into_iter().filter_map(
                |(fragment_id, scan_type)| {
                    (!scan_type.is_reschedulable(is_online)).then_some(fragment_id)
                },
            ));
        }

        // Expand one level upstream: those fragments feed the blocked ones.
        if !initial_fragment_ids.is_empty() {
            let upstream_fragments = self
                .catalog_controller
                .upstream_fragments_in_txn(&txn, initial_fragment_ids.iter().copied())
                .await?;
            initial_fragment_ids.extend(upstream_fragments.into_values().flatten());
        }

        // Expand across no-shuffle DAGs: every fragment sharing an ensemble
        // with a blocked fragment is blocked as well.
        let mut blocked_fragment_ids = initial_fragment_ids.clone();
        if !initial_fragment_ids.is_empty() {
            let initial_fragment_ids = initial_fragment_ids.into_iter().collect_vec();
            let ensembles =
                find_fragment_no_shuffle_dags_detailed(&txn, &initial_fragment_ids).await?;
            for ensemble in ensembles {
                blocked_fragment_ids.extend(ensemble.fragments());
            }
        }

        // Map blocked fragments back to their owning jobs.
        let mut blocked_job_ids = HashSet::new();
        if !blocked_fragment_ids.is_empty() {
            let fragment_ids = blocked_fragment_ids.into_iter().collect_vec();
            let fragment_job_ids = self
                .catalog_controller
                .get_fragment_job_id_in_txn(&txn, fragment_ids)
                .await?;
            blocked_job_ids.extend(
                fragment_job_ids
                    .into_iter()
                    .map(|job_id| job_id.as_job_id()),
            );
        }

        txn.commit().await?;

        Ok(blocked_job_ids)
    }
}
796
impl MetadataManager {
    /// Block until the given streaming job finishes creation, then return the
    /// notification version to report to the frontend.
    ///
    /// The finished-check and the notifier registration both happen while
    /// holding the catalog write guard, so a completion that races with this
    /// call cannot be missed. The guard is dropped *before* awaiting the
    /// oneshot so other catalog writers are not blocked while we wait.
    ///
    /// # Errors
    /// Returns an error if the job fails, or if the notifier is dropped
    /// without sending ("no received reason").
    #[await_tree::instrument]
    pub async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: JobId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            // Already done: just bump/fetch the frontend notification version.
            return Ok(self.catalog_controller.notify_frontend_trivial().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id, id, tx);
        drop(mgr);
        rx.await
            // Sender dropped without a result.
            .map_err(|_| "no received reason".to_owned())
            // Flatten: the received value is itself a Result with a reason.
            .and_then(|result| result)
            .map_err(|reason| anyhow!("failed to wait streaming job finish: {}", reason).into())
    }

    /// Fail all pending finish-waiters (of one database, or all databases if
    /// `None`) with the given error message.
    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id, err);
    }
}