use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
use risingwave_common::id::JobId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{
    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
};
use risingwave_meta_model::{
    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
    VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink,
    source, streaming_job, table,
};
use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Operation as NotificationOperation,
};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
    StreamScanType,
};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::Expr;
use sea_orm::{
    ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, JoinType, PaginatorTrait,
    QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use serde::{Deserialize, Serialize};
use tracing::debug;

use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
use crate::controller::catalog::CatalogController;
use crate::controller::scale::{
    FragmentRenderMap, NoShuffleEnsemble, find_fragment_no_shuffle_dags_detailed,
    load_fragment_info, resolve_streaming_job_definition,
};
use crate::controller::utils::{
    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
    resolve_no_shuffle_actor_dispatcher,
};
use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
};
use crate::rpc::ddl_controller::build_upstream_sink_info;
use crate::stream::UpstreamSinkInfo;
use crate::{MetaResult, model};

#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
    pub splits: Vec<SplitImpl>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub splits: ConnectorSplits,
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<VnodeBitmap>,
    pub expr_context: ExprContext,
}

#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    pub vnode_count: usize,
    pub nodes: PbStreamNode,
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}

#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub actor_count: usize,
    pub vnode_count: usize,
}

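/// SQL predicate helpers over the `fragment_type_mask` column: each builds a
/// `sea_query` bitwise expression (`mask & flag != 0`, or `== 0` for
/// `disjoint`) so fragment-type checks can be evaluated inside database
/// queries, e.g. `FragmentTypeMask::intersects(FragmentTypeFlag::Source)` is
/// used as a `sea_orm` filter in `load_source_fragment_ids` below.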
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}

#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: JobId,
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    pub resource_group: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}

impl NotificationManager {
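    /// Broadcasts fragment worker-slot mapping changes to all frontends and
    /// forwards the affected fragment ids to local subscribers, as an upsert
    /// for `Add`/`Update` and a delete for `Delete`.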
    pub(crate) fn notify_fragment_mapping(
        &self,
        operation: NotificationOperation,
        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
    ) {
        let fragment_ids = fragment_mappings
            .iter()
            .map(|mapping| mapping.fragment_id)
            .collect_vec();
        if fragment_ids.is_empty() {
            return;
        }
        for fragment_mapping in fragment_mappings {
            self.notify_frontend_without_version(
                operation,
                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
            );
        }

        match operation {
            NotificationOperation::Add | NotificationOperation::Update => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
                    fragment_ids,
                ));
            }
            NotificationOperation::Delete => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
                    fragment_ids,
                ));
            }
            op => {
                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
            }
        }
    }
}

impl CatalogController {
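    /// Converts the in-memory fragments of a new streaming job into database
    /// models, one row per fragment.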
    pub fn prepare_fragment_models_from_fragments(
        job_id: JobId,
        fragments: impl Iterator<Item = &Fragment>,
    ) -> MetaResult<Vec<fragment::Model>> {
        fragments
            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
            .try_collect()
    }

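    /// Builds the database model for a single fragment of a new streaming job.
    /// For parallelized CDC backfill fragments, the backfill parallelism from
    /// the scan options is recorded as a fixed per-fragment parallelism
    /// override.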
    pub fn prepare_fragment_model_for_new_job(
        job_id: JobId,
        fragment: &Fragment,
    ) -> MetaResult<fragment::Model> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        let fragment_parallelism = nodes
            .node_body
            .as_ref()
            .and_then(|body| match body {
                NodeBody::StreamCdcScan(node) => Some(node),
                _ => None,
            })
            .and_then(|node| node.options.as_ref())
            .map(CdcScanOptions::from_proto)
            .filter(|opts| opts.is_parallelized_backfill())
            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));

        let stream_node = StreamNode::from(nodes);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
            parallelism: fragment_parallelism,
        };

        Ok(fragment)
    }

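    /// Assembles fragment models and their actor infos into a
    /// [`StreamJobFragments`] snapshot, translating job-level state, stream
    /// context, and parallelism settings along the way.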
    fn compose_table_fragments(
        job_id: JobId,
        state: PbState,
        ctx: Option<PbStreamContext>,
        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
        parallelism: StreamingParallelism,
        max_parallelism: usize,
        job_definition: Option<String>,
    ) -> MetaResult<StreamJobFragments> {
        let mut pb_fragments = BTreeMap::new();
        let mut pb_actor_status = BTreeMap::new();

        for (fragment, actors) in fragments {
            let (fragment, fragment_actor_status, _) =
                Self::compose_fragment(fragment, actors, job_definition.clone())?;

            pb_fragments.insert(fragment.fragment_id, fragment);
            pb_actor_status.extend(fragment_actor_status.into_iter());
        }

        let table_fragments = StreamJobFragments {
            stream_job_id: job_id,
            state: state as _,
            fragments: pb_fragments,
            actor_status: pb_actor_status,
            ctx: ctx
                .as_ref()
                .map(StreamContext::from_protobuf)
                .unwrap_or_default(),
            assigned_parallelism: match parallelism {
                StreamingParallelism::Custom => TableParallelism::Custom,
                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
            },
            max_parallelism,
        };

        Ok(table_fragments)
    }

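    /// Rebuilds an in-memory [`Fragment`] (plus per-actor status and split
    /// assignments) from a fragment model and its actors, verifying that every
    /// actor actually belongs to the fragment.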
    #[allow(clippy::type_complexity)]
    fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<ActorInfo>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let ActorInfo {
                actor_id,
                fragment_id,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id),
                },
            );

            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.0;
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }

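    /// Returns per-fragment parallelism info (distribution type, actor count,
    /// vnode count) for running fragments, optionally restricted to the given
    /// fragment ids, based on the shared in-memory actor info.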
    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let info = self.env.shared_actor_infos().read_guard();

        let mut result = HashMap::new();
        for (fragment_id, fragment) in info.iter_over_fragments() {
            if let Some(id_filter) = &id_filter
                && !id_filter.contains(&(*fragment_id as _))
            {
                continue;
            }

            result.insert(
                *fragment_id as _,
                FragmentParallelismInfo {
                    distribution_type: fragment.distribution_type.into(),
                    actor_count: fragment.actors.len() as _,
                    vnode_count: fragment.vnode_count,
                },
            );
        }

        Ok(result)
    }

    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
        let inner = self.inner.read().await;
        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(fragment_jobs.into_iter().collect())
    }

    pub async fn get_fragment_job_id(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        let object_ids: Vec<ObjectId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(object_ids)
    }

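    /// Fetches a fragment's descriptor together with the ids of its direct
    /// upstream fragments. Returns `Ok(None)` if the fragment does not exist.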
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model = match FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?
        {
            Some(fragment) => fragment,
            None => return Ok(None),
        };

        let job_parallelism: Option<StreamingParallelism> =
            StreamingJob::find_by_id(fragment_model.job_id)
                .select_only()
                .column(streaming_job::Column::Parallelism)
                .into_tuple()
                .one(&inner.db)
                .await?;

        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::DispatcherType,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let upstreams: Vec<_> = upstream_entries
            .into_iter()
            .map(|(source_id, _)| source_id)
            .collect();

        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
            .await
            .map(Self::collect_root_fragment_mapping)?;
        let root_fragments = root_fragment_map
            .get(&fragment_id)
            .cloned()
            .unwrap_or_default();

        let info = self.env.shared_actor_infos().read_guard();
        let SharedFragmentInfo { actors, .. } = info
            .get_fragment(fragment_model.fragment_id as _)
            .unwrap_or_else(|| {
                panic!(
                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                    fragment_model.fragment_id,
                    fragment_model.job_id
                )
            });

        let parallelism_policy = Self::format_fragment_parallelism_policy(
            &fragment_model,
            job_parallelism.as_ref(),
            &root_fragments,
        );

        let fragment = FragmentDesc {
            fragment_id: fragment_model.fragment_id,
            job_id: fragment_model.job_id,
            fragment_type_mask: fragment_model.fragment_type_mask,
            distribution_type: fragment_model.distribution_type,
            state_table_ids: fragment_model.state_table_ids.clone(),
            parallelism: actors.len() as _,
            vnode_count: fragment_model.vnode_count,
            stream_node: fragment_model.stream_node.clone(),
            parallelism_policy,
        };

        Ok(Some((fragment, upstreams)))
    }

    pub async fn list_fragment_database_ids(
        &self,
        select_fragment_ids: Option<Vec<FragmentId>>,
    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
        let inner = self.inner.read().await;
        let select = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .column(object::Column::DatabaseId)
            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
        let select = if let Some(select_fragment_ids) = select_fragment_ids {
            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
        } else {
            select
        };
        Ok(select.into_tuple().all(&inner.db).await?)
    }

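    /// Loads all fragments of a streaming job from the database, attaches
    /// actor info from the shared in-memory state, and composes them into a
    /// [`StreamJobFragments`] snapshot.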
    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;

        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        let fragment_actors =
            self.collect_fragment_actor_pairs(fragments, job_info.timezone.clone())?;

        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id,
            job_info.job_status.into(),
            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }

    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;

        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
            .await
    }

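    /// Computes the dispatchers of every actor in the given source fragments
    /// by joining fragment relations from the database with actor placements
    /// from the shared in-memory state, caching per-fragment actor lookups.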
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let dispatchers = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }

    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }

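    /// Scans the stream nodes of a job's fragments and reports, for each
    /// fragment containing a `StreamScan`, its backfill scan type (skipping
    /// `Unspecified`).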
    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }

    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }

    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
        let inner = self.inner.read().await;
        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
        Ok(max_parallelism as usize)
    }

    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
        if let Ok(inner) = self.inner.try_read()
            && let Ok(job_state_tables) = FragmentModel::find()
                .select_only()
                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
                .into_tuple::<(JobId, I32Array)>()
                .all(&inner.db)
                .await
        {
            let mut job_internal_table_ids = HashMap::new();
            for (job_id, state_table_ids) in job_state_tables {
                job_internal_table_ids
                    .entry(job_id)
                    .or_insert_with(Vec::new)
                    .extend(
                        state_table_ids
                            .into_inner()
                            .into_iter()
                            .map(|table_id| TableId::new(table_id as _)),
                    );
            }
            return Some(job_internal_table_ids.into_iter().collect());
        }
        None
    }

    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = FragmentModel::find().count(&inner.db).await?;
        Ok(count > 0)
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let read_guard = self.env.shared_actor_infos().read_guard();
        let actor_cnt: HashMap<WorkerId, _> = read_guard
            .iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
            })
            .into_group_map()
            .into_iter()
            .map(|(k, v)| (k, v.len()))
            .collect();

        Ok(actor_cnt)
    }

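    /// Builds [`ActorInfo`]s for the given fragments from the shared in-memory
    /// actor state, attaching the job's expression context (derived from its
    /// timezone) to every actor. Errors if a fragment is missing from the
    /// shared state.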
    fn collect_fragment_actor_map(
        &self,
        fragment_ids: &[FragmentId],
        timezone: Option<String>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
        let guard = self.env.shared_actor_infos().read_guard();
        let stream_context = StreamContext { timezone };
        let pb_expr_context = stream_context.to_expr_context();
        let expr_context: ExprContext = (&pb_expr_context).into();

        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
        for fragment_id in fragment_ids {
            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
                anyhow!("fragment {} not found in shared actor info", fragment_id)
            })?;

            let actors = fragment_info
                .actors
                .iter()
                .map(|(actor_id, actor_info)| ActorInfo {
                    actor_id: *actor_id as _,
                    fragment_id: *fragment_id,
                    splits: ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
                    }),
                    worker_id: actor_info.worker_id as _,
                    vnode_bitmap: actor_info
                        .vnode_bitmap
                        .as_ref()
                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                    expr_context: expr_context.clone(),
                })
                .collect();

            actor_map.insert(*fragment_id, actors);
        }

        Ok(actor_map)
    }

    fn collect_fragment_actor_pairs(
        &self,
        fragments: Vec<fragment::Model>,
        timezone: Option<String>,
    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, timezone)?;
        fragments
            .into_iter()
            .map(|fragment| {
                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
                    anyhow!(
                        "fragment {} missing in shared actor info map",
                        fragment.fragment_id
                    )
                })?;
                Ok((fragment, actors))
            })
            .collect()
    }

    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragments = FragmentModel::find()
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            let fragment_actors =
                self.collect_fragment_actor_pairs(fragments, job.timezone.clone())?;

            table_fragments.insert(
                job.job_id,
                Self::compose_table_fragments(
                    job.job_id,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }

    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }

    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
        let info = self.env.shared_actor_infos().read_guard();

        let actor_locations = info
            .iter_over_fragments()
            .flat_map(|(fragment_id, fragment)| {
                // `move` is required here so the returned iterator owns its copy of
                // `fragment_id` instead of borrowing the closure-local binding.
                fragment
                    .actors
                    .iter()
                    .map(move |(actor_id, actor)| PartialActorLocation {
                        actor_id: *actor_id as _,
                        fragment_id: *fragment_id as _,
                        worker_id: actor.worker_id,
                    })
            })
            .collect_vec();

        Ok(actor_locations)
    }

    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;

        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
            FragmentModel::find()
                .select_only()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .column(fragment::Column::FragmentId)
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;

        let actor_infos = {
            let info = self.env.shared_actor_infos().read_guard();

            fragment_objects
                .into_iter()
                .flat_map(|(fragment_id, object_id, schema_id, object_type)| {
                    let SharedFragmentInfo {
                        fragment_id,
                        actors,
                        ..
                    } = info.get_fragment(fragment_id as _).unwrap();
                    actors.keys().map(move |actor_id| {
                        (
                            *actor_id as _,
                            *fragment_id as _,
                            object_id,
                            schema_id,
                            object_type,
                        )
                    })
                })
                .collect_vec()
        };

        Ok(actor_infos)
    }

    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
        let guard = self.env.shared_actor_info.read_guard();
        guard
            .iter_over_fragments()
            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
            .collect_vec()
    }

    pub async fn list_fragment_descs(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let txn = inner.db.begin().await?;

        let fragments: Vec<_> = if is_creating {
            FragmentModel::find()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
                .filter(
                    streaming_job::Column::JobStatus
                        .eq(JobStatus::Initial)
                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
                )
                .all(&txn)
                .await?
        } else {
            FragmentModel::find().all(&txn).await?
        };

        let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();

        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragments.is_empty() {
            HashMap::new()
        } else {
            let job_ids = fragments.iter().map(|f| f.job_id).unique().collect_vec();
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(&txn)
                .await?
                .into_iter()
                .collect()
        };

        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();

        let mut result = Vec::new();

        for fragment_desc in fragments {
            let parallelism = guard
                .get_fragment(fragment_desc.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&fragment_desc.fragment_id)
                .cloned()
                .unwrap_or_default();

            let upstreams = all_upstreams
                .remove(&fragment_desc.fragment_id)
                .unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                &fragment_desc,
                job_parallelisms.get(&fragment_desc.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: fragment_desc.fragment_id,
                table_id: fragment_desc.job_id,
                distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type)
                    as _,
                state_table_ids: fragment_desc.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: fragment_desc.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: fragment_desc.vnode_count as _,
                node: Some(fragment_desc.stream_node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }
        Ok(result)
    }

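    /// Given the no-shuffle ensembles touching a set of fragments, maps every
    /// fragment to the sorted, deduplicated ids of its ensemble's entry
    /// ("root") fragments; roots themselves map to an empty list.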
    fn collect_root_fragment_mapping(
        ensembles: Vec<NoShuffleEnsemble>,
    ) -> HashMap<FragmentId, Vec<FragmentId>> {
        let mut mapping = HashMap::new();

        for ensemble in ensembles {
            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
            roots.sort_unstable();
            roots.dedup();

            if roots.is_empty() {
                continue;
            }

            let root_set: HashSet<_> = roots.iter().copied().collect();

            for fragment_id in ensemble.component_fragments() {
                if root_set.contains(&fragment_id) {
                    mapping.insert(fragment_id, Vec::new());
                } else {
                    mapping.insert(fragment_id, roots.clone());
                }
            }
        }

        mapping
    }

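    /// Renders a human-readable parallelism policy for a fragment: `single`
    /// for single-distribution fragments, `override(...)` when the fragment
    /// carries its own parallelism, `upstream_fragment([...])` when it follows
    /// no-shuffle roots, and otherwise `inherit(...)` of the job parallelism,
    /// e.g. `inherit(fixed(4))` (see the tests at the bottom of this file).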
    fn format_fragment_parallelism_policy(
        fragment: &fragment::Model,
        job_parallelism: Option<&StreamingParallelism>,
        root_fragments: &[FragmentId],
    ) -> String {
        if fragment.distribution_type == DistributionType::Single {
            return "single".to_owned();
        }

        if let Some(parallelism) = fragment.parallelism.as_ref() {
            return format!(
                "override({})",
                Self::format_streaming_parallelism(parallelism)
            );
        }

        if !root_fragments.is_empty() {
            let mut upstreams = root_fragments.to_vec();
            upstreams.sort_unstable();
            upstreams.dedup();

            return format!("upstream_fragment({upstreams:?})");
        }

        let inherited = job_parallelism
            .map(Self::format_streaming_parallelism)
            .unwrap_or_else(|| "unknown".to_owned());
        format!("inherit({inherited})")
    }

    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
        match parallelism {
            StreamingParallelism::Adaptive => "adaptive".to_owned(),
            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
            StreamingParallelism::Custom => "custom".to_owned(),
        }
    }

    pub async fn list_sink_actor_mapping(
        &self,
    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
        let inner = self.inner.read().await;
        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .into_tuple()
            .all(&inner.db)
            .await?;
        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();

        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();

        let actor_with_type: Vec<(ActorId, SinkId)> = {
            let info = self.env.shared_actor_infos().read_guard();

            info.iter_over_fragments()
                .filter(|(_, fragment)| {
                    sink_ids.contains(&fragment.job_id.as_sink_id())
                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
                })
                .flat_map(|(_, fragment)| {
                    fragment
                        .actors
                        .keys()
                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
                })
                .collect()
        };

        let mut sink_actor_mapping = HashMap::new();
        for (actor_id, sink_id) in actor_with_type {
            sink_actor_mapping
                .entry(sink_id)
                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
                .1
                .push(actor_id);
        }

        Ok(sink_actor_mapping)
    }

    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
        let inner = self.inner.read().await;
        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::StateTableIds,
            ])
            .into_partial_model()
            .all(&inner.db)
            .await?;
        Ok(fragment_state_tables)
    }

    pub async fn load_all_actors_dynamic(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<FragmentRenderMap> {
        let adaptive_parallelism_strategy = {
            let system_params_reader = self.env.system_params_reader().await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let database_fragment_infos = load_fragment_info(
            &txn,
            self.env.actor_id_generator(),
            database_id,
            worker_nodes,
            adaptive_parallelism_strategy,
        )
        .await?;

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }

    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

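    /// Returns the ids of all running actors of a fragment from the shared
    /// in-memory actor info; an unknown fragment yields an empty set.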
    pub fn get_running_actors_of_fragment(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<HashSet<model::ActorId>> {
        let info = self.env.shared_actor_infos().read_guard();

        let actors = info
            .get_fragment(fragment_id as _)
            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
            .unwrap_or_default();

        Ok(actors)
    }

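    /// Resolves the 1:1 no-shuffle pairing between a source fragment's actors
    /// and a source-backfill fragment's actors, returning
    /// `(backfill_actor, source_actor)` pairs. Errors if the two fragments are
    /// not connected by a `NoShuffle` dispatcher.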
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }

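    /// Finds the "root" fragment of each given job, preferring an
    /// `Mview`/`Sink` fragment and falling back to a `Source` fragment, and
    /// returns it with its stream node plus the worker location of every known
    /// actor.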
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(
        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let inner = self.inner.read().await;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();

        let info = self.env.shared_actor_infos().read_guard();

        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
            .iter()
            .map(|(job_id, fragment)| (fragment.fragment_id, *job_id))
            .collect();

        for fragment in root_fragment_to_jobs.keys() {
            let fragment_info = info.get_fragment(*fragment).context(format!(
                "fragment {} not found in shared actor info",
                fragment
            ))?;

            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
            let fragment = root_fragments
                .get(&job_id)
                .context(format!("root fragment for job {} not found", job_id))?;

            root_fragments_pb.insert(
                job_id,
                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
            );
        }

        let mut all_actor_locations = HashMap::new();

        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
            for (actor_id, actor_info) in actors {
                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
            }
        }

        Ok((root_fragments_pb, all_actor_locations))
    }

    pub async fn get_root_fragment(
        &self,
        job_id: JobId,
    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
        let (root_fragment, _) = root_fragments
            .remove(&job_id)
            .context(format!("root fragment for job {} not found", job_id))?;

        Ok((root_fragment, actors))
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        Vec<(
            stream_plan::DispatcherType,
            SharedFragmentInfo,
            PbStreamNode,
        )>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&txn)
            .await?;

        let downstream_fragment_ids = downstream_fragment_relations
            .iter()
            .map(|model| model.target_fragment_id as FragmentId)
            .collect::<HashSet<_>>();

        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        let downstream_fragment_nodes: HashMap<_, _> =
            downstream_fragment_nodes.into_iter().collect();

        let mut downstream_fragments = vec![];

        let info = self.env.shared_actor_infos().read_guard();

        let fragment_map: HashMap<_, _> = downstream_fragment_relations
            .iter()
            .map(|model| (model.target_fragment_id, model.dispatcher_type))
            .collect();

        for fragment_id in fragment_map.keys() {
            let fragment_info @ SharedFragmentInfo { actors, .. } =
                info.get_fragment(*fragment_id).unwrap();

            let dispatcher_type = fragment_map[fragment_id];

            if actors.is_empty() {
                bail!("No fragment found for fragment id {}", fragment_id);
            }

            let dispatch_type = PbDispatcherType::from(dispatcher_type);

            let nodes = downstream_fragment_nodes
                .get(fragment_id)
                .context(format!(
                    "downstream fragment node for id {} not found",
                    fragment_id
                ))?
                .to_protobuf();

            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
        }
        Ok((downstream_fragments, actor_locations))
    }

    pub async fn load_source_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
                source_fragment_ids
                    .entry(source_id)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment_id);
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn load_backfill_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                stream_node.to_protobuf().find_source_backfill()
            {
                source_fragment_ids
                    .entry(source_id)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn get_all_upstream_sink_infos(
        &self,
        target_table: &PbTable,
        target_fragment_id: FragmentId,
    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
        let incoming_sinks = self.get_table_incoming_sinks(target_table.id).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let sink_ids = incoming_sinks.iter().map(|s| s.id).collect_vec();
        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;

        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
        for pb_sink in &incoming_sinks {
            let sink_fragment_id =
                sink_fragment_ids
                    .get(&pb_sink.id)
                    .cloned()
                    .ok_or(anyhow::anyhow!(
                        "sink fragment not found for sink id {}",
                        pb_sink.id
                    ))?;
            let upstream_info = build_upstream_sink_info(
                pb_sink,
                sink_fragment_id,
                target_table,
                target_fragment_id,
            )?;
            upstream_sink_infos.push(upstream_info);
        }

        Ok(upstream_sink_infos)
    }

    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(
                fragment::Column::JobId
                    .eq(job_id)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        if mview_fragment.len() != 1 {
            return Err(anyhow::anyhow!(
                "expected exactly one mview fragment for job {}, found {}",
                job_id,
                mview_fragment.len()
            )
            .into());
        }

        Ok(mview_fragment.into_iter().next().unwrap())
    }

    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        has_table_been_migrated(&txn, table_id).await
    }

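    /// Upserts the split assignments of the given fragments: inserts one row
    /// per fragment and, on conflict, overwrites the existing `splits` column.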
    pub async fn update_fragment_splits<C>(
        &self,
        txn: &C,
        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
    ) -> MetaResult<()>
    where
        C: ConnectionTrait,
    {
        if fragment_splits.is_empty() {
            return Ok(());
        }

        let models: Vec<fragment_splits::ActiveModel> = fragment_splits
            .iter()
            .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
                fragment_id: Set(*fragment_id as _),
                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(Into::into).collect_vec(),
                }))),
            })
            .collect();

        FragmentSplits::insert_many(models)
            .on_conflict(
                OnConflict::column(fragment_splits::Column::FragmentId)
                    .update_column(fragment_splits::Column::Splits)
                    .to_owned(),
            )
            .exec(txn)
            .await?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::id::JobId;
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::*;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};

    use super::ActorInfo;
    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    const TEST_JOB_ID: JobId = JobId::new(1);

    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);

    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }

    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id.into(),
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id.into()).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }

    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id.into())
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }

    #[test]
    fn test_parallelism_policy_with_root_fragments() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 3.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let job_parallelism = StreamingParallelism::Fixed(4);

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            Some(&job_parallelism),
            &[],
        );

        assert_eq!(policy, "inherit(fixed(4))");
    }

    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            None,
            &[3.into(), 1.into(), 2.into(), 1.into()],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
}