1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::{Either, Itertools};
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::CdcScanOptions;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34 Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37 ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38 JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39 TableIdArray, VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits,
40 object, sink, source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46 Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::fragment::{
49 FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan;
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57 PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, SinkLogStoreType,
58 StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63 ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, FromQueryResult, JoinType,
64 PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
65};
66use serde::{Deserialize, Serialize};
67
68use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
69use crate::controller::catalog::CatalogController;
70use crate::controller::scale::{
71 FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph,
72 find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
73 render_actor_assignments, resolve_streaming_job_definition,
74};
75use crate::controller::utils::{
76 FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
77 get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
78 resolve_no_shuffle_actor_mapping,
79};
80use crate::error::MetaError;
81use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
82use crate::model::{
83 DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
84 StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
85 TableParallelism,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
/// Runtime (in-flight) state of a single actor: the worker it runs on, its
/// vnode ownership, and its currently-assigned source splits.
#[derive(Debug)]
pub struct InflightActorInfo {
    /// Worker node the actor is scheduled on.
    pub worker_id: WorkerId,
    /// Vnode bitmap owned by this actor, if any.
    pub vnode_bitmap: Option<Bitmap>,
    /// Source splits currently assigned to this actor.
    pub splits: Vec<SplitImpl>,
}
98
/// Flattened actor record used when composing fragments: combines shared
/// in-memory actor state with job-level context (expression context and
/// config override), all in serialized model form.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    // Connector split assignment, in model (serialized) form.
    pub splits: ConnectorSplits,
    pub worker_id: WorkerId,
    // Vnode bitmap in model form, if any.
    pub vnode_bitmap: Option<VnodeBitmap>,
    pub expr_context: ExprContext,
    pub config_override: Arc<str>,
}
109
/// Raw per-fragment row selected from the database when building fragment
/// descriptions.
#[derive(Debug)]
struct FragmentDescRow {
    fragment_id: FragmentId,
    job_id: JobId,
    fragment_type_mask: i32,
    distribution_type: DistributionType,
    state_table_ids: TableIdArray,
    vnode_count: i32,
    // Fragment-level parallelism override, if one is set.
    parallelism_override: Option<StreamingParallelism>,
    stream_node: Option<StreamNode>,
}
121
/// In-flight view of one fragment: its plan node, actor set, and the state
/// tables it owns.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    pub vnode_count: usize,
    // Stream plan node of this fragment, in protobuf form.
    pub nodes: PbStreamNode,
    pub actors: HashMap<ActorId, InflightActorInfo>,
    pub state_table_ids: HashSet<TableId>,
}
132
/// Distribution type plus vnode count for one fragment (see
/// `fragment_parallelisms`).
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub vnode_count: usize,
}
138
139#[easy_ext::ext(FragmentTypeMaskExt)]
140pub impl FragmentTypeMask {
141 fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
142 Expr::col(fragment::Column::FragmentTypeMask)
143 .bit_and(Expr::value(flag as i32))
144 .ne(0)
145 }
146
147 fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
148 Expr::col(fragment::Column::FragmentTypeMask)
149 .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
150 .ne(0)
151 }
152
153 fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
154 Expr::col(fragment::Column::FragmentTypeMask)
155 .bit_and(Expr::value(flag as i32))
156 .eq(0)
157 }
158}
159
160#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
161#[serde(rename_all = "camelCase")] pub struct StreamingJobInfo {
163 pub job_id: JobId,
164 pub obj_type: ObjectType,
165 pub name: String,
166 pub job_status: JobStatus,
167 pub parallelism: StreamingParallelism,
168 pub max_parallelism: i32,
169 pub resource_group: String,
170 pub config_override: String,
171 pub database_id: DatabaseId,
172 pub schema_id: SchemaId,
173}
174
175impl NotificationManager {
176 pub(crate) fn notify_streaming_fragment_mapping(
182 &self,
183 operation: NotificationOperation,
184 fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
185 ) {
186 if fragment_mappings.is_empty() {
187 return;
188 }
189 for fragment_mapping in fragment_mappings {
191 self.notify_frontend_without_version(
192 operation,
193 NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
194 );
195 }
196 }
197
198 pub(crate) fn notify_serving_fragment_mapping_update(
203 &self,
204 fragment_ids: Vec<crate::model::FragmentId>,
205 ) {
206 if fragment_ids.is_empty() {
207 return;
208 }
209 self.notify_local_subscribers(LocalNotification::ServingFragmentMappingsUpsert(
210 fragment_ids,
211 ));
212 }
213
214 pub(crate) fn notify_serving_fragment_mapping_delete(
219 &self,
220 fragment_ids: Vec<crate::model::FragmentId>,
221 ) {
222 if fragment_ids.is_empty() {
223 return;
224 }
225 self.notify_local_subscribers(LocalNotification::ServingFragmentMappingsDelete(
226 fragment_ids,
227 ));
228 }
229}
230
231impl CatalogController {
232 pub fn prepare_fragment_models_from_fragments(
233 job_id: JobId,
234 fragments: impl Iterator<Item = &Fragment>,
235 ) -> MetaResult<Vec<fragment::Model>> {
236 fragments
237 .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
238 .try_collect()
239 }
240
241 pub fn prepare_fragment_model_for_new_job(
242 job_id: JobId,
243 fragment: &Fragment,
244 ) -> MetaResult<fragment::Model> {
245 let vnode_count = fragment.vnode_count();
246 let Fragment {
247 fragment_id: pb_fragment_id,
248 fragment_type_mask: pb_fragment_type_mask,
249 distribution_type: pb_distribution_type,
250 state_table_ids: pb_state_table_ids,
251 nodes,
252 ..
253 } = fragment;
254
255 let state_table_ids = pb_state_table_ids.clone().into();
256
257 let fragment_parallelism = nodes
258 .node_body
259 .as_ref()
260 .and_then(|body| match body {
261 NodeBody::StreamCdcScan(node) => Some(node),
262 _ => None,
263 })
264 .and_then(|node| node.options.as_ref())
265 .map(CdcScanOptions::from_proto)
266 .filter(|opts| opts.is_parallelized_backfill())
267 .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));
268
269 let stream_node = StreamNode::from(nodes);
270
271 let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
272 .unwrap()
273 .into();
274
275 #[expect(deprecated)]
276 let fragment = fragment::Model {
277 fragment_id: *pb_fragment_id as _,
278 job_id,
279 fragment_type_mask: (*pb_fragment_type_mask).into(),
280 distribution_type,
281 stream_node,
282 state_table_ids,
283 upstream_fragment_id: Default::default(),
284 vnode_count: vnode_count as _,
285 parallelism: fragment_parallelism,
286 };
287
288 Ok(fragment)
289 }
290
    /// Assemble a full [`StreamJobFragments`] view of one streaming job from its
    /// fragment models and per-fragment actor infos.
    ///
    /// Returns, in order: the composed job fragments, a map of fragment id ->
    /// actors, and a map of actor id -> status (worker location).
    #[expect(clippy::type_complexity)]
    fn compose_table_fragments(
        job_id: JobId,
        state: PbState,
        ctx: StreamContext,
        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
        parallelism: StreamingParallelism,
        max_parallelism: usize,
        job_definition: Option<String>,
    ) -> MetaResult<(
        StreamJobFragments,
        HashMap<FragmentId, Vec<StreamActor>>,
        HashMap<crate::model::ActorId, PbActorStatus>,
    )> {
        let mut pb_fragments = BTreeMap::new();
        let mut fragment_actors = HashMap::new();
        let mut all_actor_status = HashMap::new();

        for (fragment, actors) in fragments {
            // The per-actor splits (4th element) are not needed here.
            let (fragment, actors, actor_status, _) =
                Self::compose_fragment(fragment, actors, job_definition.clone())?;
            let fragment_id = fragment.fragment_id;
            fragment_actors.insert(fragment_id, actors);
            all_actor_status.extend(actor_status);

            pb_fragments.insert(fragment_id, fragment);
        }

        let table_fragments = StreamJobFragments {
            stream_job_id: job_id,
            state: state as _,
            fragments: pb_fragments,
            ctx,
            // Translate the persisted parallelism setting into the in-memory form.
            assigned_parallelism: match parallelism {
                StreamingParallelism::Custom => TableParallelism::Custom,
                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
            },
            max_parallelism,
        };

        Ok((table_fragments, fragment_actors, all_actor_status))
    }
