use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
use risingwave_common::id::JobId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{
    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
};
use risingwave_meta_model::{
    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
    VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink,
    source, streaming_job, table,
};
use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Operation as NotificationOperation,
};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, StreamScanType,
};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::Expr;
use sea_orm::{
    ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, JoinType, PaginatorTrait,
    QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
};
use serde::{Deserialize, Serialize};

use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
use crate::controller::catalog::CatalogController;
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph, WorkerInfo,
    find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
    render_actor_assignments, resolve_streaming_job_definition,
};
use crate::controller::utils::{
    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
    resolve_no_shuffle_actor_dispatcher,
};
use crate::error::MetaError;
use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
    StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
    TableParallelism,
};
use crate::rpc::ddl_controller::build_upstream_sink_info;
use crate::stream::UpstreamSinkInfo;
use crate::{MetaResult, model};

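/// Runtime view of a single actor: where it runs, which vnodes it owns, and
/// which source splits it has been assigned.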
#[derive(Debug)]
pub struct InflightActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
    pub splits: Vec<SplitImpl>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub splits: ConnectorSplits,
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<VnodeBitmap>,
    pub expr_context: ExprContext,
    pub config_override: Arc<str>,
}

#[derive(Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    pub vnode_count: usize,
    pub nodes: PbStreamNode,
    pub actors: HashMap<ActorId, InflightActorInfo>,
    pub state_table_ids: HashSet<TableId>,
}

#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub actor_count: usize,
    pub vnode_count: usize,
}

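/// SQL-side predicates over the `fragment_type_mask` column. Each helper
/// builds a `sea_query` expression of the form `fragment_type_mask & flag != 0`
/// (or `= 0` for `disjoint`), so the bitmask test is evaluated in the database
/// instead of in memory. A minimal sketch of how this is used elsewhere in
/// this module:
///
/// ```ignore
/// // SELECT .. FROM fragment WHERE fragment_type_mask & SOURCE_FLAG != 0
/// FragmentModel::find()
///     .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
///     .all(&db)
///     .await?;
/// ```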
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}

#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: JobId,
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    pub resource_group: String,
    pub config_override: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}

impl NotificationManager {
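    /// Fans fragment-to-worker-slot mappings out to all frontend nodes (one
    /// unversioned notification per mapping), then informs local subscribers
    /// which fragment mappings were upserted or deleted so in-process caches
    /// stay consistent.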
    pub(crate) fn notify_fragment_mapping(
        &self,
        operation: NotificationOperation,
        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
    ) {
        let fragment_ids = fragment_mappings
            .iter()
            .map(|mapping| mapping.fragment_id)
            .collect_vec();
        if fragment_ids.is_empty() {
            return;
        }
        for fragment_mapping in fragment_mappings {
            self.notify_frontend_without_version(
                operation,
                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
            );
        }

        match operation {
            NotificationOperation::Add | NotificationOperation::Update => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
                    fragment_ids,
                ));
            }
            NotificationOperation::Delete => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
                    fragment_ids,
                ));
            }
            op => {
                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
            }
        }
    }
}

impl CatalogController {
    pub fn prepare_fragment_models_from_fragments(
        job_id: JobId,
        fragments: impl Iterator<Item = &Fragment>,
    ) -> MetaResult<Vec<fragment::Model>> {
        fragments
            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
            .try_collect()
    }

    pub fn prepare_fragment_model_for_new_job(
        job_id: JobId,
        fragment: &Fragment,
    ) -> MetaResult<fragment::Model> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        let fragment_parallelism = nodes
            .node_body
            .as_ref()
            .and_then(|body| match body {
                NodeBody::StreamCdcScan(node) => Some(node),
                _ => None,
            })
            .and_then(|node| node.options.as_ref())
            .map(CdcScanOptions::from_proto)
            .filter(|opts| opts.is_parallelized_backfill())
            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));

        let stream_node = StreamNode::from(nodes);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
            parallelism: fragment_parallelism,
        };

        Ok(fragment)
    }

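    /// Reassembles a [`StreamJobFragments`] snapshot for one job from its
    /// persisted fragment models and the actor info collected from the shared
    /// actor state, translating the stored parallelism setting into a
    /// [`TableParallelism`].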
    fn compose_table_fragments(
        job_id: JobId,
        state: PbState,
        ctx: StreamContext,
        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
        parallelism: StreamingParallelism,
        max_parallelism: usize,
        job_definition: Option<String>,
    ) -> MetaResult<StreamJobFragments> {
        let mut pb_fragments = BTreeMap::new();
        let mut pb_actor_status = BTreeMap::new();

        for (fragment, actors) in fragments {
            let (fragment, fragment_actor_status, _) =
                Self::compose_fragment(fragment, actors, job_definition.clone())?;

            pb_fragments.insert(fragment.fragment_id, fragment);
            pb_actor_status.extend(fragment_actor_status.into_iter());
        }

        let table_fragments = StreamJobFragments {
            stream_job_id: job_id,
            state: state as _,
            fragments: pb_fragments,
            actor_status: pb_actor_status,
            ctx,
            assigned_parallelism: match parallelism {
                StreamingParallelism::Custom => TableParallelism::Custom,
                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
            },
            max_parallelism,
        };

        Ok(table_fragments)
    }

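    /// Converts one persisted fragment model plus its actors back into the
    /// in-memory [`Fragment`] representation, additionally returning each
    /// actor's status (worker location) and connector splits. Bails if an
    /// actor references a different fragment id than the model's.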
    #[allow(clippy::type_complexity)]
    fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<ActorInfo>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "duplicate upstream fragment {}",
                    m.upstream_fragment_id
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let ActorInfo {
                actor_id,
                fragment_id,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                config_override,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id),
                },
            );

            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
                config_override,
            })
        }

        let pb_state_table_ids = state_table_ids.0;
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }

    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let info = self.env.shared_actor_infos().read_guard();

        let mut result = HashMap::new();
        for (fragment_id, fragment) in info.iter_over_fragments() {
            if let Some(id_filter) = &id_filter
                && !id_filter.contains(&(*fragment_id as _))
            {
                continue;
            }

            result.insert(
                *fragment_id as _,
                FragmentParallelismInfo {
                    distribution_type: fragment.distribution_type.into(),
                    actor_count: fragment.actors.len() as _,
                    vnode_count: fragment.vnode_count,
                },
            );
        }

        Ok(result)
    }

    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
        let inner = self.inner.read().await;
        let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(fragment_jobs.into_iter().collect())
    }

    pub async fn get_fragment_job_id(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        let object_ids: Vec<ObjectId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(object_ids)
    }

    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model = match FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?
        {
            Some(fragment) => fragment,
            None => return Ok(None),
        };

        let job_parallelism: Option<StreamingParallelism> =
            StreamingJob::find_by_id(fragment_model.job_id)
                .select_only()
                .column(streaming_job::Column::Parallelism)
                .into_tuple()
                .one(&inner.db)
                .await?;

        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::DispatcherType,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let upstreams: Vec<_> = upstream_entries
            .into_iter()
            .map(|(source_id, _)| source_id)
            .collect();

        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
            .await
            .map(Self::collect_root_fragment_mapping)?;
        let root_fragments = root_fragment_map
            .get(&fragment_id)
            .cloned()
            .unwrap_or_default();

        let info = self.env.shared_actor_infos().read_guard();
        let SharedFragmentInfo { actors, .. } = info
            .get_fragment(fragment_model.fragment_id as _)
            .unwrap_or_else(|| {
                panic!(
                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                    fragment_model.fragment_id,
                    fragment_model.job_id
                )
            });

        let parallelism_policy = Self::format_fragment_parallelism_policy(
            &fragment_model,
            job_parallelism.as_ref(),
            &root_fragments,
        );

        let fragment = FragmentDesc {
            fragment_id: fragment_model.fragment_id,
            job_id: fragment_model.job_id,
            fragment_type_mask: fragment_model.fragment_type_mask,
            distribution_type: fragment_model.distribution_type,
            state_table_ids: fragment_model.state_table_ids.clone(),
            parallelism: actors.len() as _,
            vnode_count: fragment_model.vnode_count,
            stream_node: fragment_model.stream_node.clone(),
            parallelism_policy,
        };

        Ok(Some((fragment, upstreams)))
    }

    pub async fn list_fragment_database_ids(
        &self,
        select_fragment_ids: Option<Vec<FragmentId>>,
    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
        let inner = self.inner.read().await;
        let select = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .column(object::Column::DatabaseId)
            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
        let select = if let Some(select_fragment_ids) = select_fragment_ids {
            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
        } else {
            select
        };
        Ok(select.into_tuple().all(&inner.db).await?)
    }

    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;

        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        let fragment_actors =
            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;

        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id,
            job_info.job_status.into(),
            job_info.stream_context(),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }

    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;

        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
            .await
    }

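    /// Builds the dispatchers of every actor in the given source fragments by
    /// combining persisted fragment relations with live actor placements and
    /// vnode bitmaps from the shared actor state. Per-fragment actor info is
    /// fetched once and memoized in a local cache, since the same fragment can
    /// appear in many relations.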
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let dispatchers = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }

    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
            .await
    }

    pub async fn get_fragment_downstream_relations_in_txn<C>(
        &self,
        txn: &C,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation>
    where
        C: ConnectionTrait + StreamTrait + Send,
    {
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(txn)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }

    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }

    pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
        let inner = self.inner.read().await;
        let count = StreamingJob::find().count(&inner.db).await?;
        Ok(usize::try_from(count).context("streaming job count overflow")?)
    }

    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column_as(
                Expr::if_null(
                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
                    Expr::val(""),
                ),
                "config_override",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }

    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
        let inner = self.inner.read().await;
        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
        Ok(max_parallelism as usize)
    }

    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
        if let Ok(inner) = self.inner.try_read()
            && let Ok(job_state_tables) = FragmentModel::find()
                .select_only()
                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
                .into_tuple::<(JobId, I32Array)>()
                .all(&inner.db)
                .await
        {
            let mut job_internal_table_ids = HashMap::new();
            for (job_id, state_table_ids) in job_state_tables {
                job_internal_table_ids
                    .entry(job_id)
                    .or_insert_with(Vec::new)
                    .extend(
                        state_table_ids
                            .into_inner()
                            .into_iter()
                            .map(|table_id| TableId::new(table_id as _)),
                    );
            }
            return Some(job_internal_table_ids.into_iter().collect());
        }
        None
    }

    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = FragmentModel::find().count(&inner.db).await?;
        Ok(count > 0)
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let read_guard = self.env.shared_actor_infos().read_guard();
        let actor_cnt: HashMap<WorkerId, _> = read_guard
            .iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
            })
            .into_group_map()
            .into_iter()
            .map(|(k, v)| (k, v.len()))
            .collect();

        Ok(actor_cnt)
    }

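    /// Snapshots the live actors of the given fragments from the shared actor
    /// state into persistence-shaped [`ActorInfo`] records, stamping each with
    /// the job's expression context and config override. Fails if any fragment
    /// is missing from the shared state.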
    fn collect_fragment_actor_map(
        &self,
        fragment_ids: &[FragmentId],
        stream_context: StreamContext,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
        let guard = self.env.shared_actor_infos().read_guard();
        let pb_expr_context = stream_context.to_expr_context();
        let expr_context: ExprContext = (&pb_expr_context).into();

        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
        for fragment_id in fragment_ids {
            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
                anyhow!("fragment {} not found in shared actor info", fragment_id)
            })?;

            let actors = fragment_info
                .actors
                .iter()
                .map(|(actor_id, actor_info)| ActorInfo {
                    actor_id: *actor_id as _,
                    fragment_id: *fragment_id,
                    splits: ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
                    }),
                    worker_id: actor_info.worker_id as _,
                    vnode_bitmap: actor_info
                        .vnode_bitmap
                        .as_ref()
                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                    expr_context: expr_context.clone(),
                    config_override: stream_context.config_override.clone(),
                })
                .collect();

            actor_map.insert(*fragment_id, actors);
        }

        Ok(actor_map)
    }

    fn collect_fragment_actor_pairs(
        &self,
        fragments: Vec<fragment::Model>,
        stream_context: StreamContext,
    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
        fragments
            .into_iter()
            .map(|fragment| {
                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
                    anyhow!(
                        "fragment {} missing in shared actor info map",
                        fragment.fragment_id
                    )
                })?;
                Ok((fragment, actors))
            })
            .collect()
    }

    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragments = FragmentModel::find()
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            let fragment_actors =
                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;

            table_fragments.insert(
                job.job_id,
                Self::compose_table_fragments(
                    job.job_id,
                    job.job_status.into(),
                    job.stream_context(),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }

    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }

    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
        let info = self.env.shared_actor_infos().read_guard();

        let actor_locations = info
            .iter_over_fragments()
            .flat_map(|(fragment_id, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, actor)| PartialActorLocation {
                        actor_id: *actor_id as _,
                        fragment_id: *fragment_id as _,
                        worker_id: actor.worker_id,
                    })
            })
            .collect_vec();

        Ok(actor_locations)
    }

    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;

        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
            FragmentModel::find()
                .select_only()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .column(fragment::Column::FragmentId)
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;

        let actor_infos = {
            let info = self.env.shared_actor_infos().read_guard();

            let mut result = Vec::new();

            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
                let Some(fragment) = info.get_fragment(fragment_id as _) else {
                    return Err(MetaError::unavailable(format!(
                        "shared actor info missing for fragment {fragment_id} while listing actors"
                    )));
                };

                for actor_id in fragment.actors.keys() {
                    result.push((
                        *actor_id as _,
                        fragment.fragment_id as _,
                        object_id,
                        schema_id,
                        object_type,
                    ));
                }
            }

            result
        };

        Ok(actor_infos)
    }

    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
        let guard = self.env.shared_actor_info.read_guard();
        guard
            .iter_over_fragments()
            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
            .collect_vec()
    }

    pub async fn list_fragment_descs(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let txn = inner.db.begin().await?;

        let fragments: Vec<_> = if is_creating {
            FragmentModel::find()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
                .filter(
                    streaming_job::Column::JobStatus
                        .eq(JobStatus::Initial)
                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
                )
                .all(&txn)
                .await?
        } else {
            FragmentModel::find().all(&txn).await?
        };

        let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();

        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragments.is_empty() {
            HashMap::new()
        } else {
            let job_ids = fragments.iter().map(|f| f.job_id).unique().collect_vec();
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(&txn)
                .await?
                .into_iter()
                .collect()
        };

        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();

        let mut result = Vec::new();

        for fragment_desc in fragments {
            let parallelism = guard
                .get_fragment(fragment_desc.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&fragment_desc.fragment_id)
                .cloned()
                .unwrap_or_default();

            let upstreams = all_upstreams
                .remove(&fragment_desc.fragment_id)
                .unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                &fragment_desc,
                job_parallelisms.get(&fragment_desc.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: fragment_desc.fragment_id,
                table_id: fragment_desc.job_id,
                distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type)
                    as _,
                state_table_ids: fragment_desc.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: fragment_desc.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: fragment_desc.vnode_count as _,
                node: Some(fragment_desc.stream_node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }
        Ok(result)
    }

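    /// Flattens no-shuffle ensembles into a per-fragment root mapping: entry
    /// (root) fragments map to an empty list, while every other member of the
    /// same ensemble maps to the sorted, deduplicated list of its roots. For
    /// example, an ensemble with root `1` and members `{1, 2, 3}` yields
    /// `{1: [], 2: [1], 3: [1]}`.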
    fn collect_root_fragment_mapping(
        ensembles: Vec<NoShuffleEnsemble>,
    ) -> HashMap<FragmentId, Vec<FragmentId>> {
        let mut mapping = HashMap::new();

        for ensemble in ensembles {
            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
            roots.sort_unstable();
            roots.dedup();

            if roots.is_empty() {
                continue;
            }

            let root_set: HashSet<_> = roots.iter().copied().collect();

            for fragment_id in ensemble.component_fragments() {
                if root_set.contains(&fragment_id) {
                    mapping.insert(fragment_id, Vec::new());
                } else {
                    mapping.insert(fragment_id, roots.clone());
                }
            }
        }

        mapping
    }

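    /// Renders a fragment's parallelism policy as a string, in order of
    /// precedence: `single` for single-distribution fragments, `override(...)`
    /// for a fragment-level setting, `upstream_fragment([...])` when the
    /// parallelism follows no-shuffle roots, and otherwise `inherit(...)` from
    /// the job, e.g. `override(fixed(4))` or `inherit(adaptive)`.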
    fn format_fragment_parallelism_policy(
        fragment: &fragment::Model,
        job_parallelism: Option<&StreamingParallelism>,
        root_fragments: &[FragmentId],
    ) -> String {
        if fragment.distribution_type == DistributionType::Single {
            return "single".to_owned();
        }

        if let Some(parallelism) = fragment.parallelism.as_ref() {
            return format!(
                "override({})",
                Self::format_streaming_parallelism(parallelism)
            );
        }

        if !root_fragments.is_empty() {
            let mut upstreams = root_fragments.to_vec();
            upstreams.sort_unstable();
            upstreams.dedup();

            return format!("upstream_fragment({upstreams:?})");
        }

        let inherited = job_parallelism
            .map(Self::format_streaming_parallelism)
            .unwrap_or_else(|| "unknown".to_owned());
        format!("inherit({inherited})")
    }

    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
        match parallelism {
            StreamingParallelism::Adaptive => "adaptive".to_owned(),
            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
            StreamingParallelism::Custom => "custom".to_owned(),
        }
    }

    pub async fn list_sink_actor_mapping(
        &self,
    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
        let inner = self.inner.read().await;
        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .into_tuple()
            .all(&inner.db)
            .await?;
        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();

        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();

        let actor_with_type: Vec<(ActorId, SinkId)> = {
            let info = self.env.shared_actor_infos().read_guard();

            info.iter_over_fragments()
                .filter(|(_, fragment)| {
                    sink_ids.contains(&fragment.job_id.as_sink_id())
                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
                })
                .flat_map(|(_, fragment)| {
                    fragment
                        .actors
                        .keys()
                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
                })
                .collect()
        };

        let mut sink_actor_mapping = HashMap::new();
        for (actor_id, sink_id) in actor_with_type {
            sink_actor_mapping
                .entry(sink_id)
                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
                .1
                .push(actor_id);
        }

        Ok(sink_actor_mapping)
    }

    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
        let inner = self.inner.read().await;
        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::StateTableIds,
            ])
            .into_partial_model()
            .all(&inner.db)
            .await?;
        Ok(fragment_state_tables)
    }

    pub async fn load_all_actors_dynamic(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<FragmentRenderMap> {
        let loaded = self.load_fragment_context(database_id).await?;

        if loaded.is_empty() {
            return Ok(HashMap::new());
        }

        let adaptive_parallelism_strategy = {
            let system_params_reader = self.env.system_params_reader().await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let available_workers: BTreeMap<_, _> = worker_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| {
                (
                    worker.id,
                    WorkerInfo {
                        parallelism: NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                        resource_group: worker.resource_group(),
                    },
                )
            })
            .collect();

        let RenderedGraph { fragments, .. } = render_actor_assignments(
            self.env.actor_id_generator(),
            &available_workers,
            adaptive_parallelism_strategy,
            &loaded,
        )?;

        tracing::trace!(?fragments, "reload all actors");

        Ok(fragments)
    }

    pub async fn load_fragment_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedFragmentContext> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        self.load_fragment_context_in_txn(&txn, database_id).await
    }

    pub async fn load_fragment_context_in_txn<C>(
        &self,
        txn: &C,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedFragmentContext>
    where
        C: ConnectionTrait,
    {
        let mut query = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId);

        if let Some(database_id) = database_id {
            query = query
                .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
                .filter(object::Column::DatabaseId.eq(database_id));
        }

        let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;

        let jobs: HashSet<JobId> = jobs.into_iter().collect();

        if jobs.is_empty() {
            return Ok(LoadedFragmentContext::default());
        }

        load_fragment_context_for_jobs(txn, jobs).await
    }

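    /// Fills the snapshot backfill epochs into the stored stream node of each
    /// given fragment and persists the rewritten plans in a single
    /// transaction. Fragments whose plans are unchanged by the fill are not
    /// written back.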
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

    pub fn get_running_actors_of_fragment(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<HashSet<model::ActorId>> {
        let info = self.env.shared_actor_infos().read_guard();

        let actors = info
            .get_fragment(fragment_id as _)
            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
            .unwrap_or_default();

        Ok(actors)
    }

    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }

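    /// Resolves the root fragment of each given job together with its stream
    /// node: an `Mview`/`Sink` fragment is preferred, with a `Source` fragment
    /// as the fallback for jobs that have no materialization. Also returns the
    /// current actor-to-worker placement drawn from the shared actor state.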
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<(
        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let inner = self.inner.read().await;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();

        let info = self.env.shared_actor_infos().read_guard();

        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
            .iter()
            .map(|(job_id, fragment)| (fragment.fragment_id, *job_id))
            .collect();

        for fragment in root_fragment_to_jobs.keys() {
            let fragment_info = info.get_fragment(*fragment).context(format!(
                "root fragment {} not found in shared actor info",
                fragment
            ))?;

            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
            let fragment = root_fragments
                .get(&job_id)
                .context(format!("root fragment for job {} not found", job_id))?;

            root_fragments_pb.insert(
                job_id,
                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
            );
        }

        let mut all_actor_locations = HashMap::new();

        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
            for (actor_id, actor_info) in actors {
                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
            }
        }

        Ok((root_fragments_pb, all_actor_locations))
    }

    pub async fn get_root_fragment(
        &self,
        job_id: JobId,
    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
        let (root_fragment, _) = root_fragments
            .remove(&job_id)
            .context(format!("root fragment for job {} not found", job_id))?;

        Ok((root_fragment, actors))
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        Vec<(
            stream_plan::DispatcherType,
            SharedFragmentInfo,
            PbStreamNode,
        )>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&txn)
            .await?;

        let downstream_fragment_ids = downstream_fragment_relations
            .iter()
            .map(|model| model.target_fragment_id as FragmentId)
            .collect::<HashSet<_>>();

        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        let downstream_fragment_nodes: HashMap<_, _> =
            downstream_fragment_nodes.into_iter().collect();

        let mut downstream_fragments = vec![];

        let info = self.env.shared_actor_infos().read_guard();

        let fragment_map: HashMap<_, _> = downstream_fragment_relations
            .iter()
            .map(|model| (model.target_fragment_id, model.dispatcher_type))
            .collect();

        for fragment_id in fragment_map.keys() {
            let fragment_info @ SharedFragmentInfo { actors, .. } =
                info.get_fragment(*fragment_id).unwrap();

            let dispatcher_type = fragment_map[fragment_id];

            if actors.is_empty() {
                bail!("No fragment found for fragment id {}", fragment_id);
            }

            let dispatch_type = PbDispatcherType::from(dispatcher_type);

            let nodes = downstream_fragment_nodes
                .get(fragment_id)
                .context(format!(
                    "downstream fragment node for id {} not found",
                    fragment_id
                ))?
                .to_protobuf();

            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
        }
        Ok((downstream_fragments, actor_locations))
    }

    pub async fn load_source_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
                source_fragment_ids
                    .entry(source_id)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment_id);
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn load_backfill_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                stream_node.to_protobuf().find_source_backfill()
            {
                source_fragment_ids
                    .entry(source_id)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn get_all_upstream_sink_infos(
        &self,
        target_table: &PbTable,
        target_fragment_id: FragmentId,
    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
            .await
    }

    pub async fn get_all_upstream_sink_infos_in_txn<C>(
        &self,
        txn: &C,
        target_table: &PbTable,
        target_fragment_id: FragmentId,
    ) -> MetaResult<Vec<UpstreamSinkInfo>>
    where
        C: ConnectionTrait,
    {
        let incoming_sinks = self
            .get_table_incoming_sinks_in_txn(txn, target_table.id)
            .await?;

        let sink_ids = incoming_sinks.iter().map(|s| s.id).collect_vec();
        let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;

        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
        for pb_sink in &incoming_sinks {
            let sink_fragment_id =
                sink_fragment_ids
                    .get(&pb_sink.id)
                    .cloned()
                    .ok_or(anyhow::anyhow!(
                        "sink fragment not found for sink id {}",
                        pb_sink.id
                    ))?;
            let upstream_info = build_upstream_sink_info(
                pb_sink,
                sink_fragment_id,
                target_table,
                target_fragment_id,
            )?;
            upstream_sink_infos.push(upstream_info);
        }

        Ok(upstream_sink_infos)
    }

    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(
                fragment::Column::JobId
                    .eq(job_id)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        if mview_fragment.len() != 1 {
            return Err(anyhow::anyhow!(
                "expected exactly one mview fragment for job {}, found {}",
                job_id,
                mview_fragment.len()
            )
            .into());
        }

        Ok(mview_fragment.into_iter().next().unwrap())
    }

    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        has_table_been_migrated(&txn, table_id).await
    }

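    /// Upserts the persisted split assignment of each fragment in one batch
    /// insert with `ON CONFLICT (fragment_id) DO UPDATE` on the splits column,
    /// so existing rows are overwritten in place. Empty input is a no-op.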
    pub async fn update_fragment_splits<C>(
        &self,
        txn: &C,
        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
    ) -> MetaResult<()>
    where
        C: ConnectionTrait,
    {
        if fragment_splits.is_empty() {
            return Ok(());
        }

        let models: Vec<fragment_splits::ActiveModel> = fragment_splits
            .iter()
            .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
                fragment_id: Set(*fragment_id as _),
                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(Into::into).collect_vec(),
                }))),
            })
            .collect();

        FragmentSplits::insert_many(models)
            .on_conflict(
                OnConflict::column(fragment_splits::Column::FragmentId)
                    .update_column(fragment_splits::Column::Splits)
                    .to_owned(),
            )
            .exec(txn)
            .await?;

        Ok(())
    }
}

1898#[cfg(test)]
1899mod tests {
1900 use std::collections::{BTreeMap, HashMap, HashSet};
1901
1902 use itertools::Itertools;
1903 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1904 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1905 use risingwave_common::id::JobId;
1906 use risingwave_common::util::iter_util::ZipEqDebug;
1907 use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1908 use risingwave_meta_model::fragment::DistributionType;
1909 use risingwave_meta_model::*;
1910 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1911 use risingwave_pb::plan_common::PbExprContext;
1912 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1913 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1914 use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1915
1916 use super::ActorInfo;
1917 use crate::MetaResult;
1918 use crate::controller::catalog::CatalogController;
1919 use crate::model::{Fragment, StreamActor};
1920
1921 type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
1922
1923 type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
1924
1925 const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);
1926
1927 const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);
1928
1929 const TEST_JOB_ID: JobId = JobId::new(1);
1930
1931 const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
1932
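    /// Fabricates upstream actor ids for `actor_id`: one upstream actor per upstream
    /// fragment, offset by 100 and 200 so mismatches are easy to spot in assertions.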
    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID,
            HashSet::from_iter([actor_id + 100]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([actor_id + 200]),
        );
        upstream_actor_ids
    }

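    /// Builds a `Union` stream node with one `Merge` input per upstream fragment.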
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

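    /// Round-trips a [`Fragment`] through `prepare_fragment_model_for_new_job` and
    /// checks the resulting model against the original fragment.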
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id.into(),
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id.into()).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
                config_override: "".into(),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }

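    /// Feeds a `fragment::Model` plus per-actor [`ActorInfo`]s to `compose_fragment`
    /// and verifies the composed fragment, actor statuses, and split assignments.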
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id.into())
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                    config_override: "a.b.c = true".into(),
                }
            })
            .collect_vec();

        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

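    /// Asserts that each composed [`StreamActor`] matches its source [`ActorInfo`]
    /// field by field, and that every `Merge` node in the stream node references a
    /// known upstream fragment.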
    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

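    /// Asserts that a composed [`Fragment`] agrees with its `fragment::Model` on id,
    /// type mask, distribution type, state table ids, and stream node.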
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }

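    /// With no upstream roots, a fragment without explicit parallelism inherits the
    /// job-level setting: `StreamingParallelism::Fixed(4)` renders as `inherit(fixed(4))`.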
    #[test]
    fn test_parallelism_policy_with_root_fragments() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 3.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let job_parallelism = StreamingParallelism::Fixed(4);

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            Some(&job_parallelism),
            &[],
        );

        assert_eq!(policy, "inherit(fixed(4))");
    }

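    /// Without a job-level parallelism, the policy lists the upstream root fragments,
    /// deduplicated and sorted: `[3, 1, 2, 1]` renders as `upstream_fragment([1, 2, 3])`.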
    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            None,
            &[3.into(), 1.into(), 2.into(), 1.into()],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
}