use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
use risingwave_common::id::JobId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{
    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
};
use risingwave_meta_model::{
    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
    VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink,
    source, streaming_job, table,
};
use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Operation as NotificationOperation,
};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
    StreamScanType,
};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::Expr;
use sea_orm::{
    ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, JoinType, PaginatorTrait,
    QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use serde::{Deserialize, Serialize};
use tracing::debug;

use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
use crate::controller::catalog::CatalogController;
use crate::controller::scale::{
    FragmentRenderMap, NoShuffleEnsemble, find_fragment_no_shuffle_dags_detailed,
    load_fragment_info, resolve_streaming_job_definition,
};
use crate::controller::utils::{
    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
    resolve_no_shuffle_actor_dispatcher,
};
use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
};
use crate::rpc::ddl_controller::build_upstream_sink_info;
use crate::stream::UpstreamSinkInfo;
use crate::{MetaResult, model};
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
    pub splits: Vec<SplitImpl>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub splits: ConnectorSplits,
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<VnodeBitmap>,
    pub expr_context: ExprContext,
}

#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    pub vnode_count: usize,
    pub nodes: PbStreamNode,
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}

#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub actor_count: usize,
    pub vnode_count: usize,
}

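/// SQL helpers for filtering `fragment` rows by their type mask directly in the
/// database. As a rough sketch, `intersects(flag)` renders to a predicate of the
/// form `(fragment_type_mask & flag) != 0` and `disjoint(flag)` to
/// `(fragment_type_mask & flag) = 0` (the exact SQL depends on the sea-query
/// backend in use).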
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}

#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    pub resource_group: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}

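// Fragment worker-slot mappings are fanned out on two paths: every frontend node
// receives one `StreamingWorkerSlotMapping` notification per fragment, while
// meta-local subscribers receive a single batched upsert/delete notification
// carrying all affected fragment ids.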
impl NotificationManager {
    pub(crate) fn notify_fragment_mapping(
        &self,
        operation: NotificationOperation,
        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
    ) {
        let fragment_ids = fragment_mappings
            .iter()
            .map(|mapping| mapping.fragment_id)
            .collect_vec();
        if fragment_ids.is_empty() {
            return;
        }
        for fragment_mapping in fragment_mappings {
            self.notify_frontend_without_version(
                operation,
                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
            );
        }

        match operation {
            NotificationOperation::Add | NotificationOperation::Update => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
                    fragment_ids,
                ));
            }
            NotificationOperation::Delete => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
                    fragment_ids,
                ));
            }
            op => {
                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
            }
        }
    }
}

impl CatalogController {
    pub fn prepare_fragment_models_from_fragments(
        job_id: JobId,
        fragments: impl Iterator<Item = &Fragment>,
    ) -> MetaResult<Vec<fragment::Model>> {
        fragments
            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
            .try_collect()
    }

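    /// Converts an in-memory [`Fragment`] into a `fragment::Model` row for a newly
    /// created streaming job. A `StreamCdcScan` fragment configured for parallelized
    /// backfill pins its own fixed parallelism (`StreamingParallelism::Fixed`); all
    /// other fragments leave `parallelism` unset and follow the job-level setting.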
    pub fn prepare_fragment_model_for_new_job(
        job_id: JobId,
        fragment: &Fragment,
    ) -> MetaResult<fragment::Model> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids
            .iter()
            .map(|table_id| table_id.as_raw_id() as i32)
            .collect_vec()
            .into();

        assert!(!pb_actors.is_empty());

        let fragment_parallelism = nodes
            .node_body
            .as_ref()
            .and_then(|body| match body {
                NodeBody::StreamCdcScan(node) => Some(node),
                _ => None,
            })
            .and_then(|node| node.options.as_ref())
            .map(CdcScanOptions::from_proto)
            .filter(|opts| opts.is_parallelized_backfill())
            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));

        let stream_node = StreamNode::from(nodes);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
            parallelism: fragment_parallelism,
        };

        Ok(fragment)
    }

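    /// Assembles fragment rows and their actor infos back into a [`StreamJobFragments`]
    /// snapshot, mapping the persisted `StreamingParallelism` onto the corresponding
    /// [`TableParallelism`] variant.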
    fn compose_table_fragments(
        job_id: JobId,
        state: PbState,
        ctx: Option<PbStreamContext>,
        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
        parallelism: StreamingParallelism,
        max_parallelism: usize,
        job_definition: Option<String>,
    ) -> MetaResult<StreamJobFragments> {
        let mut pb_fragments = BTreeMap::new();
        let mut pb_actor_status = BTreeMap::new();

        for (fragment, actors) in fragments {
            let (fragment, fragment_actor_status, _) =
                Self::compose_fragment(fragment, actors, job_definition.clone())?;

            pb_fragments.insert(fragment.fragment_id, fragment);
            pb_actor_status.extend(fragment_actor_status.into_iter());
        }

        let table_fragments = StreamJobFragments {
            stream_job_id: job_id,
            state: state as _,
            fragments: pb_fragments,
            actor_status: pb_actor_status,
            ctx: ctx
                .as_ref()
                .map(StreamContext::from_protobuf)
                .unwrap_or_default(),
            assigned_parallelism: match parallelism {
                StreamingParallelism::Custom => TableParallelism::Custom,
                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
            },
            max_parallelism,
        };

        Ok(table_fragments)
    }

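    /// Rebuilds a protobuf-facing [`Fragment`] from a `fragment::Model` row plus its
    /// actors, returning the fragment along with per-actor status (worker location)
    /// and per-actor connector splits. Bails if an actor references a different
    /// fragment id than the row being composed.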
    #[allow(clippy::type_complexity)]
    fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<ActorInfo>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "duplicate upstream fragment {}",
                    m.upstream_fragment_id
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let ActorInfo {
                actor_id,
                fragment_id,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                },
            );

            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids.into_iter().map_into().collect(),
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }

    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let info = self.env.shared_actor_infos().read_guard();

        let mut result = HashMap::new();
        for (fragment_id, fragment) in info.iter_over_fragments() {
            if let Some(id_filter) = &id_filter
                && !id_filter.contains(&(*fragment_id as _))
            {
                continue;
            }

            result.insert(
                *fragment_id as _,
                FragmentParallelismInfo {
                    distribution_type: fragment.distribution_type.into(),
                    actor_count: fragment.actors.len() as _,
                    vnode_count: fragment.vnode_count,
                },
            );
        }

        Ok(result)
    }

    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
        let inner = self.inner.read().await;
        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(fragment_jobs.into_iter().collect())
    }

    pub async fn get_fragment_job_id(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        let object_ids: Vec<ObjectId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(object_ids)
    }

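    /// Fetches a single fragment's descriptor together with the ids of its direct
    /// upstream fragments. Returns `Ok(None)` if the fragment does not exist; the
    /// live actor count comes from the shared in-memory actor info rather than the
    /// database.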
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model = match FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?
        {
            Some(fragment) => fragment,
            None => return Ok(None),
        };

        let job_parallelism: Option<StreamingParallelism> =
            StreamingJob::find_by_id(fragment_model.job_id)
                .select_only()
                .column(streaming_job::Column::Parallelism)
                .into_tuple()
                .one(&inner.db)
                .await?;

        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::DispatcherType,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let upstreams: Vec<_> = upstream_entries
            .into_iter()
            .map(|(source_id, _)| source_id)
            .collect();

        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
            .await
            .map(Self::collect_root_fragment_mapping)?;
        let root_fragments = root_fragment_map
            .get(&fragment_id)
            .cloned()
            .unwrap_or_default();

        let info = self.env.shared_actor_infos().read_guard();
        let SharedFragmentInfo { actors, .. } = info
            .get_fragment(fragment_model.fragment_id as _)
            .unwrap_or_else(|| {
                panic!(
                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                    fragment_model.fragment_id, fragment_model.job_id
                )
            });

        let parallelism_policy = Self::format_fragment_parallelism_policy(
            &fragment_model,
            job_parallelism.as_ref(),
            &root_fragments,
        );

        let fragment = FragmentDesc {
            fragment_id: fragment_model.fragment_id,
            job_id: fragment_model.job_id,
            fragment_type_mask: fragment_model.fragment_type_mask,
            distribution_type: fragment_model.distribution_type,
            state_table_ids: fragment_model.state_table_ids.clone(),
            parallelism: actors.len() as _,
            vnode_count: fragment_model.vnode_count,
            stream_node: fragment_model.stream_node.clone(),
            parallelism_policy,
        };

        Ok(Some((fragment, upstreams)))
    }

    pub async fn list_fragment_database_ids(
        &self,
        select_fragment_ids: Option<Vec<FragmentId>>,
    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
        let inner = self.inner.read().await;
        let select = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .column(object::Column::DatabaseId)
            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
        let select = if let Some(select_fragment_ids) = select_fragment_ids {
            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
        } else {
            select
        };
        Ok(select.into_tuple().all(&inner.db).await?)
    }

    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;

        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        let fragment_actors =
            self.collect_fragment_actor_pairs(fragments, job_info.timezone.clone())?;

        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id,
            job_info.job_status.into(),
            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }

    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;

        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
            .await
    }

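    /// Computes the dispatchers of each actor in the given source fragments from the
    /// persisted `fragment_relation` rows. Actor distributions are read from the
    /// shared in-memory info and memoized per fragment, so each fragment is resolved
    /// at most once per call.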
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let dispatchers = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }

    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }

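    /// Scans a job's fragments for `StreamScan` nodes and reports each fragment's
    /// backfill scan type, skipping nodes whose scan type is `Unspecified`.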
    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }

    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }

    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
        let inner = self.inner.read().await;
        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
        Ok(max_parallelism as usize)
    }

    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
        if let Ok(inner) = self.inner.try_read()
            && let Ok(job_state_tables) = FragmentModel::find()
                .select_only()
                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
                .into_tuple::<(JobId, I32Array)>()
                .all(&inner.db)
                .await
        {
            let mut job_internal_table_ids = HashMap::new();
            for (job_id, state_table_ids) in job_state_tables {
                job_internal_table_ids
                    .entry(job_id)
                    .or_insert_with(Vec::new)
                    .extend(
                        state_table_ids
                            .into_inner()
                            .into_iter()
                            .map(|table_id| TableId::new(table_id as _)),
                    );
            }
            return Some(job_internal_table_ids.into_iter().collect());
        }
        None
    }

    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = FragmentModel::find().count(&inner.db).await?;
        Ok(count > 0)
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let read_guard = self.env.shared_actor_infos().read_guard();
        let actor_cnt: HashMap<WorkerId, _> = read_guard
            .iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
            })
            .into_group_map()
            .into_iter()
            .map(|(k, v)| (k, v.len()))
            .collect();

        Ok(actor_cnt)
    }

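    /// Collects [`ActorInfo`]s for the given fragments from the shared in-memory actor
    /// info, stamping every actor with an expression context derived from the job's
    /// timezone. Errors if any fragment is missing from the shared info.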
    fn collect_fragment_actor_map(
        &self,
        fragment_ids: &[FragmentId],
        timezone: Option<String>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
        let guard = self.env.shared_actor_infos().read_guard();
        let stream_context = StreamContext { timezone };
        let pb_expr_context = stream_context.to_expr_context();
        let expr_context: ExprContext = (&pb_expr_context).into();

        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
        for fragment_id in fragment_ids {
            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
                anyhow!("fragment {} not found in shared actor info", fragment_id)
            })?;

            let actors = fragment_info
                .actors
                .iter()
                .map(|(actor_id, actor_info)| ActorInfo {
                    actor_id: *actor_id as _,
                    fragment_id: *fragment_id,
                    splits: ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
                    }),
                    worker_id: actor_info.worker_id as _,
                    vnode_bitmap: actor_info
                        .vnode_bitmap
                        .as_ref()
                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                    expr_context: expr_context.clone(),
                })
                .collect();

            actor_map.insert(*fragment_id, actors);
        }

        Ok(actor_map)
    }

    fn collect_fragment_actor_pairs(
        &self,
        fragments: Vec<fragment::Model>,
        timezone: Option<String>,
    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, timezone)?;
        fragments
            .into_iter()
            .map(|fragment| {
                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
                    anyhow!(
                        "fragment {} missing in shared actor info map",
                        fragment.fragment_id
                    )
                })?;
                Ok((fragment, actors))
            })
            .collect()
    }

    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragments = FragmentModel::find()
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            let fragment_actors =
                self.collect_fragment_actor_pairs(fragments, job.timezone.clone())?;

            table_fragments.insert(
                job.job_id,
                Self::compose_table_fragments(
                    job.job_id,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }

    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }

    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
        let info = self.env.shared_actor_infos().read_guard();

        let actor_locations = info
            .iter_over_fragments()
            .flat_map(|(fragment_id, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, actor)| PartialActorLocation {
                        actor_id: *actor_id as _,
                        fragment_id: *fragment_id as _,
                        worker_id: actor.worker_id,
                    })
            })
            .collect_vec();

        Ok(actor_locations)
    }

    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;

        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
            FragmentModel::find()
                .select_only()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .column(fragment::Column::FragmentId)
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;

        let actor_infos = {
            let info = self.env.shared_actor_infos().read_guard();

            fragment_objects
                .into_iter()
                .flat_map(|(fragment_id, object_id, schema_id, object_type)| {
                    let SharedFragmentInfo {
                        fragment_id,
                        actors,
                        ..
                    } = info.get_fragment(fragment_id as _).unwrap();
                    actors.keys().map(move |actor_id| {
                        (
                            *actor_id as _,
                            *fragment_id as _,
                            object_id,
                            schema_id,
                            object_type,
                        )
                    })
                })
                .collect_vec()
        };

        Ok(actor_infos)
    }

    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
        let guard = self.env.shared_actor_info.read_guard();
        guard
            .iter_over_fragments()
            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
            .collect_vec()
    }

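    /// Lists fragment descriptors, either for creating jobs only (`is_creating`) or
    /// for all jobs. Combines database state (relations, job parallelism, no-shuffle
    /// root fragments) with live actor counts from the shared in-memory info to
    /// render each fragment's parallelism policy.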
    pub async fn list_fragment_descs(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let txn = inner.db.begin().await?;

        let fragments: Vec<_> = if is_creating {
            FragmentModel::find()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
                .filter(
                    streaming_job::Column::JobStatus
                        .eq(JobStatus::Initial)
                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
                )
                .all(&txn)
                .await?
        } else {
            FragmentModel::find().all(&txn).await?
        };

        let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();

        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragments.is_empty() {
            HashMap::new()
        } else {
            let job_ids = fragments.iter().map(|f| f.job_id).unique().collect_vec();
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(&txn)
                .await?
                .into_iter()
                .collect()
        };

        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();

        let mut result = Vec::new();

        for fragment_desc in fragments {
            let parallelism = guard
                .get_fragment(fragment_desc.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&fragment_desc.fragment_id)
                .cloned()
                .unwrap_or_default();

            let upstreams = all_upstreams
                .remove(&fragment_desc.fragment_id)
                .unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                &fragment_desc,
                job_parallelisms.get(&fragment_desc.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: fragment_desc.fragment_id as _,
                table_id: fragment_desc.job_id.as_raw_id(),
                distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type)
                    as _,
                state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
                upstream_fragment_ids: upstreams.iter().map(|id| *id as _).collect(),
                fragment_type_mask: fragment_desc.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: fragment_desc.vnode_count as _,
                node: Some(fragment_desc.stream_node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }
        Ok(result)
    }

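    /// Flattens no-shuffle ensembles into a per-fragment map of root (entry) fragment
    /// ids: root fragments map to an empty list, while every other member maps to the
    /// sorted, deduplicated roots of its ensemble.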
    fn collect_root_fragment_mapping(
        ensembles: Vec<NoShuffleEnsemble>,
    ) -> HashMap<FragmentId, Vec<FragmentId>> {
        let mut mapping = HashMap::new();

        for ensemble in ensembles {
            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
            roots.sort_unstable();
            roots.dedup();

            if roots.is_empty() {
                continue;
            }

            let root_set: HashSet<_> = roots.iter().copied().collect();

            for fragment_id in ensemble.component_fragments() {
                if root_set.contains(&fragment_id) {
                    mapping.insert(fragment_id, Vec::new());
                } else {
                    mapping.insert(fragment_id, roots.clone());
                }
            }
        }

        mapping
    }

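    /// Renders a human-readable parallelism policy for a fragment, checked in order:
    /// singleton distribution, per-fragment override, no-shuffle upstream roots, then
    /// the job-level parallelism. Example outputs: `single`, `override(fixed(4))`,
    /// `upstream_fragment([1, 2])`, `inherit(adaptive)`.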
    fn format_fragment_parallelism_policy(
        fragment: &fragment::Model,
        job_parallelism: Option<&StreamingParallelism>,
        root_fragments: &[FragmentId],
    ) -> String {
        if fragment.distribution_type == DistributionType::Single {
            return "single".to_owned();
        }

        if let Some(parallelism) = fragment.parallelism.as_ref() {
            return format!(
                "override({})",
                Self::format_streaming_parallelism(parallelism)
            );
        }

        if !root_fragments.is_empty() {
            let mut upstreams = root_fragments.to_vec();
            upstreams.sort_unstable();
            upstreams.dedup();

            return format!("upstream_fragment({upstreams:?})");
        }

        let inherited = job_parallelism
            .map(Self::format_streaming_parallelism)
            .unwrap_or_else(|| "unknown".to_owned());
        format!("inherit({inherited})")
    }

    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
        match parallelism {
            StreamingParallelism::Adaptive => "adaptive".to_owned(),
            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
            StreamingParallelism::Custom => "custom".to_owned(),
        }
    }

    pub async fn list_sink_actor_mapping(
        &self,
    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
        let inner = self.inner.read().await;
        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .into_tuple()
            .all(&inner.db)
            .await?;
        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();

        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();

        let actor_with_type: Vec<(ActorId, ObjectId)> = {
            let info = self.env.shared_actor_infos().read_guard();

            info.iter_over_fragments()
                .filter(|(_, fragment)| {
                    sink_ids.contains(&(fragment.job_id.as_raw_id() as _))
                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
                })
                .flat_map(|(_, fragment)| {
                    fragment
                        .actors
                        .keys()
                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_raw_id() as _))
                })
                .collect()
        };

        let mut sink_actor_mapping = HashMap::new();
        for (actor_id, sink_id) in actor_with_type {
            sink_actor_mapping
                .entry(sink_id)
                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
                .1
                .push(actor_id);
        }

        Ok(sink_actor_mapping)
    }

    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
        let inner = self.inner.read().await;
        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::StateTableIds,
            ])
            .into_partial_model()
            .all(&inner.db)
            .await?;
        Ok(fragment_state_tables)
    }

    pub async fn load_all_actors_dynamic(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<FragmentRenderMap> {
        let adaptive_parallelism_strategy = {
            let system_params_reader = self.env.system_params_reader().await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let database_fragment_infos = load_fragment_info(
            &txn,
            self.env.actor_id_generator(),
            database_id,
            worker_nodes,
            adaptive_parallelism_strategy,
        )
        .await?;

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }

    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

    pub fn get_running_actors_of_fragment(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<HashSet<model::ActorId>> {
        let info = self.env.shared_actor_infos().read_guard();

        let actors = info
            .get_fragment(fragment_id as _)
            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
            .unwrap_or_default();

        Ok(actors)
    }

    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expected NoShuffle but got {:?}", fragment_relation).into());
        }

        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }

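    /// Finds the "root" fragment of each given job: materialized-view and sink
    /// fragments take precedence, and a source fragment is recorded only as a
    /// fallback (via `try_insert`). Also returns the current worker location of
    /// every known actor for callers that need to address them.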
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(
        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let inner = self.inner.read().await;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();

        let info = self.env.shared_actor_infos().read_guard();

        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
            .iter()
            .map(|(job_id, fragment)| (fragment.fragment_id as u32, *job_id))
            .collect();

        for fragment in root_fragment_to_jobs.keys() {
            let fragment_info = info.get_fragment(*fragment).context(format!(
                "fragment {} not found in shared actor info",
                fragment
            ))?;

            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
            let fragment = root_fragments
                .get(&job_id)
                .context(format!("root fragment for job {} not found", job_id))?;

            root_fragments_pb.insert(
                job_id,
                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
            );
        }

        let mut all_actor_locations = HashMap::new();

        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
            for (actor_id, actor_info) in actors {
                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
            }
        }

        Ok((root_fragments_pb, all_actor_locations))
    }

    pub async fn get_root_fragment(
        &self,
        job_id: JobId,
    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
        let (root_fragment, _) = root_fragments
            .remove(&job_id)
            .context(format!("root fragment for job {} not found", job_id))?;

        Ok((root_fragment, actors))
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        Vec<(
            stream_plan::DispatcherType,
            SharedFragmentInfo,
            PbStreamNode,
        )>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&txn)
            .await?;

        let downstream_fragment_ids = downstream_fragment_relations
            .iter()
            .map(|model| model.target_fragment_id as FragmentId)
            .collect::<HashSet<_>>();

        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        let downstream_fragment_nodes: HashMap<_, _> = downstream_fragment_nodes
            .into_iter()
            .map(|(id, node)| (id as u32, node))
            .collect();

        let mut downstream_fragments = vec![];

        let info = self.env.shared_actor_infos().read_guard();

        let fragment_map: HashMap<_, _> = downstream_fragment_relations
            .iter()
            .map(|model| (model.target_fragment_id as u32, model.dispatcher_type))
            .collect();

        for fragment_id in fragment_map.keys() {
            let fragment_info @ SharedFragmentInfo { actors, .. } =
                info.get_fragment(*fragment_id).unwrap();

            let dispatcher_type = fragment_map[fragment_id];

            if actors.is_empty() {
                bail!("no actors found for fragment id {}", fragment_id);
            }

            let dispatch_type = PbDispatcherType::from(dispatcher_type);

            let nodes = downstream_fragment_nodes
                .get(fragment_id)
                .context(format!(
                    "downstream fragment node for id {} not found",
                    fragment_id
                ))?
                .to_protobuf();

            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
        }
        Ok((downstream_fragments, actor_locations))
    }

    pub async fn load_source_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
                source_fragment_ids
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment_id);
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn load_backfill_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                stream_node.to_protobuf().find_source_backfill()
            {
                source_fragment_ids
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn get_all_upstream_sink_infos(
        &self,
        target_table: &PbTable,
        target_fragment_id: FragmentId,
    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
        let incoming_sinks = self
            .get_table_incoming_sinks(TableId::new(target_table.id))
            .await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let sink_ids = incoming_sinks.iter().map(|s| s.id as SinkId).collect_vec();
        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;

        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
        for pb_sink in &incoming_sinks {
            let sink_fragment_id = sink_fragment_ids
                .get(&(pb_sink.id as _))
                .cloned()
                .ok_or(anyhow::anyhow!(
                    "sink fragment not found for sink id {}",
                    pb_sink.id
                ))?;
            let upstream_info = build_upstream_sink_info(
                pb_sink,
                sink_fragment_id,
                target_table,
                target_fragment_id,
            )?;
            upstream_sink_infos.push(upstream_info);
        }

        Ok(upstream_sink_infos)
    }

    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(
                fragment::Column::JobId
                    .eq(job_id)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        if mview_fragment.len() != 1 {
            return Err(anyhow::anyhow!(
                "expected exactly one mview fragment for job {}, found {}",
                job_id,
                mview_fragment.len()
            )
            .into());
        }

        Ok(mview_fragment.into_iter().next().unwrap())
    }

    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        has_table_been_migrated(&txn, table_id).await
    }

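    /// Upserts the persisted connector splits for the given fragments in one
    /// `INSERT ... ON CONFLICT (fragment_id) DO UPDATE` statement; a no-op when the
    /// input map is empty.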
    pub async fn update_fragment_splits<C>(
        &self,
        txn: &C,
        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
    ) -> MetaResult<()>
    where
        C: ConnectionTrait,
    {
        if fragment_splits.is_empty() {
            return Ok(());
        }

        let models: Vec<fragment_splits::ActiveModel> = fragment_splits
            .iter()
            .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
                fragment_id: Set(*fragment_id as _),
                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(Into::into).collect_vec(),
                }))),
            })
            .collect();

        FragmentSplits::insert_many(models)
            .on_conflict(
                OnConflict::column(fragment_splits::Column::FragmentId)
                    .update_column(fragment_splits::Column::Splits)
                    .to_owned(),
            )
            .exec(txn)
            .await?;

        Ok(())
    }
}

1805#[cfg(test)]
1806mod tests {
1807 use std::collections::{BTreeMap, HashMap, HashSet};
1808
1809 use itertools::Itertools;
1810 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1811 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1812 use risingwave_common::id::JobId;
1813 use risingwave_common::util::iter_util::ZipEqDebug;
1814 use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1815 use risingwave_meta_model::fragment::DistributionType;
1816 use risingwave_meta_model::{
1817 ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, StreamNode,
1818 StreamingParallelism, TableId, VnodeBitmap, fragment,
1819 };
1820 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1821 use risingwave_pb::plan_common::PbExprContext;
1822 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1823 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1824 use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1825
1826 use super::ActorInfo;
1827 use crate::MetaResult;
1828 use crate::controller::catalog::CatalogController;
1829 use crate::model::{Fragment, StreamActor};
1830
1831 type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
1832
1833 type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
1834
1835 const TEST_FRAGMENT_ID: FragmentId = 1;
1836
1837 const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;
1838
1839 const TEST_JOB_ID: JobId = JobId::new(1);
1840
1841 const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
1842
1843 fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1844 let mut upstream_actor_ids = BTreeMap::new();
1845 upstream_actor_ids.insert(
1846 TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1847 HashSet::from_iter([(actor_id + 100)]),
1848 );
1849 upstream_actor_ids.insert(
1850 (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1851 HashSet::from_iter([(actor_id + 200)]),
1852 );
1853 upstream_actor_ids
1854 }
1855
1856 fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1857 let mut input = vec![];
1858 for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1859 input.push(PbStreamNode {
1860 node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1861 upstream_fragment_id: *upstream_fragment_id as _,
1862 ..Default::default()
1863 }))),
1864 ..Default::default()
1865 });
1866 }
1867
1868 PbStreamNode {
1869 input,
1870 node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1871 ..Default::default()
1872 }
1873 }
1874
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }

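    /// Exercises the reverse direction: composes a `fragment::Model` plus per-actor
    /// info back into a protobuf `Fragment` and verifies actors, statuses, and splits.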
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0,
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID.as_raw_id() as _]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

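    /// Compares each `ActorInfo` with its composed `StreamActor` pairwise: ids,
    /// vnode bitmaps, splits, and expression contexts must survive the conversion,
    /// and every `Merge` node must reference a known upstream fragment.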
    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

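    /// Destructures the protobuf `Fragment` and asserts field-by-field equality
    /// with the `fragment::Model` it was derived from.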
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(
            pb_state_table_ids,
            fragment
                .state_table_ids
                .into_u32_array()
                .into_iter()
                .map_into()
                .collect_vec()
        );
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }

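    /// A fragment with no per-fragment parallelism and no upstream roots inherits
    /// the job-level setting, rendered as `inherit(fixed(4))`.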
    #[test]
    fn test_parallelism_policy_with_root_fragments() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 3,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: I32Array::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let job_parallelism = StreamingParallelism::Fixed(4);

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            Some(&job_parallelism),
            &[],
        );

        assert_eq!(policy, "inherit(fixed(4))");
    }

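    /// When upstream root fragments are supplied, the policy names them instead;
    /// per the assertion, the ids `[3, 1, 2, 1]` come back deduplicated and sorted
    /// as `upstream_fragment([1, 2, 3])`.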
    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: I32Array::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            None,
            &[3, 1, 2, 1],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
}