334
335 #[expect(clippy::type_complexity)]
336 fn compose_fragment(
337 fragment: fragment::Model,
338 actors: Vec<ActorInfo>,
339 job_definition: Option<String>,
340 ) -> MetaResult<(
341 Fragment,
342 Vec<StreamActor>,
343 HashMap<crate::model::ActorId, PbActorStatus>,
344 HashMap<crate::model::ActorId, PbConnectorSplits>,
345 )> {
346 let fragment::Model {
347 fragment_id,
348 fragment_type_mask,
349 distribution_type,
350 stream_node,
351 state_table_ids,
352 vnode_count,
353 ..
354 } = fragment;
355
356 let stream_node = stream_node.to_protobuf();
357 let mut upstream_fragments = HashSet::new();
358 visit_stream_node_body(&stream_node, |body| {
359 if let NodeBody::Merge(m) = body {
360 assert!(
361 upstream_fragments.insert(m.upstream_fragment_id),
362 "non-duplicate upstream fragment"
363 );
364 }
365 });
366
367 let mut pb_actors = vec![];
368
369 let mut pb_actor_status = HashMap::new();
370 let mut pb_actor_splits = HashMap::new();
371
372 for actor in actors {
373 if actor.fragment_id != fragment_id {
374 bail!(
375 "fragment id {} from actor {} is different from fragment {}",
376 actor.fragment_id,
377 actor.actor_id,
378 fragment_id
379 )
380 }
381
382 let ActorInfo {
383 actor_id,
384 fragment_id,
385 worker_id,
386 splits,
387 vnode_bitmap,
388 expr_context,
389 config_override,
390 ..
391 } = actor;
392
393 let vnode_bitmap =
394 vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
395 let pb_expr_context = Some(expr_context.to_protobuf());
396
397 pb_actor_status.insert(
398 actor_id as _,
399 PbActorStatus {
400 location: PbActorLocation::from_worker(worker_id),
401 },
402 );
403
404 pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
405
406 pb_actors.push(StreamActor {
407 actor_id: actor_id as _,
408 fragment_id: fragment_id as _,
409 vnode_bitmap,
410 mview_definition: job_definition.clone().unwrap_or("".to_owned()),
411 expr_context: pb_expr_context,
412 config_override,
413 })
414 }
415
416 let pb_state_table_ids = state_table_ids.0;
417 let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
418 let pb_fragment = Fragment {
419 fragment_id: fragment_id as _,
420 fragment_type_mask: fragment_type_mask.into(),
421 distribution_type: pb_distribution_type,
422 state_table_ids: pb_state_table_ids,
423 maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
424 nodes: stream_node,
425 };
426
427 Ok((pb_fragment, pb_actors, pb_actor_status, pb_actor_splits))
428 }
429
430 pub async fn fragment_parallelisms(
437 &self,
438 ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
439 let inner = self.inner.read().await;
440 let query = FragmentModel::find().select_only().columns([
441 fragment::Column::FragmentId,
442 fragment::Column::DistributionType,
443 fragment::Column::VnodeCount,
444 ]);
445 let fragments: Vec<(FragmentId, DistributionType, i32)> =
446 query.into_tuple().all(&inner.db).await?;
447
448 Ok(fragments
449 .into_iter()
450 .map(|(fragment_id, distribution_type, vnode_count)| {
451 (
452 fragment_id,
453 FragmentParallelismInfo {
454 distribution_type: PbFragmentDistributionType::from(distribution_type),
455 vnode_count: vnode_count as usize,
456 },
457 )
458 })
459 .collect())
460 }
461
462 pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
463 let inner = self.inner.read().await;
464 let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
465 .select_only()
466 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
467 .into_tuple()
468 .all(&inner.db)
469 .await?;
470 Ok(fragment_jobs.into_iter().collect())
471 }
472
473 pub async fn get_fragment_job_id(
474 &self,
475 fragment_ids: Vec<FragmentId>,
476 ) -> MetaResult<Vec<ObjectId>> {
477 let inner = self.inner.read().await;
478 self.get_fragment_job_id_in_txn(&inner.db, fragment_ids)
479 .await
480 }
481
482 pub async fn get_fragment_job_id_in_txn<C>(
483 &self,
484 txn: &C,
485 fragment_ids: Vec<FragmentId>,
486 ) -> MetaResult<Vec<ObjectId>>
487 where
488 C: ConnectionTrait + Send,
489 {
490 let object_ids: Vec<ObjectId> = FragmentModel::find()
491 .select_only()
492 .column(fragment::Column::JobId)
493 .filter(fragment::Column::FragmentId.is_in(fragment_ids))
494 .into_tuple()
495 .all(txn)
496 .await?;
497
498 Ok(object_ids)
499 }
500
    /// Fetch a rich description of one fragment, plus the ids of its direct
    /// upstream fragments.
    ///
    /// Returns `Ok(None)` if the fragment does not exist in the catalog.
    ///
    /// # Panics
    /// Panics if the fragment exists in the database but is missing from the
    /// shared in-memory actor info.
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model = match FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?
        {
            Some(fragment) => fragment,
            None => return Ok(None),
        };

        // Parallelism configured on the owning job, if that job row still exists.
        let job_parallelism: Option<StreamingParallelism> =
            StreamingJob::find_by_id(fragment_model.job_id)
                .select_only()
                .column(streaming_job::Column::Parallelism)
                .into_tuple()
                .one(&inner.db)
                .await?;

        // Direct upstream edges targeting this fragment.
        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::DispatcherType,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let upstreams: Vec<_> = upstream_entries
            .into_iter()
            .map(|(source_id, _)| source_id)
            .collect();

        // Root fragments of the no-shuffle DAG(s) containing this fragment,
        // fed into the parallelism-policy description below.
        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
            .await
            .map(Self::collect_root_fragment_mapping)?;
        let root_fragments = root_fragment_map
            .get(&fragment_id)
            .cloned()
            .unwrap_or_default();

        // The actor count comes from the in-memory shared info, not the database.
        let info = self.env.shared_actor_infos().read_guard();
        let SharedFragmentInfo { actors, .. } = info
            .get_fragment(fragment_model.fragment_id as _)
            .unwrap_or_else(|| {
                panic!(
                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                    fragment_model.fragment_id,
                    fragment_model.job_id
                )
            });

        let parallelism_policy = Self::format_fragment_parallelism_policy(
            fragment_model.distribution_type,
            fragment_model.parallelism.as_ref(),
            job_parallelism.as_ref(),
            &root_fragments,
        );

        let fragment = FragmentDesc {
            fragment_id: fragment_model.fragment_id,
            job_id: fragment_model.job_id,
            fragment_type_mask: fragment_model.fragment_type_mask,
            distribution_type: fragment_model.distribution_type,
            state_table_ids: fragment_model.state_table_ids.clone(),
            parallelism: actors.len() as _,
            vnode_count: fragment_model.vnode_count,
            stream_node: fragment_model.stream_node.clone(),
            parallelism_policy,
        };

        Ok(Some((fragment, upstreams)))
    }
