use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
use risingwave_common::id::JobId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{
    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
};
use risingwave_meta_model::{
    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
    VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink,
    source, streaming_job, table,
};
use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Operation as NotificationOperation,
};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, StreamScanType,
};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::Expr;
use sea_orm::{
    ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, JoinType, PaginatorTrait,
    QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use serde::{Deserialize, Serialize};

use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
use crate::controller::catalog::CatalogController;
use crate::controller::scale::{
    FragmentRenderMap, NoShuffleEnsemble, find_fragment_no_shuffle_dags_detailed,
    load_fragment_info, resolve_streaming_job_definition,
};
use crate::controller::utils::{
    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
    resolve_no_shuffle_actor_dispatcher,
};
use crate::error::MetaError;
use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
    StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
    TableParallelism,
};
use crate::rpc::ddl_controller::build_upstream_sink_info;
use crate::stream::UpstreamSinkInfo;
use crate::{MetaResult, model};

#[derive(Debug)]
pub struct InflightActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
    pub splits: Vec<SplitImpl>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub splits: ConnectorSplits,
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<VnodeBitmap>,
    pub expr_context: ExprContext,
    pub config_override: Arc<str>,
}

#[derive(Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    pub vnode_count: usize,
    pub nodes: PbStreamNode,
    pub actors: HashMap<ActorId, InflightActorInfo>,
    pub state_table_ids: HashSet<TableId>,
}

#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub actor_count: usize,
    pub vnode_count: usize,
}

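/// SQL helpers for filtering on the `fragment_type_mask` column: each method builds a
/// `SimpleExpr` testing whether the mask intersects, or is disjoint from, the given flags.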
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}

#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: JobId,
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    pub resource_group: String,
    pub config_override: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}

impl NotificationManager {
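    /// Notifies frontends of changed fragment worker-slot mappings (without bumping the
    /// notification version), then forwards the affected fragment ids to local subscribers
    /// as an upsert or delete.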
    pub(crate) fn notify_fragment_mapping(
        &self,
        operation: NotificationOperation,
        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
    ) {
        let fragment_ids = fragment_mappings
            .iter()
            .map(|mapping| mapping.fragment_id)
            .collect_vec();
        if fragment_ids.is_empty() {
            return;
        }
        for fragment_mapping in fragment_mappings {
            self.notify_frontend_without_version(
                operation,
                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
            );
        }

        match operation {
            NotificationOperation::Add | NotificationOperation::Update => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
                    fragment_ids,
                ));
            }
            NotificationOperation::Delete => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
                    fragment_ids,
                ));
            }
            op => {
                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
            }
        }
    }
}

impl CatalogController {
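    /// Converts the in-memory fragments of a new streaming job into `fragment::Model`s.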
    pub fn prepare_fragment_models_from_fragments(
        job_id: JobId,
        fragments: impl Iterator<Item = &Fragment>,
    ) -> MetaResult<Vec<fragment::Model>> {
        fragments
            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
            .try_collect()
    }

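    /// Builds the `fragment::Model` for one fragment of a new job. For parallelized CDC
    /// backfill scans, the backfill parallelism is persisted as a fixed per-fragment
    /// parallelism override.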
    pub fn prepare_fragment_model_for_new_job(
        job_id: JobId,
        fragment: &Fragment,
    ) -> MetaResult<fragment::Model> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        let fragment_parallelism = nodes
            .node_body
            .as_ref()
            .and_then(|body| match body {
                NodeBody::StreamCdcScan(node) => Some(node),
                _ => None,
            })
            .and_then(|node| node.options.as_ref())
            .map(CdcScanOptions::from_proto)
            .filter(|opts| opts.is_parallelized_backfill())
            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));

        let stream_node = StreamNode::from(nodes);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
            parallelism: fragment_parallelism,
        };

        Ok(fragment)
    }

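    /// Assembles a [`StreamJobFragments`] snapshot from fragment models and their actors,
    /// translating the persisted parallelism setting into a [`TableParallelism`].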
    fn compose_table_fragments(
        job_id: JobId,
        state: PbState,
        ctx: StreamContext,
        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
        parallelism: StreamingParallelism,
        max_parallelism: usize,
        job_definition: Option<String>,
    ) -> MetaResult<StreamJobFragments> {
        let mut pb_fragments = BTreeMap::new();
        let mut pb_actor_status = BTreeMap::new();

        for (fragment, actors) in fragments {
            let (fragment, fragment_actor_status, _) =
                Self::compose_fragment(fragment, actors, job_definition.clone())?;

            pb_fragments.insert(fragment.fragment_id, fragment);
            pb_actor_status.extend(fragment_actor_status.into_iter());
        }

        let table_fragments = StreamJobFragments {
            stream_job_id: job_id,
            state: state as _,
            fragments: pb_fragments,
            actor_status: pb_actor_status,
            ctx,
            assigned_parallelism: match parallelism {
                StreamingParallelism::Custom => TableParallelism::Custom,
                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
            },
            max_parallelism,
        };

        Ok(table_fragments)
    }

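    /// Recomposes a protobuf-facing [`Fragment`] from its model and actor infos, also
    /// returning each actor's status (worker location) and connector splits. Bails if an
    /// actor references a different fragment id.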
    #[allow(clippy::type_complexity)]
    fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<ActorInfo>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "duplicate upstream fragment {}",
                    m.upstream_fragment_id
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let ActorInfo {
                actor_id,
                fragment_id,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                config_override,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id),
                },
            );

            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
                config_override,
            })
        }

        let pb_state_table_ids = state_table_ids.0;
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }

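    /// Reports the distribution type, actor count, and vnode count of each running
    /// fragment, read from the shared in-memory actor state rather than the database.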
    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let info = self.env.shared_actor_infos().read_guard();

        let mut result = HashMap::new();
        for (fragment_id, fragment) in info.iter_over_fragments() {
            if let Some(id_filter) = &id_filter
                && !id_filter.contains(&(*fragment_id as _))
            {
                continue;
            }

            result.insert(
                *fragment_id as _,
                FragmentParallelismInfo {
                    distribution_type: fragment.distribution_type.into(),
                    actor_count: fragment.actors.len() as _,
                    vnode_count: fragment.vnode_count,
                },
            );
        }

        Ok(result)
    }

    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
        let inner = self.inner.read().await;
        let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(fragment_jobs.into_iter().collect())
    }

    pub async fn get_fragment_job_id(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        let object_ids: Vec<ObjectId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(object_ids)
    }

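    /// Loads a fragment's descriptor together with its direct upstream fragment ids,
    /// combining database state with live actor info. Returns `Ok(None)` if the fragment
    /// does not exist.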
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model = match FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?
        {
            Some(fragment) => fragment,
            None => return Ok(None),
        };

        let job_parallelism: Option<StreamingParallelism> =
            StreamingJob::find_by_id(fragment_model.job_id)
                .select_only()
                .column(streaming_job::Column::Parallelism)
                .into_tuple()
                .one(&inner.db)
                .await?;

        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::DispatcherType,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let upstreams: Vec<_> = upstream_entries
            .into_iter()
            .map(|(source_id, _)| source_id)
            .collect();

        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
            .await
            .map(Self::collect_root_fragment_mapping)?;
        let root_fragments = root_fragment_map
            .get(&fragment_id)
            .cloned()
            .unwrap_or_default();

        let info = self.env.shared_actor_infos().read_guard();
        let SharedFragmentInfo { actors, .. } = info
            .get_fragment(fragment_model.fragment_id as _)
            .unwrap_or_else(|| {
                panic!(
                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                    fragment_model.fragment_id,
                    fragment_model.job_id
                )
            });

        let parallelism_policy = Self::format_fragment_parallelism_policy(
            &fragment_model,
            job_parallelism.as_ref(),
            &root_fragments,
        );

        let fragment = FragmentDesc {
            fragment_id: fragment_model.fragment_id,
            job_id: fragment_model.job_id,
            fragment_type_mask: fragment_model.fragment_type_mask,
            distribution_type: fragment_model.distribution_type,
            state_table_ids: fragment_model.state_table_ids.clone(),
            parallelism: actors.len() as _,
            vnode_count: fragment_model.vnode_count,
            stream_node: fragment_model.stream_node.clone(),
            parallelism_policy,
        };

        Ok(Some((fragment, upstreams)))
    }

    pub async fn list_fragment_database_ids(
        &self,
        select_fragment_ids: Option<Vec<FragmentId>>,
    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
        let inner = self.inner.read().await;
        let select = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .column(object::Column::DatabaseId)
            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
        let select = if let Some(select_fragment_ids) = select_fragment_ids {
            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
        } else {
            select
        };
        Ok(select.into_tuple().all(&inner.db).await?)
    }

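    /// Loads all fragments of a job and composes them into a [`StreamJobFragments`]
    /// snapshot, with actor infos taken from the shared actor state.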
    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;

        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        let fragment_actors =
            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;

        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id,
            job_info.job_status.into(),
            job_info.stream_context(),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }

    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;

        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
            .await
    }

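    /// Rebuilds the dispatchers of each source actor from the persisted fragment
    /// relations, caching per-fragment distribution and vnode bitmaps so that each
    /// fragment is resolved from the shared actor state only once.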
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let dispatchers = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }

    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }

    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }

    pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
        let inner = self.inner.read().await;
        let count = StreamingJob::find().count(&inner.db).await?;
        Ok(usize::try_from(count).context("streaming job count overflow")?)
    }

    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column_as(
                Expr::if_null(
                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
                    Expr::val(""),
                ),
                "config_override",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }

    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
        let inner = self.inner.read().await;
        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
        Ok(max_parallelism as usize)
    }

    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
        if let Ok(inner) = self.inner.try_read()
            && let Ok(job_state_tables) = FragmentModel::find()
                .select_only()
                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
                .into_tuple::<(JobId, I32Array)>()
                .all(&inner.db)
                .await
        {
            let mut job_internal_table_ids = HashMap::new();
            for (job_id, state_table_ids) in job_state_tables {
                job_internal_table_ids
                    .entry(job_id)
                    .or_insert_with(Vec::new)
                    .extend(
                        state_table_ids
                            .into_inner()
                            .into_iter()
                            .map(|table_id| TableId::new(table_id as _)),
                    );
            }
            return Some(job_internal_table_ids.into_iter().collect());
        }
        None
    }

    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = FragmentModel::find().count(&inner.db).await?;
        Ok(count > 0)
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let read_guard = self.env.shared_actor_infos().read_guard();
        let actor_cnt: HashMap<WorkerId, _> = read_guard
            .iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
            })
            .into_group_map()
            .into_iter()
            .map(|(k, v)| (k, v.len()))
            .collect();

        Ok(actor_cnt)
    }

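    /// Materializes [`ActorInfo`]s for the given fragments from the shared actor state,
    /// stamping each actor with the job's expression context and config override.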
    fn collect_fragment_actor_map(
        &self,
        fragment_ids: &[FragmentId],
        stream_context: StreamContext,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
        let guard = self.env.shared_actor_infos().read_guard();
        let pb_expr_context = stream_context.to_expr_context();
        let expr_context: ExprContext = (&pb_expr_context).into();

        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
        for fragment_id in fragment_ids {
            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
                anyhow!("fragment {} not found in shared actor info", fragment_id)
            })?;

            let actors = fragment_info
                .actors
                .iter()
                .map(|(actor_id, actor_info)| ActorInfo {
                    actor_id: *actor_id as _,
                    fragment_id: *fragment_id,
                    splits: ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
                    }),
                    worker_id: actor_info.worker_id as _,
                    vnode_bitmap: actor_info
                        .vnode_bitmap
                        .as_ref()
                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                    expr_context: expr_context.clone(),
                    config_override: stream_context.config_override.clone(),
                })
                .collect();

            actor_map.insert(*fragment_id, actors);
        }

        Ok(actor_map)
    }

    fn collect_fragment_actor_pairs(
        &self,
        fragments: Vec<fragment::Model>,
        stream_context: StreamContext,
    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
        fragments
            .into_iter()
            .map(|fragment| {
                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
                    anyhow!(
                        "fragment {} missing in shared actor info map",
                        fragment.fragment_id
                    )
                })?;
                Ok((fragment, actors))
            })
            .collect()
    }

    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragments = FragmentModel::find()
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            let fragment_actors =
                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;

            table_fragments.insert(
                job.job_id,
                Self::compose_table_fragments(
                    job.job_id,
                    job.job_status.into(),
                    job.stream_context(),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }

    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }

    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
        let info = self.env.shared_actor_infos().read_guard();

        let actor_locations = info
            .iter_over_fragments()
            .flat_map(|(fragment_id, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, actor)| PartialActorLocation {
                        actor_id: *actor_id as _,
                        fragment_id: *fragment_id as _,
                        worker_id: actor.worker_id,
                    })
            })
            .collect_vec();

        Ok(actor_locations)
    }

    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;

        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
            FragmentModel::find()
                .select_only()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .column(fragment::Column::FragmentId)
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;

        let actor_infos = {
            let info = self.env.shared_actor_infos().read_guard();

            let mut result = Vec::new();

            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
                let Some(fragment) = info.get_fragment(fragment_id as _) else {
                    return Err(MetaError::unavailable(format!(
                        "shared actor info missing for fragment {fragment_id} while listing actors"
                    )));
                };

                for actor_id in fragment.actors.keys() {
                    result.push((
                        *actor_id as _,
                        fragment.fragment_id as _,
                        object_id,
                        schema_id,
                        object_type,
                    ));
                }
            }

            result
        };

        Ok(actor_infos)
    }

    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
        let guard = self.env.shared_actor_info.read_guard();
        guard
            .iter_over_fragments()
            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
            .collect_vec()
    }

    pub async fn list_fragment_descs(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let txn = inner.db.begin().await?;

        let fragments: Vec<_> = if is_creating {
            FragmentModel::find()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
                .filter(
                    streaming_job::Column::JobStatus
                        .eq(JobStatus::Initial)
                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
                )
                .all(&txn)
                .await?
        } else {
            FragmentModel::find().all(&txn).await?
        };

        let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();

        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragments.is_empty() {
            HashMap::new()
        } else {
            let job_ids = fragments.iter().map(|f| f.job_id).unique().collect_vec();
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(&txn)
                .await?
                .into_iter()
                .collect()
        };

        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();

        let mut result = Vec::new();

        for fragment_desc in fragments {
            let parallelism = guard
                .get_fragment(fragment_desc.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&fragment_desc.fragment_id)
                .cloned()
                .unwrap_or_default();

            let upstreams = all_upstreams
                .remove(&fragment_desc.fragment_id)
                .unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                &fragment_desc,
                job_parallelisms.get(&fragment_desc.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: fragment_desc.fragment_id,
                table_id: fragment_desc.job_id,
                distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type)
                    as _,
                state_table_ids: fragment_desc.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: fragment_desc.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: fragment_desc.vnode_count as _,
                node: Some(fragment_desc.stream_node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }
        Ok(result)
    }

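    /// Maps every fragment of each no-shuffle ensemble to its root (entry) fragments;
    /// root fragments themselves map to an empty list.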
    fn collect_root_fragment_mapping(
        ensembles: Vec<NoShuffleEnsemble>,
    ) -> HashMap<FragmentId, Vec<FragmentId>> {
        let mut mapping = HashMap::new();

        for ensemble in ensembles {
            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
            roots.sort_unstable();
            roots.dedup();

            if roots.is_empty() {
                continue;
            }

            let root_set: HashSet<_> = roots.iter().copied().collect();

            for fragment_id in ensemble.component_fragments() {
                if root_set.contains(&fragment_id) {
                    mapping.insert(fragment_id, Vec::new());
                } else {
                    mapping.insert(fragment_id, roots.clone());
                }
            }
        }

        mapping
    }

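    /// Renders a human-readable parallelism policy: `single` for singleton fragments,
    /// `override(..)` for an explicit per-fragment setting, `upstream_fragment(..)` when
    /// bound to no-shuffle roots, otherwise the job-level parallelism as `inherit(..)`.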
    fn format_fragment_parallelism_policy(
        fragment: &fragment::Model,
        job_parallelism: Option<&StreamingParallelism>,
        root_fragments: &[FragmentId],
    ) -> String {
        if fragment.distribution_type == DistributionType::Single {
            return "single".to_owned();
        }

        if let Some(parallelism) = fragment.parallelism.as_ref() {
            return format!(
                "override({})",
                Self::format_streaming_parallelism(parallelism)
            );
        }

        if !root_fragments.is_empty() {
            let mut upstreams = root_fragments.to_vec();
            upstreams.sort_unstable();
            upstreams.dedup();

            return format!("upstream_fragment({upstreams:?})");
        }

        let inherited = job_parallelism
            .map(Self::format_streaming_parallelism)
            .unwrap_or_else(|| "unknown".to_owned());
        format!("inherit({inherited})")
    }

    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
        match parallelism {
            StreamingParallelism::Adaptive => "adaptive".to_owned(),
            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
            StreamingParallelism::Custom => "custom".to_owned(),
        }
    }

    pub async fn list_sink_actor_mapping(
        &self,
    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
        let inner = self.inner.read().await;
        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .into_tuple()
            .all(&inner.db)
            .await?;
        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();

        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();

        let actor_with_type: Vec<(ActorId, SinkId)> = {
            let info = self.env.shared_actor_infos().read_guard();

            info.iter_over_fragments()
                .filter(|(_, fragment)| {
                    sink_ids.contains(&fragment.job_id.as_sink_id())
                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
                })
                .flat_map(|(_, fragment)| {
                    fragment
                        .actors
                        .keys()
                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
                })
                .collect()
        };

        let mut sink_actor_mapping = HashMap::new();
        for (actor_id, sink_id) in actor_with_type {
            sink_actor_mapping
                .entry(sink_id)
                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
                .1
                .push(actor_id);
        }

        Ok(sink_actor_mapping)
    }

    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
        let inner = self.inner.read().await;
        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::StateTableIds,
            ])
            .into_partial_model()
            .all(&inner.db)
            .await?;
        Ok(fragment_state_tables)
    }

    pub async fn load_all_actors_dynamic(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<FragmentRenderMap> {
        let adaptive_parallelism_strategy = {
            let system_params_reader = self.env.system_params_reader().await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let database_fragment_infos = load_fragment_info(
            &txn,
            self.env.actor_id_generator(),
            database_id,
            worker_nodes,
            adaptive_parallelism_strategy,
        )
        .await?;

        tracing::trace!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }

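    /// Fills snapshot backfill epochs into the stream nodes of the given fragments,
    /// persisting every modified plan within a single transaction.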
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

    pub fn get_running_actors_of_fragment(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<HashSet<model::ActorId>> {
        let info = self.env.shared_actor_infos().read_guard();

        let actors = info
            .get_fragment(fragment_id as _)
            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
            .unwrap_or_default();

        Ok(actors)
    }

    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }

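    /// Finds the root fragment of each given job: a Mview or Sink fragment wins over a
    /// Source fragment. Returns each root with its stream node, plus the worker location
    /// of every actor known to the shared actor state.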
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(
        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let inner = self.inner.read().await;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();

        let info = self.env.shared_actor_infos().read_guard();

        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
            .iter()
            .map(|(job_id, fragment)| (fragment.fragment_id, *job_id))
            .collect();

        for fragment in root_fragment_to_jobs.keys() {
            let fragment_info = info.get_fragment(*fragment).context(format!(
                "root fragment {} not found in shared actor info",
                fragment
            ))?;

            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
            let fragment = root_fragments
                .get(&job_id)
                .context(format!("root fragment for job {} not found", job_id))?;

            root_fragments_pb.insert(
                job_id,
                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
            );
        }

        let mut all_actor_locations = HashMap::new();

        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
            for (actor_id, actor_info) in actors {
                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
            }
        }

        Ok((root_fragments_pb, all_actor_locations))
    }

    pub async fn get_root_fragment(
        &self,
        job_id: JobId,
    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
        let (root_fragment, _) = root_fragments
            .remove(&job_id)
            .context(format!("root fragment for job {} not found", job_id))?;

        Ok((root_fragment, actors))
    }

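    /// Resolves the fragments directly downstream of a job's root fragment, returning
    /// each with its dispatcher type and stream node, together with the actor locations
    /// gathered while resolving the root.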
    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        Vec<(
            stream_plan::DispatcherType,
            SharedFragmentInfo,
            PbStreamNode,
        )>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&txn)
            .await?;

        let downstream_fragment_ids = downstream_fragment_relations
            .iter()
            .map(|model| model.target_fragment_id as FragmentId)
            .collect::<HashSet<_>>();

        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        let downstream_fragment_nodes: HashMap<_, _> =
            downstream_fragment_nodes.into_iter().collect();

        let mut downstream_fragments = vec![];

        let info = self.env.shared_actor_infos().read_guard();

        let fragment_map: HashMap<_, _> = downstream_fragment_relations
            .iter()
            .map(|model| (model.target_fragment_id, model.dispatcher_type))
            .collect();

        for fragment_id in fragment_map.keys() {
            let fragment_info @ SharedFragmentInfo { actors, .. } =
                info.get_fragment(*fragment_id).unwrap();

            let dispatcher_type = fragment_map[fragment_id];

            if actors.is_empty() {
                bail!("no actors found for fragment id {}", fragment_id);
            }

            let dispatch_type = PbDispatcherType::from(dispatcher_type);

            let nodes = downstream_fragment_nodes
                .get(fragment_id)
                .context(format!(
                    "downstream fragment node for id {} not found",
                    fragment_id
                ))?
                .to_protobuf();

            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
        }
        Ok((downstream_fragments, actor_locations))
    }

    pub async fn load_source_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
                source_fragment_ids
                    .entry(source_id)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment_id);
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn load_backfill_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                stream_node.to_protobuf().find_source_backfill()
            {
                source_fragment_ids
                    .entry(source_id)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn get_all_upstream_sink_infos(
        &self,
        target_table: &PbTable,
        target_fragment_id: FragmentId,
    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
        let incoming_sinks = self.get_table_incoming_sinks(target_table.id).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let sink_ids = incoming_sinks.iter().map(|s| s.id).collect_vec();
        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;

        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
        for pb_sink in &incoming_sinks {
            let sink_fragment_id =
                sink_fragment_ids
                    .get(&pb_sink.id)
                    .cloned()
                    .ok_or(anyhow::anyhow!(
                        "sink fragment not found for sink id {}",
                        pb_sink.id
                    ))?;
            let upstream_info = build_upstream_sink_info(
                pb_sink,
                sink_fragment_id,
                target_table,
                target_fragment_id,
            )?;
            upstream_sink_infos.push(upstream_info);
        }

        Ok(upstream_sink_infos)
    }

    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(
                fragment::Column::JobId
                    .eq(job_id)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        if mview_fragment.len() != 1 {
            return Err(anyhow::anyhow!(
                "expected exactly one mview fragment for job {}, found {}",
                job_id,
                mview_fragment.len()
            )
            .into());
        }

        Ok(mview_fragment.into_iter().next().unwrap())
    }

    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        has_table_been_migrated(&txn, table_id).await
    }

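    /// Upserts connector split assignments per fragment: inserts new rows, and on
    /// conflict by fragment id overwrites the stored splits. No-op for an empty map.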
1778 pub async fn update_fragment_splits<C>(
1779 &self,
1780 txn: &C,
1781 fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
1782 ) -> MetaResult<()>
1783 where
1784 C: ConnectionTrait,
1785 {
1786 if fragment_splits.is_empty() {
1787 return Ok(());
1788 }
1789
1790 let models: Vec<fragment_splits::ActiveModel> = fragment_splits
1791 .iter()
1792 .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
1793 fragment_id: Set(*fragment_id as _),
1794 splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1795 splits: splits.iter().map(Into::into).collect_vec(),
1796 }))),
1797 })
1798 .collect();
1799
1800 FragmentSplits::insert_many(models)
1801 .on_conflict(
1802 OnConflict::column(fragment_splits::Column::FragmentId)
1803 .update_column(fragment_splits::Column::Splits)
1804 .to_owned(),
1805 )
1806 .exec(txn)
1807 .await?;
1808
1809 Ok(())
1810 }
1811}
1812
1813#[cfg(test)]
1814mod tests {
1815 use std::collections::{BTreeMap, HashMap, HashSet};
1816
1817 use itertools::Itertools;
1818 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1819 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1820 use risingwave_common::id::JobId;
1821 use risingwave_common::util::iter_util::ZipEqDebug;
1822 use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1823 use risingwave_meta_model::fragment::DistributionType;
1824 use risingwave_meta_model::*;
1825 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1826 use risingwave_pb::plan_common::PbExprContext;
1827 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1828 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1829 use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1830
1831 use super::ActorInfo;
1832 use crate::MetaResult;
1833 use crate::controller::catalog::CatalogController;
1834 use crate::model::{Fragment, StreamActor};
1835
1836 type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
1837
1838 type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
1839
1840 const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);
1841
1842 const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);
1843
1844 const TEST_JOB_ID: JobId = JobId::new(1);
1845
1846 const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
1847
1848 fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
1849 let mut upstream_actor_ids = BTreeMap::new();
1850 upstream_actor_ids.insert(
1851 TEST_UPSTREAM_FRAGMENT_ID,
1852 HashSet::from_iter([(actor_id + 100)]),
1853 );
1854 upstream_actor_ids.insert(
1855 (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1856 HashSet::from_iter([(actor_id + 200)]),
1857 );
1858 upstream_actor_ids
1859 }
1860
1861 fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1862 let mut input = vec![];
1863 for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
1864 input.push(PbStreamNode {
1865 node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1866 upstream_fragment_id,
1867 ..Default::default()
1868 }))),
1869 ..Default::default()
1870 });
1871 }
1872
1873 PbStreamNode {
1874 input,
1875 node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1876 ..Default::default()
1877 }
1878 }
1879
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id.into(),
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id.into()).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
                config_override: "".into(),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

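        // Convert the in-memory fragment into its model form, then verify that
        // every field survives the conversion.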
        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }

    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id.into())
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                    config_override: "a.b.c = true".into(),
                }
            })
            .collect_vec();

        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

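        // `fragment::Model` still carries the deprecated `upstream_fragment_id`
        // column, so constructing it directly requires `#[expect(deprecated)]`.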
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

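    /// Asserts that composed `StreamActor`s round-trip their model-side
    /// `ActorInfo`s: ids, vnode bitmaps, splits, and expression contexts must
    /// match, and every `Merge` node must reference a known upstream fragment.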
    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

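    /// Asserts that a composed `Fragment` matches its `fragment::Model` source,
    /// field by field.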
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }

    #[test]
    fn test_parallelism_policy_with_root_fragments() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 3.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

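        // With no fragment-level override and no upstream roots, the policy
        // should inherit the job-level fixed parallelism.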
        let job_parallelism = StreamingParallelism::Fixed(4);

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            Some(&job_parallelism),
            &[],
        );

        assert_eq!(policy, "inherit(fixed(4))");
    }

    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

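        // Upstream root ids are passed unsorted and with a duplicate; the
        // rendered policy is expected to sort and deduplicate them.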
        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            None,
            &[3.into(), 1.into(), 2.into(), 1.into()],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
}