use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
use risingwave_common::id::JobId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{
    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
};
use risingwave_meta_model::{
    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
    VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink,
    source, streaming_job, table,
};
use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Operation as NotificationOperation,
};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, StreamScanType,
};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::Expr;
use sea_orm::{
    ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, JoinType, PaginatorTrait,
    QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use serde::{Deserialize, Serialize};

use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
use crate::controller::catalog::CatalogController;
use crate::controller::scale::{
    FragmentRenderMap, NoShuffleEnsemble, find_fragment_no_shuffle_dags_detailed,
    load_fragment_info, resolve_streaming_job_definition,
};
use crate::controller::utils::{
    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
    resolve_no_shuffle_actor_dispatcher,
};
use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
    StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
    TableParallelism,
};
use crate::rpc::ddl_controller::build_upstream_sink_info;
use crate::stream::UpstreamSinkInfo;
use crate::{MetaResult, model};

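/// Per-actor runtime info tracked in the shared in-flight actor state: the
/// worker the actor is scheduled on, its vnode bitmap (for hash-distributed
/// fragments), and its currently assigned source splits.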
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
    pub splits: Vec<SplitImpl>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub splits: ConnectorSplits,
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<VnodeBitmap>,
    pub expr_context: ExprContext,
    pub config_override: Arc<str>,
}

#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    pub vnode_count: usize,
    pub nodes: PbStreamNode,
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}

#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub actor_count: usize,
    pub vnode_count: usize,
}

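/// SQL predicate helpers over the `fragment_type_mask` column, for filtering
/// fragments by [`FragmentTypeFlag`] directly in queries. A minimal usage
/// sketch, mirroring the queries later in this file:
///
/// ```ignore
/// FragmentModel::find()
///     .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
/// ```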
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}

#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: JobId,
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    pub resource_group: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}

impl NotificationManager {
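    /// Notify frontends of added, updated, or deleted fragment worker-slot
    /// mappings, then mirror the change to local subscribers so in-process
    /// caches stay consistent. No-op when `fragment_mappings` is empty.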
    pub(crate) fn notify_fragment_mapping(
        &self,
        operation: NotificationOperation,
        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
    ) {
        let fragment_ids = fragment_mappings
            .iter()
            .map(|mapping| mapping.fragment_id)
            .collect_vec();
        if fragment_ids.is_empty() {
            return;
        }
        for fragment_mapping in fragment_mappings {
            self.notify_frontend_without_version(
                operation,
                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
            );
        }

        match operation {
            NotificationOperation::Add | NotificationOperation::Update => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
                    fragment_ids,
                ));
            }
            NotificationOperation::Delete => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
                    fragment_ids,
                ));
            }
            op => {
                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
            }
        }
    }
}

impl CatalogController {
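    /// Convert the in-memory [`Fragment`]s of a new streaming job into
    /// `fragment::Model` rows ready for insertion, failing on the first
    /// fragment that cannot be converted.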
    pub fn prepare_fragment_models_from_fragments(
        job_id: JobId,
        fragments: impl Iterator<Item = &Fragment>,
    ) -> MetaResult<Vec<fragment::Model>> {
        fragments
            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
            .try_collect()
    }

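    /// Build a single `fragment::Model` for a new job from an in-memory
    /// [`Fragment`]. A CDC scan fragment with parallelized backfill enabled
    /// pins its own fixed parallelism (derived from `backfill_parallelism`);
    /// every other fragment leaves `parallelism` unset and follows the job.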
    pub fn prepare_fragment_model_for_new_job(
        job_id: JobId,
        fragment: &Fragment,
    ) -> MetaResult<fragment::Model> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        let fragment_parallelism = nodes
            .node_body
            .as_ref()
            .and_then(|body| match body {
                NodeBody::StreamCdcScan(node) => Some(node),
                _ => None,
            })
            .and_then(|node| node.options.as_ref())
            .map(CdcScanOptions::from_proto)
            .filter(|opts| opts.is_parallelized_backfill())
            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));

        let stream_node = StreamNode::from(nodes);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
            parallelism: fragment_parallelism,
        };

        Ok(fragment)
    }

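    /// Assemble a [`StreamJobFragments`] snapshot for one job from its
    /// fragment models and per-fragment actor infos, mapping the persisted
    /// [`StreamingParallelism`] onto [`TableParallelism`].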
    fn compose_table_fragments(
        job_id: JobId,
        state: PbState,
        ctx: StreamContext,
        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
        parallelism: StreamingParallelism,
        max_parallelism: usize,
        job_definition: Option<String>,
    ) -> MetaResult<StreamJobFragments> {
        let mut pb_fragments = BTreeMap::new();
        let mut pb_actor_status = BTreeMap::new();

        for (fragment, actors) in fragments {
            let (fragment, fragment_actor_status, _) =
                Self::compose_fragment(fragment, actors, job_definition.clone())?;

            pb_fragments.insert(fragment.fragment_id, fragment);
            pb_actor_status.extend(fragment_actor_status.into_iter());
        }

        let table_fragments = StreamJobFragments {
            stream_job_id: job_id,
            state: state as _,
            fragments: pb_fragments,
            actor_status: pb_actor_status,
            ctx,
            assigned_parallelism: match parallelism {
                StreamingParallelism::Custom => TableParallelism::Custom,
                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
            },
            max_parallelism,
        };

        Ok(table_fragments)
    }

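    /// Rebuild an in-memory [`Fragment`], along with per-actor status and
    /// split assignments, from a persisted `fragment::Model` and its actors.
    /// Bails if an actor references a fragment id other than the model's.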
    #[allow(clippy::type_complexity)]
    fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<ActorInfo>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let ActorInfo {
                actor_id,
                fragment_id,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                config_override,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id),
                },
            );

            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
                config_override,
            })
        }

        let pb_state_table_ids = state_table_ids.0;
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }

    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let info = self.env.shared_actor_infos().read_guard();

        let mut result = HashMap::new();
        for (fragment_id, fragment) in info.iter_over_fragments() {
            if let Some(id_filter) = &id_filter
                && !id_filter.contains(&(*fragment_id as _))
            {
                continue;
            }

            result.insert(
                *fragment_id as _,
                FragmentParallelismInfo {
                    distribution_type: fragment.distribution_type.into(),
                    actor_count: fragment.actors.len() as _,
                    vnode_count: fragment.vnode_count,
                },
            );
        }

        Ok(result)
    }

    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
        let inner = self.inner.read().await;
        let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(fragment_jobs.into_iter().collect())
    }

    pub async fn get_fragment_job_id(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        let object_ids: Vec<ObjectId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(object_ids)
    }

    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model = match FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?
        {
            Some(fragment) => fragment,
            None => return Ok(None),
        };

        let job_parallelism: Option<StreamingParallelism> =
            StreamingJob::find_by_id(fragment_model.job_id)
                .select_only()
                .column(streaming_job::Column::Parallelism)
                .into_tuple()
                .one(&inner.db)
                .await?;

        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::DispatcherType,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let upstreams: Vec<_> = upstream_entries
            .into_iter()
            .map(|(source_id, _)| source_id)
            .collect();

        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
            .await
            .map(Self::collect_root_fragment_mapping)?;
        let root_fragments = root_fragment_map
            .get(&fragment_id)
            .cloned()
            .unwrap_or_default();

        let info = self.env.shared_actor_infos().read_guard();
        let SharedFragmentInfo { actors, .. } = info
            .get_fragment(fragment_model.fragment_id as _)
            .unwrap_or_else(|| {
                panic!(
                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                    fragment_model.fragment_id,
                    fragment_model.job_id
                )
            });

        let parallelism_policy = Self::format_fragment_parallelism_policy(
            &fragment_model,
            job_parallelism.as_ref(),
            &root_fragments,
        );

        let fragment = FragmentDesc {
            fragment_id: fragment_model.fragment_id,
            job_id: fragment_model.job_id,
            fragment_type_mask: fragment_model.fragment_type_mask,
            distribution_type: fragment_model.distribution_type,
            state_table_ids: fragment_model.state_table_ids.clone(),
            parallelism: actors.len() as _,
            vnode_count: fragment_model.vnode_count,
            stream_node: fragment_model.stream_node.clone(),
            parallelism_policy,
        };

        Ok(Some((fragment, upstreams)))
    }

    pub async fn list_fragment_database_ids(
        &self,
        select_fragment_ids: Option<Vec<FragmentId>>,
    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
        let inner = self.inner.read().await;
        let select = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .column(object::Column::DatabaseId)
            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
        let select = if let Some(select_fragment_ids) = select_fragment_ids {
            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
        } else {
            select
        };
        Ok(select.into_tuple().all(&inner.db).await?)
    }

    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;

        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        let fragment_actors =
            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;

        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id,
            job_info.job_status.into(),
            job_info.stream_context(),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }

    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;

        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
            .await
    }

    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let dispatchers = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }

    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }

    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }

    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }

    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
        let inner = self.inner.read().await;
        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
        Ok(max_parallelism as usize)
    }

    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
        if let Ok(inner) = self.inner.try_read()
            && let Ok(job_state_tables) = FragmentModel::find()
                .select_only()
                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
                .into_tuple::<(JobId, I32Array)>()
                .all(&inner.db)
                .await
        {
            let mut job_internal_table_ids = HashMap::new();
            for (job_id, state_table_ids) in job_state_tables {
                job_internal_table_ids
                    .entry(job_id)
                    .or_insert_with(Vec::new)
                    .extend(
                        state_table_ids
                            .into_inner()
                            .into_iter()
                            .map(|table_id| TableId::new(table_id as _)),
                    );
            }
            return Some(job_internal_table_ids.into_iter().collect());
        }
        None
    }

    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = FragmentModel::find().count(&inner.db).await?;
        Ok(count > 0)
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let read_guard = self.env.shared_actor_infos().read_guard();
        let actor_cnt: HashMap<WorkerId, _> = read_guard
            .iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
            })
            .into_group_map()
            .into_iter()
            .map(|(k, v)| (k, v.len()))
            .collect();

        Ok(actor_cnt)
    }

    fn collect_fragment_actor_map(
        &self,
        fragment_ids: &[FragmentId],
        stream_context: StreamContext,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
        let guard = self.env.shared_actor_infos().read_guard();
        let pb_expr_context = stream_context.to_expr_context();
        let expr_context: ExprContext = (&pb_expr_context).into();

        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
        for fragment_id in fragment_ids {
            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
                anyhow!("fragment {} not found in shared actor info", fragment_id)
            })?;

            let actors = fragment_info
                .actors
                .iter()
                .map(|(actor_id, actor_info)| ActorInfo {
                    actor_id: *actor_id as _,
                    fragment_id: *fragment_id,
                    splits: ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
                    }),
                    worker_id: actor_info.worker_id as _,
                    vnode_bitmap: actor_info
                        .vnode_bitmap
                        .as_ref()
                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                    expr_context: expr_context.clone(),
                    config_override: stream_context.config_override.clone(),
                })
                .collect();

            actor_map.insert(*fragment_id, actors);
        }

        Ok(actor_map)
    }

    fn collect_fragment_actor_pairs(
        &self,
        fragments: Vec<fragment::Model>,
        stream_context: StreamContext,
    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
        fragments
            .into_iter()
            .map(|fragment| {
                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
                    anyhow!(
                        "fragment {} missing in shared actor info map",
                        fragment.fragment_id
                    )
                })?;
                Ok((fragment, actors))
            })
            .collect()
    }

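    /// Load the full fragment/actor composition of every streaming job in
    /// the cluster, keyed by job id.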
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragments = FragmentModel::find()
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            let fragment_actors =
                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;

            table_fragments.insert(
                job.job_id,
                Self::compose_table_fragments(
                    job.job_id,
                    job.job_status.into(),
                    job.stream_context(),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }

    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }

    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
        let info = self.env.shared_actor_infos().read_guard();

        let actor_locations = info
            .iter_over_fragments()
            .flat_map(|(fragment_id, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, actor)| PartialActorLocation {
                        actor_id: *actor_id as _,
                        fragment_id: *fragment_id as _,
                        worker_id: actor.worker_id,
                    })
            })
            .collect_vec();

        Ok(actor_locations)
    }

    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;

        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
            FragmentModel::find()
                .select_only()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .column(fragment::Column::FragmentId)
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;

        let actor_infos = {
            let info = self.env.shared_actor_infos().read_guard();

            fragment_objects
                .into_iter()
                .flat_map(|(fragment_id, object_id, schema_id, object_type)| {
                    let SharedFragmentInfo {
                        fragment_id,
                        actors,
                        ..
                    } = info.get_fragment(fragment_id as _).unwrap();
                    actors.keys().map(move |actor_id| {
                        (
                            *actor_id as _,
                            *fragment_id as _,
                            object_id,
                            schema_id,
                            object_type,
                        )
                    })
                })
                .collect_vec()
        };

        Ok(actor_infos)
    }

    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
        let guard = self.env.shared_actor_info.read_guard();
        guard
            .iter_over_fragments()
            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
            .collect_vec()
    }

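    /// List fragment descriptions, optionally restricted to fragments of
    /// jobs that are still being created. Parallelism is taken from the
    /// shared actor info, and each fragment is paired with its upstream
    /// fragment ids.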
    pub async fn list_fragment_descs(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let txn = inner.db.begin().await?;

        let fragments: Vec<_> = if is_creating {
            FragmentModel::find()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
                .filter(
                    streaming_job::Column::JobStatus
                        .eq(JobStatus::Initial)
                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
                )
                .all(&txn)
                .await?
        } else {
            FragmentModel::find().all(&txn).await?
        };

        let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();

        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragments.is_empty() {
            HashMap::new()
        } else {
            let job_ids = fragments.iter().map(|f| f.job_id).unique().collect_vec();
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(&txn)
                .await?
                .into_iter()
                .collect()
        };

        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();

        let mut result = Vec::new();

        for fragment_desc in fragments {
            let parallelism = guard
                .get_fragment(fragment_desc.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&fragment_desc.fragment_id)
                .cloned()
                .unwrap_or_default();

            let upstreams = all_upstreams
                .remove(&fragment_desc.fragment_id)
                .unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                &fragment_desc,
                job_parallelisms.get(&fragment_desc.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: fragment_desc.fragment_id,
                table_id: fragment_desc.job_id,
                distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type)
                    as _,
                state_table_ids: fragment_desc.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: fragment_desc.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: fragment_desc.vnode_count as _,
                node: Some(fragment_desc.stream_node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }
        Ok(result)
    }

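    /// Flatten no-shuffle ensembles into a per-fragment map to the root
    /// ("entry") fragments of their ensemble: roots map to an empty list,
    /// and every other member maps to the sorted, deduplicated root ids.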
    fn collect_root_fragment_mapping(
        ensembles: Vec<NoShuffleEnsemble>,
    ) -> HashMap<FragmentId, Vec<FragmentId>> {
        let mut mapping = HashMap::new();

        for ensemble in ensembles {
            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
            roots.sort_unstable();
            roots.dedup();

            if roots.is_empty() {
                continue;
            }

            let root_set: HashSet<_> = roots.iter().copied().collect();

            for fragment_id in ensemble.component_fragments() {
                if root_set.contains(&fragment_id) {
                    mapping.insert(fragment_id, Vec::new());
                } else {
                    mapping.insert(fragment_id, roots.clone());
                }
            }
        }

        mapping
    }

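    /// Render a human-readable parallelism policy for a fragment, in order
    /// of precedence: `single` for single-distribution fragments,
    /// `override(..)` when the fragment pins its own parallelism,
    /// `upstream_fragment([..])` when it follows no-shuffle roots, and
    /// otherwise `inherit(..)` of the job parallelism, e.g.
    /// `inherit(fixed(4))`.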
    fn format_fragment_parallelism_policy(
        fragment: &fragment::Model,
        job_parallelism: Option<&StreamingParallelism>,
        root_fragments: &[FragmentId],
    ) -> String {
        if fragment.distribution_type == DistributionType::Single {
            return "single".to_owned();
        }

        if let Some(parallelism) = fragment.parallelism.as_ref() {
            return format!(
                "override({})",
                Self::format_streaming_parallelism(parallelism)
            );
        }

        if !root_fragments.is_empty() {
            let mut upstreams = root_fragments.to_vec();
            upstreams.sort_unstable();
            upstreams.dedup();

            return format!("upstream_fragment({upstreams:?})");
        }

        let inherited = job_parallelism
            .map(Self::format_streaming_parallelism)
            .unwrap_or_else(|| "unknown".to_owned());
        format!("inherit({inherited})")
    }

    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
        match parallelism {
            StreamingParallelism::Adaptive => "adaptive".to_owned(),
            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
            StreamingParallelism::Custom => "custom".to_owned(),
        }
    }

    pub async fn list_sink_actor_mapping(
        &self,
    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
        let inner = self.inner.read().await;
        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .into_tuple()
            .all(&inner.db)
            .await?;
        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();

        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();

        let actor_with_type: Vec<(ActorId, SinkId)> = {
            let info = self.env.shared_actor_infos().read_guard();

            info.iter_over_fragments()
                .filter(|(_, fragment)| {
                    sink_ids.contains(&fragment.job_id.as_sink_id())
                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
                })
                .flat_map(|(_, fragment)| {
                    fragment
                        .actors
                        .keys()
                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
                })
                .collect()
        };

        let mut sink_actor_mapping = HashMap::new();
        for (actor_id, sink_id) in actor_with_type {
            sink_actor_mapping
                .entry(sink_id)
                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
                .1
                .push(actor_id);
        }

        Ok(sink_actor_mapping)
    }

    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
        let inner = self.inner.read().await;
        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::StateTableIds,
            ])
            .into_partial_model()
            .all(&inner.db)
            .await?;
        Ok(fragment_state_tables)
    }

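    /// Re-render fragment actor assignments for one database (or all
    /// databases when `database_id` is `None`) against the currently active
    /// worker nodes, honoring the system-wide adaptive parallelism strategy.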
    pub async fn load_all_actors_dynamic(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<FragmentRenderMap> {
        let adaptive_parallelism_strategy = {
            let system_params_reader = self.env.system_params_reader().await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let database_fragment_infos = load_fragment_info(
            &txn,
            self.env.actor_id_generator(),
            database_id,
            worker_nodes,
            adaptive_parallelism_strategy,
        )
        .await?;

        tracing::trace!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }

    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

    pub fn get_running_actors_of_fragment(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<HashSet<model::ActorId>> {
        let info = self.env.shared_actor_infos().read_guard();

        let actors = info
            .get_fragment(fragment_id as _)
            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
            .unwrap_or_default();

        Ok(actors)
    }

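    /// Resolve the one-to-one, no-shuffle pairing between the actors of a
    /// source-backfill fragment and its upstream source fragment, returned
    /// as `(source_backfill_actor, upstream_source_actor)` pairs.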
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expected NoShuffle but got {:?}", fragment_relation).into());
        }

        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }

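    /// Find the "root" fragment of each given job: the `Mview` or `Sink`
    /// fragment when present, falling back to the `Source` fragment
    /// otherwise. Returns the shared fragment info and stream node per job,
    /// plus the worker location of every actor currently known to the
    /// shared actor info.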
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(
        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let inner = self.inner.read().await;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();

        let info = self.env.shared_actor_infos().read_guard();

        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
            .iter()
            .map(|(job_id, fragment)| (fragment.fragment_id, *job_id))
            .collect();

        for fragment in root_fragment_to_jobs.keys() {
            let fragment_info = info.get_fragment(*fragment).context(format!(
                "fragment {} not found in shared actor info",
                fragment
            ))?;

            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
            let fragment = root_fragments
                .get(&job_id)
                .context(format!("root fragment for job {} not found", job_id))?;

            root_fragments_pb.insert(
                job_id,
                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
            );
        }

        let mut all_actor_locations = HashMap::new();

        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
            for (actor_id, actor_info) in actors {
                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
            }
        }

        Ok((root_fragments_pb, all_actor_locations))
    }

    pub async fn get_root_fragment(
        &self,
        job_id: JobId,
    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
        let (root_fragment, _) = root_fragments
            .remove(&job_id)
            .context(format!("root fragment for job {} not found", job_id))?;

        Ok((root_fragment, actors))
    }

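    /// Collect the fragments immediately downstream of a job's root
    /// fragment, each paired with the dispatcher type connecting them,
    /// together with the actor locations gathered along the way.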
    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        Vec<(
            stream_plan::DispatcherType,
            SharedFragmentInfo,
            PbStreamNode,
        )>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&txn)
            .await?;

        let downstream_fragment_ids = downstream_fragment_relations
            .iter()
            .map(|model| model.target_fragment_id as FragmentId)
            .collect::<HashSet<_>>();

        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        let downstream_fragment_nodes: HashMap<_, _> =
            downstream_fragment_nodes.into_iter().collect();

        let mut downstream_fragments = vec![];

        let info = self.env.shared_actor_infos().read_guard();

        let fragment_map: HashMap<_, _> = downstream_fragment_relations
            .iter()
            .map(|model| (model.target_fragment_id, model.dispatcher_type))
            .collect();

        for fragment_id in fragment_map.keys() {
            let fragment_info @ SharedFragmentInfo { actors, .. } =
                info.get_fragment(*fragment_id).unwrap();

            let dispatcher_type = fragment_map[fragment_id];

            if actors.is_empty() {
                bail!("no running actors found for fragment {}", fragment_id);
            }

            let dispatch_type = PbDispatcherType::from(dispatcher_type);

            let nodes = downstream_fragment_nodes
                .get(fragment_id)
                .context(format!(
                    "downstream fragment node for id {} not found",
                    fragment_id
                ))?
                .to_protobuf();

            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
        }
        Ok((downstream_fragments, actor_locations))
    }

    pub async fn load_source_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
                source_fragment_ids
                    .entry(source_id)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment_id);
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn load_backfill_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                stream_node.to_protobuf().find_source_backfill()
            {
                source_fragment_ids
                    .entry(source_id)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn get_all_upstream_sink_infos(
        &self,
        target_table: &PbTable,
        target_fragment_id: FragmentId,
    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
        let incoming_sinks = self.get_table_incoming_sinks(target_table.id).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let sink_ids = incoming_sinks.iter().map(|s| s.id).collect_vec();
        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;

        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
        for pb_sink in &incoming_sinks {
            let sink_fragment_id =
                sink_fragment_ids
                    .get(&pb_sink.id)
                    .cloned()
                    .ok_or(anyhow::anyhow!(
                        "sink fragment not found for sink id {}",
                        pb_sink.id
                    ))?;
            let upstream_info = build_upstream_sink_info(
                pb_sink,
                sink_fragment_id,
                target_table,
                target_fragment_id,
            )?;
            upstream_sink_infos.push(upstream_info);
        }

        Ok(upstream_sink_infos)
    }

    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(
                fragment::Column::JobId
                    .eq(job_id)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        if mview_fragment.len() != 1 {
            return Err(anyhow::anyhow!(
                "expected exactly one mview fragment for job {}, found {}",
                job_id,
                mview_fragment.len()
            )
            .into());
        }

        Ok(mview_fragment.into_iter().next().unwrap())
    }

    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        has_table_been_migrated(&txn, table_id).await
    }

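    /// Upsert persisted split assignments for the given fragments: existing
    /// rows are overwritten via `ON CONFLICT (fragment_id) DO UPDATE` on the
    /// splits column. No-op when the input map is empty.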
    pub async fn update_fragment_splits<C>(
        &self,
        txn: &C,
        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
    ) -> MetaResult<()>
    where
        C: ConnectionTrait,
    {
        if fragment_splits.is_empty() {
            return Ok(());
        }

        let models: Vec<fragment_splits::ActiveModel> = fragment_splits
            .iter()
            .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
                fragment_id: Set(*fragment_id as _),
                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(Into::into).collect_vec(),
                }))),
            })
            .collect();

        FragmentSplits::insert_many(models)
            .on_conflict(
                OnConflict::column(fragment_splits::Column::FragmentId)
                    .update_column(fragment_splits::Column::Splits)
                    .to_owned(),
            )
            .exec(txn)
            .await?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::id::JobId;
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::*;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};

    use super::ActorInfo;
    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    const TEST_JOB_ID: JobId = JobId::new(1);

    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);

    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }

    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id.into(),
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id.into()).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
                config_override: "".into(),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }

    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id.into())
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                    config_override: "a.b.c = true".into(),
                }
            })
            .collect_vec();

        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }

    #[test]
    fn test_parallelism_policy_with_root_fragments() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 3.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let job_parallelism = StreamingParallelism::Fixed(4);

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            Some(&job_parallelism),
            &[],
        );

        assert_eq!(policy, "inherit(fixed(4))");
    }

    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            None,
            &[3.into(), 1.into(), 2.into(), 1.into()],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
}