579
580 pub async fn list_fragment_database_ids(
581 &self,
582 select_fragment_ids: Option<Vec<FragmentId>>,
583 ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
584 let inner = self.inner.read().await;
585 let select = FragmentModel::find()
586 .select_only()
587 .column(fragment::Column::FragmentId)
588 .column(object::Column::DatabaseId)
589 .join(JoinType::InnerJoin, fragment::Relation::Object.def());
590 let select = if let Some(select_fragment_ids) = select_fragment_ids {
591 select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
592 } else {
593 select
594 };
595 Ok(select.into_tuple().all(&inner.db).await?)
596 }
597
    /// Load the complete fragment/actor view of streaming job `job_id`.
    ///
    /// Returns the composed [`StreamJobFragments`], fragment id -> actors, and
    /// actor id -> status. Errors if the job is not found in the database.
    pub async fn get_job_fragments_by_id(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        StreamJobFragments,
        HashMap<FragmentId, Vec<StreamActor>>,
        HashMap<ActorId, PbActorStatus>,
    )> {
        let inner = self.inner.read().await;

        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        // Actor data comes from the shared in-memory info, not the database.
        let fragment_actors =
            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;

        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id,
            job_info.job_status.into(),
            job_info.stream_context(),
            fragment_actors,
            job_info.parallelism,
            job_info.max_parallelism as _,
            job_definition,
        )
    }
636
637 pub async fn get_fragment_actor_dispatchers(
638 &self,
639 fragment_ids: Vec<FragmentId>,
640 ) -> MetaResult<FragmentActorDispatchers> {
641 let inner = self.inner.read().await;
642
643 self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
644 .await
645 }
646
    /// Compute the dispatchers of every actor in the given source fragments.
    ///
    /// Fragment relations are read from the database; actor placement and vnode
    /// bitmaps come from the shared in-memory info. Result is keyed by source
    /// fragment id, then actor id.
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        // Distribution type plus per-actor vnode bitmaps of one fragment.
        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        // Cache so each fragment's actor info is materialized at most once,
        // even when it appears in multiple relations.
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            // Resolve (and cache) actor info for both ends of the edge.
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let (dispatchers, _) = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }
750
751 pub async fn get_fragment_downstream_relations(
752 &self,
753 fragment_ids: Vec<FragmentId>,
754 ) -> MetaResult<FragmentDownstreamRelation> {
755 let inner = self.inner.read().await;
756 self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
757 .await
758 }
759
    /// Collect all downstream relations of the given source fragments within
    /// the given connection, keyed by source fragment id.
    pub async fn get_fragment_downstream_relations_in_txn<C>(
        &self,
        txn: &C,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation>
    where
        C: ConnectionTrait + StreamTrait + Send,
    {
        // Stream rows instead of loading the whole relation table at once.
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(txn)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }
792
793 pub async fn get_job_fragment_backfill_scan_type(
794 &self,
795 job_id: JobId,
796 ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
797 let inner = self.inner.read().await;
798 self.get_job_fragment_backfill_scan_type_in_txn(&inner.db, job_id)
799 .await
800 }
801
802 pub async fn get_job_fragment_backfill_scan_type_in_txn<C>(
803 &self,
804 txn: &C,
805 job_id: JobId,
806 ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>>
807 where
808 C: ConnectionTrait + Send,
809 {
810 let fragments: Vec<_> = FragmentModel::find()
811 .filter(fragment::Column::JobId.eq(job_id))
812 .all(txn)
813 .await?;
814
815 let mut result = HashMap::new();
816
817 for fragment::Model {
818 fragment_id,
819 stream_node,
820 ..
821 } in fragments
822 {
823 let stream_node = stream_node.to_protobuf();
824 visit_stream_node_body(&stream_node, |body| {
825 if let NodeBody::StreamScan(node) = body {
826 match node.stream_scan_type() {
827 StreamScanType::Unspecified => {}
828 scan_type => {
829 result.insert(fragment_id as crate::model::FragmentId, scan_type);
830 }
831 }
832 }
833 });
834 }
835
836 Ok(result)
837 }
838
839 pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
840 let inner = self.inner.read().await;
841 let count = StreamingJob::find().count(&inner.db).await?;
842 Ok(usize::try_from(count).context("streaming job count overflow")?)
843 }
844
    /// List an overview row for every streaming job: name, status, parallelism
    /// settings, effective resource group / config override, and owning
    /// database/schema.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // Left-join against table/source/sink so the job name can be taken
            // from whichever of them the object actually is.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // Job-specific resource group falls back to the database's group.
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            // NULL config override is surfaced as an empty string.
            .column_as(
                Expr::if_null(
                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
                    Expr::val(""),
                ),
                "config_override",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
898
899 pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
900 let inner = self.inner.read().await;
901 let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
902 .select_only()
903 .column(streaming_job::Column::MaxParallelism)
904 .into_tuple()
905 .one(&inner.db)
906 .await?
907 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
908 Ok(max_parallelism as usize)
909 }
910
911 pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
913 if let Ok(inner) = self.inner.try_read()
914 && let Ok(job_state_tables) = FragmentModel::find()
915 .select_only()
916 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
917 .into_tuple::<(JobId, I32Array)>()
918 .all(&inner.db)
919 .await
920 {
921 let mut job_internal_table_ids = HashMap::new();
922 for (job_id, state_table_ids) in job_state_tables {
923 job_internal_table_ids
924 .entry(job_id)
925 .or_insert_with(Vec::new)
926 .extend(
927 state_table_ids
928 .into_inner()
929 .into_iter()
930 .map(|table_id| TableId::new(table_id as _)),
931 );
932 }
933 return Some(job_internal_table_ids.into_iter().collect());
934 }
935 None
936 }
937
938 pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
939 let inner = self.inner.read().await;
940 let count = FragmentModel::find().count(&inner.db).await?;
941 Ok(count > 0)
942 }
943
944 pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
945 let read_guard = self.env.shared_actor_infos().read_guard();
946 let actor_cnt: HashMap<WorkerId, _> = read_guard
947 .iter_over_fragments()
948 .flat_map(|(_, fragment)| {
949 fragment
950 .actors
951 .iter()
952 .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
953 })
954 .into_group_map()
955 .into_iter()
956 .map(|(k, v)| (k, v.len()))
957 .collect();
958
959 Ok(actor_cnt)
960 }
961
    /// Build [`ActorInfo`]s for each of `fragment_ids` from the shared
    /// in-memory actor state, stamping every actor with the job's expression
    /// context and config override derived from `stream_context`.
    ///
    /// Errors if any fragment id has no entry in the shared actor info.
    fn collect_fragment_actor_map(
        &self,
        fragment_ids: &[FragmentId],
        stream_context: StreamContext,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
        let guard = self.env.shared_actor_infos().read_guard();
        let pb_expr_context = stream_context.to_expr_context();
        let expr_context: ExprContext = (&pb_expr_context).into();

        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
        for fragment_id in fragment_ids {
            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
                anyhow!("fragment {} not found in shared actor info", fragment_id)
            })?;

            let actors = fragment_info
                .actors
                .iter()
                .map(|(actor_id, actor_info)| ActorInfo {
                    actor_id: *actor_id as _,
                    fragment_id: *fragment_id,
                    // Convert in-memory split assignments into serialized model form.
                    splits: ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
                    }),
                    worker_id: actor_info.worker_id as _,
                    vnode_bitmap: actor_info
                        .vnode_bitmap
                        .as_ref()
                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                    expr_context: expr_context.clone(),
                    config_override: stream_context.config_override.clone(),
                })
                .collect();

            actor_map.insert(*fragment_id, actors);
        }

        Ok(actor_map)
    }
1001
1002 fn collect_fragment_actor_pairs(
1003 &self,
1004 fragments: Vec<fragment::Model>,
1005 stream_context: StreamContext,
1006 ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
1007 let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
1008 let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
1009 fragments
1010 .into_iter()
1011 .map(|fragment| {
1012 let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
1013 anyhow!(
1014 "fragment {} missing in shared actor info map",
1015 fragment.fragment_id
1016 )
1017 })?;
1018 Ok((fragment, actors))
1019 })
1020 .collect()
1021 }
1022
    /// Load the fragment/actor view of every streaming job in the catalog,
    /// keyed by job id.
    pub async fn table_fragments(
        &self,
    ) -> MetaResult<
        BTreeMap<
            JobId,
            (
                StreamJobFragments,
                HashMap<FragmentId, Vec<StreamActor>>,
                HashMap<crate::model::ActorId, PbActorStatus>,
            ),
        >,
    > {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve all job definitions up front, in a single pass.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragments = FragmentModel::find()
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            // Actor data comes from the shared in-memory info, not the database.
            let fragment_actors =
                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;

            table_fragments.insert(
                job.job_id,
                Self::compose_table_fragments(
                    job.job_id,
                    job.job_status.into(),
                    job.stream_context(),
                    fragment_actors,
                    job.parallelism,
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
1071
1072 pub async fn upstream_fragments(
1073 &self,
1074 fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1075 ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
1076 let inner = self.inner.read().await;
1077 self.upstream_fragments_in_txn(&inner.db, fragment_ids)
1078 .await
1079 }
1080
    /// Map each given fragment id to the set of its direct upstream fragment
    /// ids, resolved via `fragment_relation` edges within the connection.
    pub async fn upstream_fragments_in_txn<C>(
        &self,
        txn: &C,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>>
    where
        C: ConnectionTrait + StreamTrait + Send,
    {
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(txn)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        // Each relation row is (source = upstream, target = downstream).
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }
1111
1112 pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1113 let info = self.env.shared_actor_infos().read_guard();
1114
1115 let actor_locations = info
1116 .iter_over_fragments()
1117 .flat_map(|(fragment_id, fragment)| {
1118 fragment
1119 .actors
1120 .iter()
1121 .map(|(actor_id, actor)| PartialActorLocation {
1122 actor_id: *actor_id as _,
1123 fragment_id: *fragment_id as _,
1124 worker_id: actor.worker_id,
1125 })
1126 })
1127 .collect_vec();
1128
1129 Ok(actor_locations)
1130 }
1131
    /// List `(actor_id, fragment_id, job_id, schema_id, object_type)` for every actor.
    ///
    /// Fragment-to-object metadata is read from the database, while the live actor
    /// set comes from the shared in-memory actor info. Returns an `unavailable`
    /// error if the shared info has no entry for a fragment present in the
    /// database (the two views are out of sync).
    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;

        // One row per fragment, joined with its owning object for id/schema/type.
        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
            FragmentModel::find()
                .select_only()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .column(fragment::Column::FragmentId)
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;

        let actor_infos = {
            let info = self.env.shared_actor_infos().read_guard();

            let mut result = Vec::new();

            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
                // Every fragment known to the catalog must be present in memory.
                let Some(fragment) = info.get_fragment(fragment_id as _) else {
                    return Err(MetaError::unavailable(format!(
                        "shared actor info missing for fragment {fragment_id} while listing actors"
                    )));
                };

                for actor_id in fragment.actors.keys() {
                    result.push((
                        *actor_id as _,
                        fragment.fragment_id as _,
                        object_id,
                        schema_id,
                        object_type,
                    ));
                }
            }

            result
        };

        Ok(actor_infos)
    }
1177
1178 pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1179 let guard = self.env.shared_actor_info.read_guard();
1180 guard
1181 .iter_over_fragments()
1182 .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1183 .collect_vec()
1184 }
1185
1186 pub async fn list_fragment_descs_with_node(
1187 &self,
1188 is_creating: bool,
1189 ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1190 let inner = self.inner.read().await;
1191 let txn = inner.db.begin().await?;
1192
1193 let fragments_query = Self::build_fragment_query(is_creating);
1194 let fragments: Vec<fragment::Model> = fragments_query.all(&txn).await?;
1195
1196 let rows = fragments
1197 .into_iter()
1198 .map(|fragment| FragmentDescRow {
1199 fragment_id: fragment.fragment_id,
1200 job_id: fragment.job_id,
1201 fragment_type_mask: fragment.fragment_type_mask,
1202 distribution_type: fragment.distribution_type,
1203 state_table_ids: fragment.state_table_ids,
1204 vnode_count: fragment.vnode_count,
1205 parallelism_override: fragment.parallelism,
1206 stream_node: Some(fragment.stream_node),
1207 })
1208 .collect_vec();
1209
1210 self.build_fragment_distributions(&txn, rows).await
1211 }
1212
    /// Like [`Self::list_fragment_descs_with_node`], but selects only scalar
    /// columns and leaves `stream_node` as `None`, so the (potentially large)
    /// plan node is never fetched or deserialized.
    pub async fn list_fragment_descs_without_node(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments_query = Self::build_fragment_query(is_creating);
        #[allow(clippy::type_complexity)]
        let fragments: Vec<(
            FragmentId,
            JobId,
            i32,
            DistributionType,
            TableIdArray,
            i32,
            Option<StreamingParallelism>,
        )> = fragments_query
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::Parallelism,
            ])
            .into_tuple()
            .all(&txn)
            .await?;

        let rows = fragments
            .into_iter()
            .map(
                |(
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    state_table_ids,
                    vnode_count,
                    parallelism_override,
                )| FragmentDescRow {
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    state_table_ids,
                    vnode_count,
                    parallelism_override,
                    // The plan node is intentionally omitted in this variant.
                    stream_node: None,
                },
            )
            .collect_vec();

        self.build_fragment_distributions(&txn, rows).await
    }
1271
1272 fn build_fragment_query(is_creating: bool) -> sea_orm::Select<fragment::Entity> {
1273 if is_creating {
1274 FragmentModel::find()
1275 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1276 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1277 .filter(
1278 streaming_job::Column::JobStatus
1279 .eq(JobStatus::Initial)
1280 .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1281 )
1282 } else {
1283 FragmentModel::find()
1284 }
1285 }
1286
    /// Assemble [`FragmentDistribution`]s (plus each fragment's upstream fragment
    /// ids) from pre-fetched [`FragmentDescRow`]s, enriching them with:
    /// - job-level parallelism (one batched query),
    /// - direct upstream relations (one batched query),
    /// - no-shuffle root fragments, and
    /// - the live actor count from the shared actor info.
    async fn build_fragment_distributions(
        &self,
        txn: &DatabaseTransaction,
        rows: Vec<FragmentDescRow>,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let fragment_ids = rows.iter().map(|row| row.fragment_id).collect_vec();
        let job_ids = rows.iter().map(|row| row.job_id).unique().collect_vec();

        // Parallelism of each owning job, keyed by job id.
        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(txn)
                .await?
                .into_iter()
                .collect()
        };

        // (target, source, dispatcher) edges pointing into the listed fragments.
        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(txn)
                    .await?
            };

        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        // Map each fragment to the root fragments of its no-shuffle ensemble.
        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();
        let mut result = Vec::with_capacity(rows.len());

        for row in rows {
            // Live actor count; 0 when the fragment is not (yet) known in memory.
            let parallelism = guard
                .get_fragment(row.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&row.fragment_id)
                .cloned()
                .unwrap_or_default();

            // `remove` is fine here: each row's fragment id is visited once.
            let upstreams = all_upstreams.remove(&row.fragment_id).unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                row.distribution_type,
                row.parallelism_override.as_ref(),
                job_parallelisms.get(&row.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: row.fragment_id,
                table_id: row.job_id,
                distribution_type: PbFragmentDistributionType::from(row.distribution_type) as _,
                state_table_ids: row.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: row.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: row.vnode_count as _,
                node: row.stream_node.map(|node| node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }

        Ok(result)
    }
1382
    /// Return `(sink_id, internal log-store table_id)` for every sink fragment
    /// whose sink node uses a KV log store.
    ///
    /// The sink id is derived from the fragment's job id. If the same sink shows
    /// up with more than one distinct log store table, the value seen last wins
    /// (the `insert` below has already replaced the old one) and a warning is
    /// logged.
    pub async fn list_sink_log_store_tables(&self) -> MetaResult<Vec<(SinkId, TableId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(JobId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::JobId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Sink))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut mapping: HashMap<SinkId, TableId> = HashMap::new();

        for (job_id, stream_node) in fragments {
            let sink_id = SinkId::new(job_id.as_raw_id());
            let stream_node = stream_node.to_protobuf();
            let mut internal_table_id: Option<TableId> = None;

            // Scan the plan for a KV-log-store sink node and record its state table.
            // If the plan somehow contains several, the last visited one is kept.
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::Sink(node) = body
                    && node.log_store_type == SinkLogStoreType::KvLogStore as i32
                    && let Some(table) = &node.table
                {
                    internal_table_id = Some(TableId::new(table.id.as_raw_id()));
                }
            });

            if let Some(table_id) = internal_table_id
                && let Some(existing) = mapping.insert(sink_id, table_id)
                && existing != table_id
            {
                tracing::warn!(
                    "sink {sink_id:?} has multiple log store tables: {existing:?} vs {table_id:?}"
                );
            }
        }

        Ok(mapping.into_iter().collect())
    }
1423
1424 fn collect_root_fragment_mapping(
1426 ensembles: Vec<NoShuffleEnsemble>,
1427 ) -> HashMap<FragmentId, Vec<FragmentId>> {
1428 let mut mapping = HashMap::new();
1429
1430 for ensemble in ensembles {
1431 let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1432 roots.sort_unstable();
1433 roots.dedup();
1434
1435 if roots.is_empty() {
1436 continue;
1437 }
1438
1439 let root_set: HashSet<_> = roots.iter().copied().collect();
1440
1441 for fragment_id in ensemble.component_fragments() {
1442 if root_set.contains(&fragment_id) {
1443 mapping.insert(fragment_id, Vec::new());
1444 } else {
1445 mapping.insert(fragment_id, roots.clone());
1446 }
1447 }
1448 }
1449
1450 mapping
1451 }
1452
1453 fn format_fragment_parallelism_policy(
1454 distribution_type: DistributionType,
1455 fragment_parallelism: Option<&StreamingParallelism>,
1456 job_parallelism: Option<&StreamingParallelism>,
1457 root_fragments: &[FragmentId],
1458 ) -> String {
1459 if distribution_type == DistributionType::Single {
1460 return "single".to_owned();
1461 }
1462
1463 if let Some(parallelism) = fragment_parallelism {
1464 return format!(
1465 "override({})",
1466 Self::format_streaming_parallelism(parallelism)
1467 );
1468 }
1469
1470 if !root_fragments.is_empty() {
1471 let mut upstreams = root_fragments.to_vec();
1472 upstreams.sort_unstable();
1473 upstreams.dedup();
1474
1475 return format!("upstream_fragment({upstreams:?})");
1476 }
1477
1478 let inherited = job_parallelism
1479 .map(Self::format_streaming_parallelism)
1480 .unwrap_or_else(|| "unknown".to_owned());
1481 format!("inherit({inherited})")
1482 }
1483
1484 fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1485 match parallelism {
1486 StreamingParallelism::Adaptive => "adaptive".to_owned(),
1487 StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1488 StreamingParallelism::Custom => "custom".to_owned(),
1489 }
1490 }
1491
1492 pub async fn list_sink_actor_mapping(
1493 &self,
1494 ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1495 let inner = self.inner.read().await;
1496 let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1497 .select_only()
1498 .columns([sink::Column::SinkId, sink::Column::Name])
1499 .into_tuple()
1500 .all(&inner.db)
1501 .await?;
1502 let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1503
1504 let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1505
1506 let actor_with_type: Vec<(ActorId, SinkId)> = {
1507 let info = self.env.shared_actor_infos().read_guard();
1508
1509 info.iter_over_fragments()
1510 .filter(|(_, fragment)| {
1511 sink_ids.contains(&fragment.job_id.as_sink_id())
1512 && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1513 })
1514 .flat_map(|(_, fragment)| {
1515 fragment
1516 .actors
1517 .keys()
1518 .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1519 })
1520 .collect()
1521 };
1522
1523 let mut sink_actor_mapping = HashMap::new();
1524 for (actor_id, sink_id) in actor_with_type {
1525 sink_actor_mapping
1526 .entry(sink_id)
1527 .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1528 .1
1529 .push(actor_id);
1530 }
1531
1532 Ok(sink_actor_mapping)
1533 }
1534
    /// List `(fragment_id, job_id, state_table_ids)` for every fragment, selected
    /// directly into the `PartialFragmentStateTables` partial model.
    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
        let inner = self.inner.read().await;
        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::StateTableIds,
            ])
            .into_partial_model()
            .all(&inner.db)
            .await?;
        Ok(fragment_state_tables)
    }
1549
1550 pub async fn load_all_actors_dynamic(
1553 &self,
1554 database_id: Option<DatabaseId>,
1555 worker_nodes: &ActiveStreamingWorkerNodes,
1556 ) -> MetaResult<FragmentRenderMap> {
1557 let loaded = self.load_fragment_context(database_id).await?;
1558
1559 if loaded.is_empty() {
1560 return Ok(HashMap::new());
1561 }
1562
1563 let adaptive_parallelism_strategy = {
1564 let system_params_reader = self.env.system_params_reader().await;
1565 system_params_reader.adaptive_parallelism_strategy()
1566 };
1567
1568 let RenderedGraph { fragments, .. } = render_actor_assignments(
1569 self.env.actor_id_generator(),
1570 worker_nodes.current(),
1571 adaptive_parallelism_strategy,
1572 &loaded,
1573 )?;
1574
1575 tracing::trace!(?fragments, "reload all actors");
1576
1577 Ok(fragments)
1578 }
1579
    /// Load the fragment context for all streaming jobs (optionally scoped to one
    /// database) inside a fresh transaction.
    /// See [`Self::load_fragment_context_in_txn`] for the logic.
    pub async fn load_fragment_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedFragmentContext> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        self.load_fragment_context_in_txn(&txn, database_id).await
    }
1590
1591 pub async fn load_fragment_context_in_txn<C>(
1592 &self,
1593 txn: &C,
1594 database_id: Option<DatabaseId>,
1595 ) -> MetaResult<LoadedFragmentContext>
1596 where
1597 C: ConnectionTrait,
1598 {
1599 let mut query = StreamingJob::find()
1600 .select_only()
1601 .column(streaming_job::Column::JobId);
1602
1603 if let Some(database_id) = database_id {
1604 query = query
1605 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1606 .filter(object::Column::DatabaseId.eq(database_id));
1607 }
1608
1609 let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
1610
1611 let jobs: HashSet<JobId> = jobs.into_iter().collect();
1612
1613 if jobs.is_empty() {
1614 return Ok(LoadedFragmentContext::default());
1615 }
1616
1617 load_fragment_context_for_jobs(txn, jobs).await
1618 }
1619
    /// Fill snapshot-backfill epochs into the stream-node plans of the given
    /// fragments, persisting each modified plan back to the database within one
    /// transaction.
    ///
    /// Errors if any fragment id cannot be found. Plans left unchanged by
    /// `crate::stream::fill_snapshot_backfill_epoch` are not rewritten.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // Returns true iff the plan was actually modified.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1653
1654 pub fn get_running_actors_of_fragment(
1656 &self,
1657 fragment_id: FragmentId,
1658 ) -> MetaResult<HashSet<model::ActorId>> {
1659 let info = self.env.shared_actor_infos().read_guard();
1660
1661 let actors = info
1662 .get_fragment(fragment_id as _)
1663 .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1664 .unwrap_or_default();
1665
1666 Ok(actors)
1667 }
1668
    /// Pair each running source-backfill actor with the source actor it reads
    /// from, as `(source_backfill_actor_id, source_actor_id)`.
    ///
    /// Requires a `NoShuffle` edge from `source_fragment_id` to
    /// `source_backfill_fragment_id`; errors if the edge is missing or of another
    /// dispatcher type. Actors of the two fragments are matched by
    /// `resolve_no_shuffle_actor_mapping` — presumably via their vnode bitmaps /
    /// distribution types (TODO confirm at its definition).
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Dispatcher type of the edge connecting the two fragments; must exist.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Look up a fragment's distribution type from the database.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Snapshot each actor's (optional) vnode bitmap from the shared
        // in-memory actor info; unknown fragments yield an empty map.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // Note: the resolved (source, backfill) pairs are flipped on the way out.
        Ok(resolve_no_shuffle_actor_mapping(
            source_distribution_type,
            source_actors.iter().map(|(&id, bitmap)| (id, bitmap)),
            source_backfill_distribution_type,
            source_backfill_actors
                .iter()
                .map(|(&id, bitmap)| (id, bitmap)),
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1766
    /// Find the "root" fragment of each given job.
    ///
    /// Mview and Sink fragments always win (`insert` overwrites any earlier
    /// pick), while a Source fragment is only used as a fallback (`try_insert`
    /// keeps an existing entry) — e.g. for source jobs without an Mview fragment.
    /// Jobs without any qualifying fragment are absent from the result.
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<HashMap<JobId, Fragment>> {
        let inner = self.inner.read().await;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        let mut root_fragments = HashMap::<JobId, Fragment>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                _ = root_fragments.insert(fragment.job_id, fragment.into());
            } else if mask.contains(FragmentTypeFlag::Source) {
                _ = root_fragments.try_insert(fragment.job_id, fragment.into());
            }
        }

        Ok(root_fragments)
    }
1804
1805 pub async fn get_root_fragment(&self, job_id: JobId) -> MetaResult<Fragment> {
1806 let mut root_fragments = self.get_root_fragments(vec![job_id]).await?;
1807 let root_fragment = root_fragments
1808 .remove(&job_id)
1809 .context(format!("root fragment for job {} not found", job_id))?;
1810
1811 Ok(root_fragment)
1812 }
1813
    /// Return the fragments directly downstream of the root fragment of `job_id`,
    /// each paired with the dispatcher type of the connecting edge.
    ///
    /// Edges are deduplicated by target fragment: when several edges reach the
    /// same downstream fragment, the last one's dispatcher type wins. The result
    /// order is unspecified (HashMap iteration).
    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<Vec<(stream_plan::DispatcherType, Fragment)>> {
        let root_fragment = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&txn)
            .await?;

        let downstream_fragment_ids = downstream_fragment_relations
            .iter()
            .map(|model| model.target_fragment_id as FragmentId)
            .collect::<HashSet<_>>();

        let downstream_fragments: Vec<fragment::Model> = FragmentModel::find()
            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
            .all(&txn)
            .await?;

        let mut downstream_fragments_map: HashMap<_, _> = downstream_fragments
            .into_iter()
            .map(|fragment| (fragment.fragment_id, fragment))
            .collect();

        let mut downstream_fragments = vec![];

        // One entry per target fragment; collecting into a map drops duplicate edges.
        let fragment_map: HashMap<_, _> = downstream_fragment_relations
            .iter()
            .map(|model| (model.target_fragment_id, model.dispatcher_type))
            .collect();

        for (fragment_id, dispatcher_type) in fragment_map {
            let dispatch_type = PbDispatcherType::from(dispatcher_type);

            // Every relation target was fetched above, so a miss means the
            // fragment row disappeared between the two queries.
            let fragment = downstream_fragments_map
                .remove(&fragment_id)
                .context(format!(
                    "downstream fragment node for id {} not found",
                    fragment_id
                ))?
                .into();

            downstream_fragments.push((dispatch_type, fragment));
        }
        Ok(downstream_fragments)
    }
1868
1869 pub async fn load_source_fragment_ids(
1870 &self,
1871 ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1872 let inner = self.inner.read().await;
1873 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1874 .select_only()
1875 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1876 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1877 .into_tuple()
1878 .all(&inner.db)
1879 .await?;
1880
1881 let mut source_fragment_ids = HashMap::new();
1882 for (fragment_id, stream_node) in fragments {
1883 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1884 source_fragment_ids
1885 .entry(source_id)
1886 .or_insert_with(BTreeSet::new)
1887 .insert(fragment_id);
1888 }
1889 }
1890 Ok(source_fragment_ids)
1891 }
1892
1893 pub async fn load_backfill_fragment_ids(
1894 &self,
1895 ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1896 let inner = self.inner.read().await;
1897 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1898 .select_only()
1899 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1900 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1901 .into_tuple()
1902 .all(&inner.db)
1903 .await?;
1904
1905 let mut source_fragment_ids = HashMap::new();
1906 for (fragment_id, stream_node) in fragments {
1907 if let Some((source_id, upstream_source_fragment_id)) =
1908 stream_node.to_protobuf().find_source_backfill()
1909 {
1910 source_fragment_ids
1911 .entry(source_id)
1912 .or_insert_with(BTreeSet::new)
1913 .insert((fragment_id, upstream_source_fragment_id));
1914 }
1915 }
1916 Ok(source_fragment_ids)
1917 }
1918
    /// Collect upstream sink info for all sinks targeting `target_table`, using a
    /// fresh transaction. See [`Self::get_all_upstream_sink_infos_in_txn`].
    pub async fn get_all_upstream_sink_infos(
        &self,
        target_table: &PbTable,
        target_fragment_id: FragmentId,
    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
            .await
    }
1930
1931 pub async fn get_all_upstream_sink_infos_in_txn<C>(
1932 &self,
1933 txn: &C,
1934 target_table: &PbTable,
1935 target_fragment_id: FragmentId,
1936 ) -> MetaResult<Vec<UpstreamSinkInfo>>
1937 where
1938 C: ConnectionTrait,
1939 {
1940 let incoming_sinks = Sink::find()
1941 .filter(sink::Column::TargetTable.eq(target_table.id))
1942 .all(txn)
1943 .await?;
1944
1945 let sink_ids = incoming_sinks.iter().map(|s| s.sink_id).collect_vec();
1946 let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1947
1948 let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1949 for sink in &incoming_sinks {
1950 let sink_fragment_id =
1951 sink_fragment_ids
1952 .get(&sink.sink_id)
1953 .cloned()
1954 .ok_or(anyhow::anyhow!(
1955 "sink fragment not found for sink id {}",
1956 sink.sink_id
1957 ))?;
1958 let upstream_info = build_upstream_sink_info(
1959 sink.sink_id,
1960 sink.original_target_columns
1961 .as_ref()
1962 .map(|cols| cols.to_protobuf())
1963 .unwrap_or_default(),
1964 sink_fragment_id,
1965 target_table,
1966 target_fragment_id,
1967 )?;
1968 upstream_sink_infos.push(upstream_info);
1969 }
1970
1971 Ok(upstream_sink_infos)
1972 }
1973
1974 pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1975 let inner = self.inner.read().await;
1976 let txn = inner.db.begin().await?;
1977
1978 let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1979 .select_only()
1980 .column(fragment::Column::FragmentId)
1981 .filter(
1982 fragment::Column::JobId
1983 .eq(job_id)
1984 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1985 )
1986 .into_tuple()
1987 .all(&txn)
1988 .await?;
1989
1990 if mview_fragment.len() != 1 {
1991 return Err(anyhow::anyhow!(
1992 "expected exactly one mview fragment for job {}, found {}",
1993 job_id,
1994 mview_fragment.len()
1995 )
1996 .into());
1997 }
1998
1999 Ok(mview_fragment.into_iter().next().unwrap())
2000 }
2001
    /// Check whether `table_id` has already been migrated, delegating to the
    /// free-standing `has_table_been_migrated` helper inside a fresh transaction.
    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        has_table_been_migrated(&txn, table_id).await
    }
2007
    /// Upsert the connector split assignments for the given fragments.
    ///
    /// Split updates for fragments that no longer exist in the `fragment` table
    /// are considered stale and skipped with a warning. Existing rows in
    /// `fragment_splits` are updated via `ON CONFLICT (fragment_id)`.
    pub async fn update_fragment_splits<C>(
        &self,
        txn: &C,
        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
    ) -> MetaResult<()>
    where
        C: ConnectionTrait,
    {
        if fragment_splits.is_empty() {
            return Ok(());
        }

        // Only keep updates for fragments that still exist.
        let existing_fragment_ids: HashSet<FragmentId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::FragmentId.is_in(fragment_splits.keys().copied()))
            .into_tuple()
            .all(txn)
            .await?
            .into_iter()
            .collect();

        // Partition into upsert models (existing fragments) and skipped ids (stale).
        let (models, skipped_fragment_ids): (Vec<_>, Vec<_>) = fragment_splits
            .iter()
            .partition_map(|(fragment_id, splits)| {
                if existing_fragment_ids.contains(fragment_id) {
                    Either::Left(fragment_splits::ActiveModel {
                        fragment_id: Set(*fragment_id as _),
                        splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                            splits: splits.iter().map(Into::into).collect_vec(),
                        }))),
                    })
                } else {
                    Either::Right(*fragment_id)
                }
            });

        if !skipped_fragment_ids.is_empty() {
            tracing::warn!(
                skipped_fragment_ids = ?skipped_fragment_ids,
                total_fragment_ids = fragment_splits.len(),
                "skipping stale fragment split updates for missing fragments"
            );
        }

        if models.is_empty() {
            return Ok(());
        }

        FragmentSplits::insert_many(models)
            .on_conflict(
                OnConflict::column(fragment_splits::Column::FragmentId)
                    .update_column(fragment_splits::Column::Splits)
                    .to_owned(),
            )
            .exec(txn)
            .await?;

        Ok(())
    }
2069}
2070
2071#[cfg(test)]
2072mod tests {
2073 use std::collections::{BTreeMap, HashMap, HashSet};
2074
2075 use itertools::Itertools;
2076 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
2077 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
2078 use risingwave_common::id::JobId;
2079 use risingwave_common::util::iter_util::ZipEqDebug;
2080 use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
2081 use risingwave_meta_model::fragment::DistributionType;
2082 use risingwave_meta_model::*;
2083 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
2084 use risingwave_pb::plan_common::PbExprContext;
2085 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
2086 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
2087 use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
2088
2089 use super::ActorInfo;
2090 use crate::MetaResult;
2091 use crate::controller::catalog::CatalogController;
2092 use crate::model::{Fragment, StreamActor};
2093
2094 type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
2095
2096 type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
2097
2098 const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);
2099
2100 const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);
2101
2102 const TEST_JOB_ID: JobId = JobId::new(1);
2103
2104 const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
2105
2106 fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
2107 let mut upstream_actor_ids = BTreeMap::new();
2108 upstream_actor_ids.insert(
2109 TEST_UPSTREAM_FRAGMENT_ID,
2110 HashSet::from_iter([(actor_id + 100)]),
2111 );
2112 upstream_actor_ids.insert(
2113 (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
2114 HashSet::from_iter([(actor_id + 200)]),
2115 );
2116 upstream_actor_ids
2117 }
2118
2119 fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
2120 let mut input = vec![];
2121 for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
2122 input.push(PbStreamNode {
2123 node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
2124 upstream_fragment_id,
2125 ..Default::default()
2126 }))),
2127 ..Default::default()
2128 });
2129 }
2130
2131 PbStreamNode {
2132 input,
2133 node_body: Some(PbNodeBody::Union(PbUnionNode {})),
2134 ..Default::default()
2135 }
2136 }
2137
    /// Round-trip check: build a `Fragment`, convert it to the database model via
    /// `prepare_fragment_model_for_new_job`, and verify the fields survive
    /// (`check_fragment` does the field-by-field comparison).
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // All actors share the same upstream fragment set, so any entry works
        // as the template for the merger plan.
        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }
2168
    /// Composes a fragment model plus per-actor rows back into protobuf form via
    /// `compose_fragment`, then checks actor statuses, splits, actors and the
    /// fragment itself round-trip correctly.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        // actor id -> (upstream fragment id -> upstream actor ids)
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // Uniformly partition the test vnode space across the actors.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Each actor carries one dummy connector split.
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    // Take (not copy) this actor's slice of the vnode mapping.
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                    config_override: "a.b.c = true".into(),
                }
            })
            .collect_vec();

        // All actors in the fragment share one stream-node template, so derive
        // it from the first actor's upstream layout.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids =
                upstream_actor_ids.get(&template_actor.actor_id).unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actors, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        // One status per actor, each with a resolved worker location.
        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }
2260
2261 fn check_actors(
2262 actors: Vec<ActorInfo>,
2263 actor_upstreams: &FragmentActorUpstreams,
2264 pb_actors: Vec<StreamActor>,
2265 pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
2266 stream_node: &PbStreamNode,
2267 ) {
2268 for (
2269 ActorInfo {
2270 actor_id,
2271 fragment_id,
2272 splits,
2273 worker_id: _,
2274 vnode_bitmap,
2275 expr_context,
2276 ..
2277 },
2278 StreamActor {
2279 actor_id: pb_actor_id,
2280 fragment_id: pb_fragment_id,
2281 vnode_bitmap: pb_vnode_bitmap,
2282 mview_definition,
2283 expr_context: pb_expr_context,
2284 ..
2285 },
2286 ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
2287 {
2288 assert_eq!(actor_id, pb_actor_id as ActorId);
2289 assert_eq!(fragment_id, pb_fragment_id as FragmentId);
2290
2291 assert_eq!(
2292 vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
2293 pb_vnode_bitmap,
2294 );
2295
2296 assert_eq!(mview_definition, "");
2297
2298 visit_stream_node_body(stream_node, |body| {
2299 if let PbNodeBody::Merge(m) = body {
2300 assert!(
2301 actor_upstreams
2302 .get(&actor_id)
2303 .unwrap()
2304 .contains_key(&m.upstream_fragment_id)
2305 );
2306 }
2307 });
2308
2309 assert_eq!(
2310 splits,
2311 pb_actor_splits
2312 .get(&pb_actor_id)
2313 .map(ConnectorSplits::from)
2314 .unwrap_or_default()
2315 );
2316
2317 assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
2318 }
2319 }
2320
2321 fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2322 let Fragment {
2323 fragment_id,
2324 fragment_type_mask,
2325 distribution_type: pb_distribution_type,
2326 state_table_ids: pb_state_table_ids,
2327 maybe_vnode_count: _,
2328 nodes,
2329 } = pb_fragment;
2330
2331 assert_eq!(fragment_id, TEST_FRAGMENT_ID);
2332 assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2333 assert_eq!(
2334 pb_distribution_type,
2335 PbFragmentDistributionType::from(fragment.distribution_type)
2336 );
2337
2338 assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
2339 assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2340 }
2341
2342 #[test]
2343 fn test_parallelism_policy_with_root_fragments() {
2344 #[expect(deprecated)]
2345 let fragment = fragment::Model {
2346 fragment_id: 3.into(),
2347 job_id: TEST_JOB_ID,
2348 fragment_type_mask: 0,
2349 distribution_type: DistributionType::Hash,
2350 stream_node: StreamNode::from(&PbStreamNode::default()),
2351 state_table_ids: TableIdArray::default(),
2352 upstream_fragment_id: Default::default(),
2353 vnode_count: 0,
2354 parallelism: None,
2355 };
2356
2357 let job_parallelism = StreamingParallelism::Fixed(4);
2358
2359 let policy = super::CatalogController::format_fragment_parallelism_policy(
2360 fragment.distribution_type,
2361 fragment.parallelism.as_ref(),
2362 Some(&job_parallelism),
2363 &[],
2364 );
2365
2366 assert_eq!(policy, "inherit(fixed(4))");
2367 }
2368
2369 #[test]
2370 fn test_parallelism_policy_with_upstream_roots() {
2371 #[expect(deprecated)]
2372 let fragment = fragment::Model {
2373 fragment_id: 5.into(),
2374 job_id: TEST_JOB_ID,
2375 fragment_type_mask: 0,
2376 distribution_type: DistributionType::Hash,
2377 stream_node: StreamNode::from(&PbStreamNode::default()),
2378 state_table_ids: TableIdArray::default(),
2379 upstream_fragment_id: Default::default(),
2380 vnode_count: 0,
2381 parallelism: None,
2382 };
2383
2384 let policy = super::CatalogController::format_fragment_parallelism_policy(
2385 fragment.distribution_type,
2386 fragment.parallelism.as_ref(),
2387 None,
2388 &[3.into(), 1.into(), 2.into(), 1.into()],
2389 );
2390
2391 assert_eq!(policy, "upstream_fragment([1, 2, 3])");
2392 }
2393}