1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::CdcScanOptions;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34 Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37 ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38 JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39 TableIdArray, VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits,
40 object, sink, source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46 Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::fragment::{
49 FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan;
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57 PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, SinkLogStoreType,
58 StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63 ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, FromQueryResult, JoinType,
64 PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
65};
66use serde::{Deserialize, Serialize};
67
68use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
69use crate::controller::catalog::CatalogController;
70use crate::controller::scale::{
71 FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph,
72 find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
73 render_actor_assignments, resolve_streaming_job_definition,
74};
75use crate::controller::utils::{
76 FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
77 get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
78 resolve_no_shuffle_actor_dispatcher,
79};
80use crate::error::MetaError;
81use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
82use crate::model::{
83 DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
84 StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
85 TableParallelism,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
/// In-memory view of a single running actor, sourced from the shared actor info
/// (see [`SharedActorInfos`]) rather than the metadata store.
#[derive(Debug)]
pub struct InflightActorInfo {
    /// Worker node the actor is currently placed on.
    pub worker_id: WorkerId,
    /// Vnode ownership bitmap of the actor, if any.
    pub vnode_bitmap: Option<Bitmap>,
    /// Source splits currently assigned to the actor.
    pub splits: Vec<SplitImpl>,
}
98
/// Snapshot of an actor in model-layer types, used when composing fragments
/// (see `compose_fragment`) from shared actor info plus a stream context.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    /// Fragment the actor belongs to; validated against the fragment row in
    /// `compose_fragment`.
    pub fragment_id: FragmentId,
    /// Current split assignment, in the model representation.
    pub splits: ConnectorSplits,
    pub worker_id: WorkerId,
    /// Vnode ownership; `None` when the actor has no bitmap.
    pub vnode_bitmap: Option<VnodeBitmap>,
    /// Expression context derived from the job's stream context.
    pub expr_context: ExprContext,
    /// Per-job config override, shared cheaply via `Arc<str>`.
    pub config_override: Arc<str>,
}
109
/// Intermediate row shape used when listing fragment descriptions
/// (see `list_fragment_descs_with_node`), holding the subset of `fragment::Model`
/// columns needed to build a `FragmentDistribution`.
#[derive(Debug)]
struct FragmentDescRow {
    fragment_id: FragmentId,
    job_id: JobId,
    fragment_type_mask: i32,
    distribution_type: DistributionType,
    state_table_ids: TableIdArray,
    vnode_count: i32,
    /// Per-fragment parallelism override, if the fragment pins one.
    parallelism_override: Option<StreamingParallelism>,
    /// Plan node of the fragment; optional so callers can omit it when not needed.
    stream_node: Option<StreamNode>,
}
121
/// In-memory view of a running fragment and its actors.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    /// Total number of vnodes owned by the fragment.
    pub vnode_count: usize,
    /// Root of the fragment's stream plan.
    pub nodes: PbStreamNode,
    /// All actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    /// State tables maintained by this fragment.
    pub state_table_ids: HashSet<TableId>,
}
132
/// Parallelism-related facts about one running fragment, as reported by
/// `running_fragment_parallelisms`.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    /// Current number of actors, i.e. the effective parallelism.
    pub actor_count: usize,
    pub vnode_count: usize,
}
139
140#[easy_ext::ext(FragmentTypeMaskExt)]
141pub impl FragmentTypeMask {
142 fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
143 Expr::col(fragment::Column::FragmentTypeMask)
144 .bit_and(Expr::value(flag as i32))
145 .ne(0)
146 }
147
148 fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
149 Expr::col(fragment::Column::FragmentTypeMask)
150 .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
151 .ne(0)
152 }
153
154 fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
155 Expr::col(fragment::Column::FragmentTypeMask)
156 .bit_and(Expr::value(flag as i32))
157 .eq(0)
158 }
159}
160
/// Query-result model for `list_streaming_job_infos`: one row per streaming job
/// with its catalog name, status and parallelism settings resolved.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] pub struct StreamingJobInfo {
    pub job_id: JobId,
    pub obj_type: ObjectType,
    /// Name resolved from the table/source/sink catalog entry, or `"<unknown>"`.
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Job-specific resource group, falling back to the database's default.
    pub resource_group: String,
    /// Job config override; empty string when none is set.
    pub config_override: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
175
176impl NotificationManager {
177 pub(crate) fn notify_fragment_mapping(
178 &self,
179 operation: NotificationOperation,
180 fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
181 ) {
182 let fragment_ids = fragment_mappings
183 .iter()
184 .map(|mapping| mapping.fragment_id)
185 .collect_vec();
186 if fragment_ids.is_empty() {
187 return;
188 }
189 for fragment_mapping in fragment_mappings {
191 self.notify_frontend_without_version(
192 operation,
193 NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
194 );
195 }
196
197 match operation {
199 NotificationOperation::Add | NotificationOperation::Update => {
200 self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
201 fragment_ids,
202 ));
203 }
204 NotificationOperation::Delete => {
205 self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
206 fragment_ids,
207 ));
208 }
209 op => {
210 tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
211 }
212 }
213 }
214}
215
216impl CatalogController {
217 pub fn prepare_fragment_models_from_fragments(
218 job_id: JobId,
219 fragments: impl Iterator<Item = &Fragment>,
220 ) -> MetaResult<Vec<fragment::Model>> {
221 fragments
222 .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
223 .try_collect()
224 }
225
    /// Builds the `fragment` table row for `fragment` as part of the new streaming
    /// job `job_id`.
    ///
    /// # Panics
    /// Panics if the fragment has no actors or its distribution type is invalid.
    pub fn prepare_fragment_model_for_new_job(
        job_id: JobId,
        fragment: &Fragment,
    ) -> MetaResult<fragment::Model> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        // A CDC scan configured for parallelized backfill pins this fragment to a
        // fixed parallelism taken from its scan options; any other fragment leaves
        // the per-fragment parallelism unset.
        let fragment_parallelism = nodes
            .node_body
            .as_ref()
            .and_then(|body| match body {
                NodeBody::StreamCdcScan(node) => Some(node),
                _ => None,
            })
            .and_then(|node| node.options.as_ref())
            .map(CdcScanOptions::from_proto)
            .filter(|opts| opts.is_parallelized_backfill())
            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));

        let stream_node = StreamNode::from(nodes);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        // `upstream_fragment_id` is a deprecated column; it is only populated with
        // its default value here.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
            parallelism: fragment_parallelism,
        };

        Ok(fragment)
    }
278
279 fn compose_table_fragments(
280 job_id: JobId,
281 state: PbState,
282 ctx: StreamContext,
283 fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
284 parallelism: StreamingParallelism,
285 max_parallelism: usize,
286 job_definition: Option<String>,
287 ) -> MetaResult<StreamJobFragments> {
288 let mut pb_fragments = BTreeMap::new();
289 let mut pb_actor_status = BTreeMap::new();
290
291 for (fragment, actors) in fragments {
292 let (fragment, fragment_actor_status, _) =
293 Self::compose_fragment(fragment, actors, job_definition.clone())?;
294
295 pb_fragments.insert(fragment.fragment_id, fragment);
296 pb_actor_status.extend(fragment_actor_status.into_iter());
297 }
298
299 let table_fragments = StreamJobFragments {
300 stream_job_id: job_id,
301 state: state as _,
302 fragments: pb_fragments,
303 actor_status: pb_actor_status,
304 ctx,
305 assigned_parallelism: match parallelism {
306 StreamingParallelism::Custom => TableParallelism::Custom,
307 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
308 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
309 },
310 max_parallelism,
311 };
312
313 Ok(table_fragments)
314 }
315
    /// Converts one `fragment::Model` row plus its actors into an in-memory
    /// [`Fragment`], along with per-actor status and split maps.
    ///
    /// Returns `(fragment, actor_id -> status, actor_id -> splits)`.
    /// Fails if any actor's `fragment_id` does not match the fragment row.
    #[allow(clippy::type_complexity)]
    fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<ActorInfo>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity check only: every `Merge` node in the plan must reference a
        // distinct upstream fragment. The collected set is not used beyond the
        // assertion below.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            // Each actor must belong to this fragment.
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let ActorInfo {
                actor_id,
                fragment_id,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                config_override,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id),
                },
            );

            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                // Actors without a job definition carry an empty definition string.
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
                config_override,
            })
        }

        let pb_state_table_ids = state_table_ids.0;
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }
410
411 pub fn running_fragment_parallelisms(
412 &self,
413 id_filter: Option<HashSet<FragmentId>>,
414 ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
415 let info = self.env.shared_actor_infos().read_guard();
416
417 let mut result = HashMap::new();
418 for (fragment_id, fragment) in info.iter_over_fragments() {
419 if let Some(id_filter) = &id_filter
420 && !id_filter.contains(&(*fragment_id as _))
421 {
422 continue; }
424
425 result.insert(
426 *fragment_id as _,
427 FragmentParallelismInfo {
428 distribution_type: fragment.distribution_type.into(),
429 actor_count: fragment.actors.len() as _,
430 vnode_count: fragment.vnode_count,
431 },
432 );
433 }
434
435 Ok(result)
436 }
437
438 pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
439 let inner = self.inner.read().await;
440 let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
441 .select_only()
442 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
443 .into_tuple()
444 .all(&inner.db)
445 .await?;
446 Ok(fragment_jobs.into_iter().collect())
447 }
448
449 pub async fn get_fragment_job_id(
450 &self,
451 fragment_ids: Vec<FragmentId>,
452 ) -> MetaResult<Vec<ObjectId>> {
453 let inner = self.inner.read().await;
454
455 let object_ids: Vec<ObjectId> = FragmentModel::find()
456 .select_only()
457 .column(fragment::Column::JobId)
458 .filter(fragment::Column::FragmentId.is_in(fragment_ids))
459 .into_tuple()
460 .all(&inner.db)
461 .await?;
462
463 Ok(object_ids)
464 }
465
466 pub async fn get_fragment_desc_by_id(
467 &self,
468 fragment_id: FragmentId,
469 ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
470 let inner = self.inner.read().await;
471
472 let fragment_model = match FragmentModel::find_by_id(fragment_id)
473 .one(&inner.db)
474 .await?
475 {
476 Some(fragment) => fragment,
477 None => return Ok(None),
478 };
479
480 let job_parallelism: Option<StreamingParallelism> =
481 StreamingJob::find_by_id(fragment_model.job_id)
482 .select_only()
483 .column(streaming_job::Column::Parallelism)
484 .into_tuple()
485 .one(&inner.db)
486 .await?;
487
488 let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
489 .select_only()
490 .columns([
491 fragment_relation::Column::SourceFragmentId,
492 fragment_relation::Column::DispatcherType,
493 ])
494 .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
495 .into_tuple()
496 .all(&inner.db)
497 .await?;
498
499 let upstreams: Vec<_> = upstream_entries
500 .into_iter()
501 .map(|(source_id, _)| source_id)
502 .collect();
503
504 let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
505 .await
506 .map(Self::collect_root_fragment_mapping)?;
507 let root_fragments = root_fragment_map
508 .get(&fragment_id)
509 .cloned()
510 .unwrap_or_default();
511
512 let info = self.env.shared_actor_infos().read_guard();
513 let SharedFragmentInfo { actors, .. } = info
514 .get_fragment(fragment_model.fragment_id as _)
515 .unwrap_or_else(|| {
516 panic!(
517 "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
518 fragment_model.fragment_id,
519 fragment_model.job_id
520 )
521 });
522
523 let parallelism_policy = Self::format_fragment_parallelism_policy(
524 fragment_model.distribution_type,
525 fragment_model.parallelism.as_ref(),
526 job_parallelism.as_ref(),
527 &root_fragments,
528 );
529
530 let fragment = FragmentDesc {
531 fragment_id: fragment_model.fragment_id,
532 job_id: fragment_model.job_id,
533 fragment_type_mask: fragment_model.fragment_type_mask,
534 distribution_type: fragment_model.distribution_type,
535 state_table_ids: fragment_model.state_table_ids.clone(),
536 parallelism: actors.len() as _,
537 vnode_count: fragment_model.vnode_count,
538 stream_node: fragment_model.stream_node.clone(),
539 parallelism_policy,
540 };
541
542 Ok(Some((fragment, upstreams)))
543 }
544
545 pub async fn list_fragment_database_ids(
546 &self,
547 select_fragment_ids: Option<Vec<FragmentId>>,
548 ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
549 let inner = self.inner.read().await;
550 let select = FragmentModel::find()
551 .select_only()
552 .column(fragment::Column::FragmentId)
553 .column(object::Column::DatabaseId)
554 .join(JoinType::InnerJoin, fragment::Relation::Object.def());
555 let select = if let Some(select_fragment_ids) = select_fragment_ids {
556 select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
557 } else {
558 select
559 };
560 Ok(select.into_tuple().all(&inner.db).await?)
561 }
562
    /// Loads all fragments of streaming job `job_id` and assembles them, together
    /// with actor placements from the shared actor info, into a
    /// [`StreamJobFragments`].
    ///
    /// Errors if the job does not exist or any fragment is missing from the shared
    /// actor info.
    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;

        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        // Pair each fragment row with its live actors.
        let fragment_actors =
            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;

        // Used to fill each actor's `mview_definition`.
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id,
            job_info.job_status.into(),
            job_info.stream_context(),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }
594
    /// Convenience wrapper around [`Self::get_fragment_actor_dispatchers_txn`] that
    /// runs the lookup on the controller's own database connection.
    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;

        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
            .await
    }
604
605 pub async fn get_fragment_actor_dispatchers_txn(
606 &self,
607 c: &impl ConnectionTrait,
608 fragment_ids: Vec<FragmentId>,
609 ) -> MetaResult<FragmentActorDispatchers> {
610 let fragment_relations = FragmentRelation::find()
611 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
612 .all(c)
613 .await?;
614
615 type FragmentActorInfo = (
616 DistributionType,
617 Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
618 );
619
620 let shared_info = self.env.shared_actor_infos();
621 let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
622 let get_fragment_actors = |fragment_id: FragmentId| async move {
623 let result: MetaResult<FragmentActorInfo> = try {
624 let read_guard = shared_info.read_guard();
625
626 let fragment = read_guard.get_fragment(fragment_id as _).unwrap();
627
628 (
629 fragment.distribution_type,
630 Arc::new(
631 fragment
632 .actors
633 .iter()
634 .map(|(actor_id, actor_info)| {
635 (
636 *actor_id,
637 actor_info
638 .vnode_bitmap
639 .as_ref()
640 .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
641 )
642 })
643 .collect(),
644 ),
645 )
646 };
647 result
648 };
649
650 let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
651 for fragment_relation::Model {
652 source_fragment_id,
653 target_fragment_id,
654 dispatcher_type,
655 dist_key_indices,
656 output_indices,
657 output_type_mapping,
658 } in fragment_relations
659 {
660 let (source_fragment_distribution, source_fragment_actors) = {
661 let (distribution, actors) = {
662 match fragment_actor_cache.entry(source_fragment_id) {
663 Entry::Occupied(entry) => entry.into_mut(),
664 Entry::Vacant(entry) => {
665 entry.insert(get_fragment_actors(source_fragment_id).await?)
666 }
667 }
668 };
669 (*distribution, actors.clone())
670 };
671 let (target_fragment_distribution, target_fragment_actors) = {
672 let (distribution, actors) = {
673 match fragment_actor_cache.entry(target_fragment_id) {
674 Entry::Occupied(entry) => entry.into_mut(),
675 Entry::Vacant(entry) => {
676 entry.insert(get_fragment_actors(target_fragment_id).await?)
677 }
678 }
679 };
680 (*distribution, actors.clone())
681 };
682 let output_mapping = PbDispatchOutputMapping {
683 indices: output_indices.into_u32_array(),
684 types: output_type_mapping.unwrap_or_default().to_protobuf(),
685 };
686 let dispatchers = compose_dispatchers(
687 source_fragment_distribution,
688 &source_fragment_actors,
689 target_fragment_id as _,
690 target_fragment_distribution,
691 &target_fragment_actors,
692 dispatcher_type,
693 dist_key_indices.into_u32_array(),
694 output_mapping,
695 );
696 let actor_dispatchers_map = actor_dispatchers_map
697 .entry(source_fragment_id as _)
698 .or_default();
699 for (actor_id, dispatchers) in dispatchers {
700 actor_dispatchers_map
701 .entry(actor_id as _)
702 .or_default()
703 .push(dispatchers);
704 }
705 }
706 Ok(actor_dispatchers_map)
707 }
708
    /// Convenience wrapper around
    /// [`Self::get_fragment_downstream_relations_in_txn`] that runs the lookup on
    /// the controller's own database connection.
    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
            .await
    }
717
    /// Streams all `fragment_relation` rows whose source fragment is in
    /// `fragment_ids` and groups them by source fragment id into a
    /// [`FragmentDownstreamRelation`].
    pub async fn get_fragment_downstream_relations_in_txn<C>(
        &self,
        txn: &C,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation>
    where
        C: ConnectionTrait + StreamTrait + Send,
    {
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(txn)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        // A missing type mapping is treated as the default one.
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }
750
    /// For every fragment of job `job_id` that contains a `StreamScan` node with a
    /// concrete scan type, returns `fragment id -> scan type`.
    ///
    /// Fragments whose scan type is `Unspecified`, or that contain no `StreamScan`
    /// node, are omitted from the result.
    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            // Walk the fragment's plan looking for a StreamScan node.
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }
784
785 pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
786 let inner = self.inner.read().await;
787 let count = StreamingJob::find().count(&inner.db).await?;
788 Ok(usize::try_from(count).context("streaming job count overflow")?)
789 }
790
    /// Lists one [`StreamingJobInfo`] per streaming job.
    ///
    /// Joins `object` (for type/database/schema) and the owning database, and
    /// left-joins `table`/`source`/`sink` so the job's name can be resolved from
    /// whichever catalog object exists.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // Name: first non-null of table/source/sink name, else "<unknown>".
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // Resource group: job-specific setting, else the database's default.
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            // Config override: empty string when none is set.
            .column_as(
                Expr::if_null(
                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
                    Expr::val(""),
                ),
                "config_override",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
844
845 pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
846 let inner = self.inner.read().await;
847 let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
848 .select_only()
849 .column(streaming_job::Column::MaxParallelism)
850 .into_tuple()
851 .one(&inner.db)
852 .await?
853 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
854 Ok(max_parallelism as usize)
855 }
856
857 pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
859 if let Ok(inner) = self.inner.try_read()
860 && let Ok(job_state_tables) = FragmentModel::find()
861 .select_only()
862 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
863 .into_tuple::<(JobId, I32Array)>()
864 .all(&inner.db)
865 .await
866 {
867 let mut job_internal_table_ids = HashMap::new();
868 for (job_id, state_table_ids) in job_state_tables {
869 job_internal_table_ids
870 .entry(job_id)
871 .or_insert_with(Vec::new)
872 .extend(
873 state_table_ids
874 .into_inner()
875 .into_iter()
876 .map(|table_id| TableId::new(table_id as _)),
877 );
878 }
879 return Some(job_internal_table_ids.into_iter().collect());
880 }
881 None
882 }
883
884 pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
885 let inner = self.inner.read().await;
886 let count = FragmentModel::find().count(&inner.db).await?;
887 Ok(count > 0)
888 }
889
890 pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
891 let read_guard = self.env.shared_actor_infos().read_guard();
892 let actor_cnt: HashMap<WorkerId, _> = read_guard
893 .iter_over_fragments()
894 .flat_map(|(_, fragment)| {
895 fragment
896 .actors
897 .iter()
898 .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
899 })
900 .into_group_map()
901 .into_iter()
902 .map(|(k, v)| (k, v.len()))
903 .collect();
904
905 Ok(actor_cnt)
906 }
907
    /// Builds, for each fragment in `fragment_ids`, its [`ActorInfo`] list derived
    /// from the shared actor info, stamping every actor with the expression context
    /// and config override from `stream_context`.
    ///
    /// Errors if any fragment id is absent from the shared actor info.
    fn collect_fragment_actor_map(
        &self,
        fragment_ids: &[FragmentId],
        stream_context: StreamContext,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
        let guard = self.env.shared_actor_infos().read_guard();
        // Convert once; the same expr context applies to every actor of the job.
        let pb_expr_context = stream_context.to_expr_context();
        let expr_context: ExprContext = (&pb_expr_context).into();

        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
        for fragment_id in fragment_ids {
            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
                anyhow!("fragment {} not found in shared actor info", fragment_id)
            })?;

            let actors = fragment_info
                .actors
                .iter()
                .map(|(actor_id, actor_info)| ActorInfo {
                    actor_id: *actor_id as _,
                    fragment_id: *fragment_id,
                    // Convert split assignments into the model representation.
                    splits: ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
                    }),
                    worker_id: actor_info.worker_id as _,
                    vnode_bitmap: actor_info
                        .vnode_bitmap
                        .as_ref()
                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                    expr_context: expr_context.clone(),
                    config_override: stream_context.config_override.clone(),
                })
                .collect();

            actor_map.insert(*fragment_id, actors);
        }

        Ok(actor_map)
    }
947
948 fn collect_fragment_actor_pairs(
949 &self,
950 fragments: Vec<fragment::Model>,
951 stream_context: StreamContext,
952 ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
953 let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
954 let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
955 fragments
956 .into_iter()
957 .map(|fragment| {
958 let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
959 anyhow!(
960 "fragment {} missing in shared actor info map",
961 fragment.fragment_id
962 )
963 })?;
964 Ok((fragment, actors))
965 })
966 .collect()
967 }
968
    /// Assembles a [`StreamJobFragments`] for every streaming job in the catalog,
    /// keyed by job id.
    ///
    /// NOTE: fragments are loaded with one query per job.
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve all job definitions up front; entries are removed as consumed.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragments = FragmentModel::find()
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            let fragment_actors =
                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;

            table_fragments.insert(
                job.job_id,
                Self::compose_table_fragments(
                    job.job_id,
                    job.job_status.into(),
                    job.stream_context(),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
1006
    /// For each given fragment id, returns the set of fragments directly upstream
    /// of it, i.e. the sources of `fragment_relation` edges targeting it.
    ///
    /// Fragments with no upstream edges are absent from the result.
    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        // Tuples are selected as (source, target) = (upstream, downstream).
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }
1034
1035 pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1036 let info = self.env.shared_actor_infos().read_guard();
1037
1038 let actor_locations = info
1039 .iter_over_fragments()
1040 .flat_map(|(fragment_id, fragment)| {
1041 fragment
1042 .actors
1043 .iter()
1044 .map(|(actor_id, actor)| PartialActorLocation {
1045 actor_id: *actor_id as _,
1046 fragment_id: *fragment_id as _,
1047 worker_id: actor.worker_id,
1048 })
1049 })
1050 .collect_vec();
1051
1052 Ok(actor_locations)
1053 }
1054
    /// Lists `(actor id, fragment id, job object id, schema id, object type)` for
    /// every actor, combining fragment-to-object metadata from the database with
    /// actor placements from the shared actor info.
    ///
    /// Errors with `unavailable` if a fragment known to the database has no entry
    /// in the shared actor info.
    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;

        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
            FragmentModel::find()
                .select_only()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .column(fragment::Column::FragmentId)
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;

        let actor_infos = {
            let info = self.env.shared_actor_infos().read_guard();

            let mut result = Vec::new();

            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
                let Some(fragment) = info.get_fragment(fragment_id as _) else {
                    return Err(MetaError::unavailable(format!(
                        "shared actor info missing for fragment {fragment_id} while listing actors"
                    )));
                };

                // One output row per actor of the fragment.
                for actor_id in fragment.actors.keys() {
                    result.push((
                        *actor_id as _,
                        fragment.fragment_id as _,
                        object_id,
                        schema_id,
                        object_type,
                    ));
                }
            }

            result
        };

        Ok(actor_infos)
    }
1100
1101 pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1102 let guard = self.env.shared_actor_info.read_guard();
1103 guard
1104 .iter_over_fragments()
1105 .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1106 .collect_vec()
1107 }
1108
1109 pub async fn list_fragment_descs_with_node(
1110 &self,
1111 is_creating: bool,
1112 ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1113 let inner = self.inner.read().await;
1114 let txn = inner.db.begin().await?;
1115
1116 let fragments_query = Self::build_fragment_query(is_creating);
1117 let fragments: Vec<fragment::Model> = fragments_query.all(&txn).await?;
1118
1119 let rows = fragments
1120 .into_iter()
1121 .map(|fragment| FragmentDescRow {
1122 fragment_id: fragment.fragment_id,
1123 job_id: fragment.job_id,
1124 fragment_type_mask: fragment.fragment_type_mask,
1125 distribution_type: fragment.distribution_type,
1126 state_table_ids: fragment.state_table_ids,
1127 vnode_count: fragment.vnode_count,
1128 parallelism_override: fragment.parallelism,
1129 stream_node: Some(fragment.stream_node),
1130 })
1131 .collect_vec();
1132
1133 self.build_fragment_distributions(&txn, rows).await
1134 }
1135
1136 pub async fn list_fragment_descs_without_node(
1137 &self,
1138 is_creating: bool,
1139 ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1140 let inner = self.inner.read().await;
1141 let txn = inner.db.begin().await?;
1142
1143 let fragments_query = Self::build_fragment_query(is_creating);
1144 #[allow(clippy::type_complexity)]
1145 let fragments: Vec<(
1146 FragmentId,
1147 JobId,
1148 i32,
1149 DistributionType,
1150 TableIdArray,
1151 i32,
1152 Option<StreamingParallelism>,
1153 )> = fragments_query
1154 .select_only()
1155 .columns([
1156 fragment::Column::FragmentId,
1157 fragment::Column::JobId,
1158 fragment::Column::FragmentTypeMask,
1159 fragment::Column::DistributionType,
1160 fragment::Column::StateTableIds,
1161 fragment::Column::VnodeCount,
1162 fragment::Column::Parallelism,
1163 ])
1164 .into_tuple()
1165 .all(&txn)
1166 .await?;
1167
1168 let rows = fragments
1169 .into_iter()
1170 .map(
1171 |(
1172 fragment_id,
1173 job_id,
1174 fragment_type_mask,
1175 distribution_type,
1176 state_table_ids,
1177 vnode_count,
1178 parallelism_override,
1179 )| FragmentDescRow {
1180 fragment_id,
1181 job_id,
1182 fragment_type_mask,
1183 distribution_type,
1184 state_table_ids,
1185 vnode_count,
1186 parallelism_override,
1187 stream_node: None,
1188 },
1189 )
1190 .collect_vec();
1191
1192 self.build_fragment_distributions(&txn, rows).await
1193 }
1194
1195 fn build_fragment_query(is_creating: bool) -> sea_orm::Select<fragment::Entity> {
1196 if is_creating {
1197 FragmentModel::find()
1198 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1199 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1200 .filter(
1201 streaming_job::Column::JobStatus
1202 .eq(JobStatus::Initial)
1203 .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1204 )
1205 } else {
1206 FragmentModel::find()
1207 }
1208 }
1209
    /// Build a [`FragmentDistribution`] descriptor (plus the list of direct
    /// upstream fragment ids) for each pre-fetched fragment row.
    ///
    /// Combines: job-level parallelism (the "inherit" fallback), upstream
    /// relation edges, no-shuffle root mappings, and live actor counts taken
    /// from the shared in-memory actor info.
    async fn build_fragment_distributions(
        &self,
        txn: &DatabaseTransaction,
        rows: Vec<FragmentDescRow>,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let fragment_ids = rows.iter().map(|row| row.fragment_id).collect_vec();
        let job_ids = rows.iter().map(|row| row.job_id).unique().collect_vec();

        // Parallelism configured on each owning job; used for the
        // "inherit(...)" policy string when a fragment has no override.
        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(txn)
                .await?
                .into_iter()
                .collect()
        };

        // Relation edges pointing at the listed fragments:
        // (target, source, dispatcher type).
        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(txn)
                    .await?
            };

        // Group direct upstream fragments by downstream fragment.
        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        // Map each fragment to the root fragments of its no-shuffle ensemble
        // (roots themselves map to an empty list).
        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();
        let mut result = Vec::with_capacity(rows.len());

        for row in rows {
            // Live parallelism = number of actors currently running the
            // fragment; 0 if the fragment is unknown to the shared info.
            let parallelism = guard
                .get_fragment(row.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&row.fragment_id)
                .cloned()
                .unwrap_or_default();

            // `remove` moves the Vec out; each fragment id appears once in `rows`.
            let upstreams = all_upstreams.remove(&row.fragment_id).unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                row.distribution_type,
                row.parallelism_override.as_ref(),
                job_parallelisms.get(&row.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: row.fragment_id,
                table_id: row.job_id,
                distribution_type: PbFragmentDistributionType::from(row.distribution_type) as _,
                state_table_ids: row.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: row.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: row.vnode_count as _,
                // Present only in the "with node" listing variant.
                node: row.stream_node.map(|node| node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }

        Ok(result)
    }
1305
    /// For every sink whose sink fragment uses a KV log store, map the sink id
    /// to the internal log-store table id.
    pub async fn list_sink_log_store_tables(&self) -> MetaResult<Vec<(SinkId, TableId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Sink fragments only; a sink fragment's job id is the sink id.
        let fragments: Vec<(JobId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::JobId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Sink))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut mapping: HashMap<SinkId, TableId> = HashMap::new();

        for (job_id, stream_node) in fragments {
            let sink_id = SinkId::new(job_id.as_raw_id());
            let stream_node = stream_node.to_protobuf();
            let mut internal_table_id: Option<TableId> = None;

            // Find the sink node's internal table, but only when it uses a
            // KV log store (other log store types have no table).
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::Sink(node) = body
                    && node.log_store_type == SinkLogStoreType::KvLogStore as i32
                    && let Some(table) = &node.table
                {
                    internal_table_id = Some(TableId::new(table.id.as_raw_id()));
                }
            });

            // A sink is expected to have a single log-store table; if multiple
            // fragments disagree, keep the latest value and warn.
            if let Some(table_id) = internal_table_id
                && let Some(existing) = mapping.insert(sink_id, table_id)
                && existing != table_id
            {
                tracing::warn!(
                    "sink {sink_id:?} has multiple log store tables: {existing:?} vs {table_id:?}"
                );
            }
        }

        Ok(mapping.into_iter().collect())
    }
1346
1347 fn collect_root_fragment_mapping(
1349 ensembles: Vec<NoShuffleEnsemble>,
1350 ) -> HashMap<FragmentId, Vec<FragmentId>> {
1351 let mut mapping = HashMap::new();
1352
1353 for ensemble in ensembles {
1354 let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1355 roots.sort_unstable();
1356 roots.dedup();
1357
1358 if roots.is_empty() {
1359 continue;
1360 }
1361
1362 let root_set: HashSet<_> = roots.iter().copied().collect();
1363
1364 for fragment_id in ensemble.component_fragments() {
1365 if root_set.contains(&fragment_id) {
1366 mapping.insert(fragment_id, Vec::new());
1367 } else {
1368 mapping.insert(fragment_id, roots.clone());
1369 }
1370 }
1371 }
1372
1373 mapping
1374 }
1375
1376 fn format_fragment_parallelism_policy(
1377 distribution_type: DistributionType,
1378 fragment_parallelism: Option<&StreamingParallelism>,
1379 job_parallelism: Option<&StreamingParallelism>,
1380 root_fragments: &[FragmentId],
1381 ) -> String {
1382 if distribution_type == DistributionType::Single {
1383 return "single".to_owned();
1384 }
1385
1386 if let Some(parallelism) = fragment_parallelism {
1387 return format!(
1388 "override({})",
1389 Self::format_streaming_parallelism(parallelism)
1390 );
1391 }
1392
1393 if !root_fragments.is_empty() {
1394 let mut upstreams = root_fragments.to_vec();
1395 upstreams.sort_unstable();
1396 upstreams.dedup();
1397
1398 return format!("upstream_fragment({upstreams:?})");
1399 }
1400
1401 let inherited = job_parallelism
1402 .map(Self::format_streaming_parallelism)
1403 .unwrap_or_else(|| "unknown".to_owned());
1404 format!("inherit({inherited})")
1405 }
1406
1407 fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1408 match parallelism {
1409 StreamingParallelism::Adaptive => "adaptive".to_owned(),
1410 StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1411 StreamingParallelism::Custom => "custom".to_owned(),
1412 }
1413 }
1414
1415 pub async fn list_sink_actor_mapping(
1416 &self,
1417 ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1418 let inner = self.inner.read().await;
1419 let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1420 .select_only()
1421 .columns([sink::Column::SinkId, sink::Column::Name])
1422 .into_tuple()
1423 .all(&inner.db)
1424 .await?;
1425 let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1426
1427 let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1428
1429 let actor_with_type: Vec<(ActorId, SinkId)> = {
1430 let info = self.env.shared_actor_infos().read_guard();
1431
1432 info.iter_over_fragments()
1433 .filter(|(_, fragment)| {
1434 sink_ids.contains(&fragment.job_id.as_sink_id())
1435 && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1436 })
1437 .flat_map(|(_, fragment)| {
1438 fragment
1439 .actors
1440 .keys()
1441 .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1442 })
1443 .collect()
1444 };
1445
1446 let mut sink_actor_mapping = HashMap::new();
1447 for (actor_id, sink_id) in actor_with_type {
1448 sink_actor_mapping
1449 .entry(sink_id)
1450 .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1451 .1
1452 .push(actor_id);
1453 }
1454
1455 Ok(sink_actor_mapping)
1456 }
1457
1458 pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1459 let inner = self.inner.read().await;
1460 let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1461 .select_only()
1462 .columns([
1463 fragment::Column::FragmentId,
1464 fragment::Column::JobId,
1465 fragment::Column::StateTableIds,
1466 ])
1467 .into_partial_model()
1468 .all(&inner.db)
1469 .await?;
1470 Ok(fragment_state_tables)
1471 }
1472
1473 pub async fn load_all_actors_dynamic(
1476 &self,
1477 database_id: Option<DatabaseId>,
1478 worker_nodes: &ActiveStreamingWorkerNodes,
1479 ) -> MetaResult<FragmentRenderMap> {
1480 let loaded = self.load_fragment_context(database_id).await?;
1481
1482 if loaded.is_empty() {
1483 return Ok(HashMap::new());
1484 }
1485
1486 let adaptive_parallelism_strategy = {
1487 let system_params_reader = self.env.system_params_reader().await;
1488 system_params_reader.adaptive_parallelism_strategy()
1489 };
1490
1491 let RenderedGraph { fragments, .. } = render_actor_assignments(
1492 self.env.actor_id_generator(),
1493 worker_nodes.current(),
1494 adaptive_parallelism_strategy,
1495 &loaded,
1496 )?;
1497
1498 tracing::trace!(?fragments, "reload all actors");
1499
1500 Ok(fragments)
1501 }
1502
1503 pub async fn load_fragment_context(
1505 &self,
1506 database_id: Option<DatabaseId>,
1507 ) -> MetaResult<LoadedFragmentContext> {
1508 let inner = self.inner.read().await;
1509 let txn = inner.db.begin().await?;
1510
1511 self.load_fragment_context_in_txn(&txn, database_id).await
1512 }
1513
1514 pub async fn load_fragment_context_in_txn<C>(
1515 &self,
1516 txn: &C,
1517 database_id: Option<DatabaseId>,
1518 ) -> MetaResult<LoadedFragmentContext>
1519 where
1520 C: ConnectionTrait,
1521 {
1522 let mut query = StreamingJob::find()
1523 .select_only()
1524 .column(streaming_job::Column::JobId);
1525
1526 if let Some(database_id) = database_id {
1527 query = query
1528 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1529 .filter(object::Column::DatabaseId.eq(database_id));
1530 }
1531
1532 let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
1533
1534 let jobs: HashSet<JobId> = jobs.into_iter().collect();
1535
1536 if jobs.is_empty() {
1537 return Ok(LoadedFragmentContext::default());
1538 }
1539
1540 load_fragment_context_for_jobs(txn, jobs).await
1541 }
1542
    /// Patch the stream plans of the given fragments with snapshot-backfill
    /// epoch information, persisting only the fragments whose plan actually
    /// changed. All updates happen in a single transaction committed at the
    /// end, so the operation is all-or-nothing.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // Returns true only when the plan was modified — skip the
            // database write otherwise.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1576
1577 pub fn get_running_actors_of_fragment(
1579 &self,
1580 fragment_id: FragmentId,
1581 ) -> MetaResult<HashSet<model::ActorId>> {
1582 let info = self.env.shared_actor_infos().read_guard();
1583
1584 let actors = info
1585 .get_fragment(fragment_id as _)
1586 .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1587 .unwrap_or_default();
1588
1589 Ok(actors)
1590 }
1591
    /// Pair each source-backfill actor with the source actor it reads from.
    ///
    /// The two fragments must be connected by a `NoShuffle` edge; the pairing
    /// is resolved from both fragments' distribution types and per-actor
    /// vnode bitmaps. Returns `(backfill_actor, source_actor)` tuples.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Verify an edge exists between the two fragments and fetch its
        // dispatcher type.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // Only NoShuffle edges admit the 1:1 actor pairing computed below.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: look up a fragment's distribution type, erroring if missing.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: snapshot actor_id -> vnode bitmap for a fragment from the
        // shared actor info; empty map if the fragment is unknown.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // The resolver yields (source_actor, backfill_actor); callers expect
        // the reversed order, so swap while collecting.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1687
1688 pub async fn get_root_fragments(
1701 &self,
1702 job_ids: Vec<JobId>,
1703 ) -> MetaResult<(
1704 HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
1705 HashMap<ActorId, WorkerId>,
1706 )> {
1707 let inner = self.inner.read().await;
1708
1709 let all_fragments = FragmentModel::find()
1710 .filter(fragment::Column::JobId.is_in(job_ids))
1711 .all(&inner.db)
1712 .await?;
1713 let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
1715 for fragment in all_fragments {
1716 let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1717 if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1718 _ = root_fragments.insert(fragment.job_id, fragment);
1719 } else if mask.contains(FragmentTypeFlag::Source) {
1720 _ = root_fragments.try_insert(fragment.job_id, fragment);
1723 }
1724 }
1725
1726 let mut root_fragments_pb = HashMap::new();
1727
1728 let info = self.env.shared_actor_infos().read_guard();
1729
1730 let root_fragment_to_jobs: HashMap<_, _> = root_fragments
1731 .iter()
1732 .map(|(job_id, fragment)| (fragment.fragment_id, *job_id))
1733 .collect();
1734
1735 for fragment in root_fragment_to_jobs.keys() {
1736 let fragment_info = info.get_fragment(*fragment).context(format!(
1737 "root fragment {} not found in shared actor info",
1738 fragment
1739 ))?;
1740
1741 let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
1742 let fragment = root_fragments
1743 .get(&job_id)
1744 .context(format!("root fragment for job {} not found", job_id))?;
1745
1746 root_fragments_pb.insert(
1747 job_id,
1748 (fragment_info.clone(), fragment.stream_node.to_protobuf()),
1749 );
1750 }
1751
1752 let mut all_actor_locations = HashMap::new();
1753
1754 for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
1755 for (actor_id, actor_info) in actors {
1756 all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
1757 }
1758 }
1759
1760 Ok((root_fragments_pb, all_actor_locations))
1761 }
1762
1763 pub async fn get_root_fragment(
1764 &self,
1765 job_id: JobId,
1766 ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
1767 let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1768 let (root_fragment, _) = root_fragments
1769 .remove(&job_id)
1770 .context(format!("root fragment for job {} not found", job_id))?;
1771
1772 Ok((root_fragment, actors))
1773 }
1774
1775 pub async fn get_downstream_fragments(
1777 &self,
1778 job_id: JobId,
1779 ) -> MetaResult<(
1780 Vec<(
1781 stream_plan::DispatcherType,
1782 SharedFragmentInfo,
1783 PbStreamNode,
1784 )>,
1785 HashMap<ActorId, WorkerId>,
1786 )> {
1787 let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;
1788
1789 let inner = self.inner.read().await;
1790 let txn = inner.db.begin().await?;
1791 let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1792 .filter(
1793 fragment_relation::Column::SourceFragmentId
1794 .eq(root_fragment.fragment_id as FragmentId),
1795 )
1796 .all(&txn)
1797 .await?;
1798
1799 let downstream_fragment_ids = downstream_fragment_relations
1800 .iter()
1801 .map(|model| model.target_fragment_id as FragmentId)
1802 .collect::<HashSet<_>>();
1803
1804 let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1805 .select_only()
1806 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1807 .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1808 .into_tuple()
1809 .all(&txn)
1810 .await?;
1811
1812 let downstream_fragment_nodes: HashMap<_, _> =
1813 downstream_fragment_nodes.into_iter().collect();
1814
1815 let mut downstream_fragments = vec![];
1816
1817 let info = self.env.shared_actor_infos().read_guard();
1818
1819 let fragment_map: HashMap<_, _> = downstream_fragment_relations
1820 .iter()
1821 .map(|model| (model.target_fragment_id, model.dispatcher_type))
1822 .collect();
1823
1824 for fragment_id in fragment_map.keys() {
1825 let fragment_info @ SharedFragmentInfo { actors, .. } =
1826 info.get_fragment(*fragment_id).unwrap();
1827
1828 let dispatcher_type = fragment_map[fragment_id];
1829
1830 if actors.is_empty() {
1831 bail!("No fragment found for fragment id {}", fragment_id);
1832 }
1833
1834 let dispatch_type = PbDispatcherType::from(dispatcher_type);
1835
1836 let nodes = downstream_fragment_nodes
1837 .get(fragment_id)
1838 .context(format!(
1839 "downstream fragment node for id {} not found",
1840 fragment_id
1841 ))?
1842 .to_protobuf();
1843
1844 downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
1845 }
1846 Ok((downstream_fragments, actor_locations))
1847 }
1848
1849 pub async fn load_source_fragment_ids(
1850 &self,
1851 ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1852 let inner = self.inner.read().await;
1853 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1854 .select_only()
1855 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1856 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1857 .into_tuple()
1858 .all(&inner.db)
1859 .await?;
1860
1861 let mut source_fragment_ids = HashMap::new();
1862 for (fragment_id, stream_node) in fragments {
1863 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1864 source_fragment_ids
1865 .entry(source_id)
1866 .or_insert_with(BTreeSet::new)
1867 .insert(fragment_id);
1868 }
1869 }
1870 Ok(source_fragment_ids)
1871 }
1872
1873 pub async fn load_backfill_fragment_ids(
1874 &self,
1875 ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1876 let inner = self.inner.read().await;
1877 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1878 .select_only()
1879 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1880 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1881 .into_tuple()
1882 .all(&inner.db)
1883 .await?;
1884
1885 let mut source_fragment_ids = HashMap::new();
1886 for (fragment_id, stream_node) in fragments {
1887 if let Some((source_id, upstream_source_fragment_id)) =
1888 stream_node.to_protobuf().find_source_backfill()
1889 {
1890 source_fragment_ids
1891 .entry(source_id)
1892 .or_insert_with(BTreeSet::new)
1893 .insert((fragment_id, upstream_source_fragment_id));
1894 }
1895 }
1896 Ok(source_fragment_ids)
1897 }
1898
1899 pub async fn get_all_upstream_sink_infos(
1900 &self,
1901 target_table: &PbTable,
1902 target_fragment_id: FragmentId,
1903 ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1904 let inner = self.inner.read().await;
1905 let txn = inner.db.begin().await?;
1906
1907 self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
1908 .await
1909 }
1910
1911 pub async fn get_all_upstream_sink_infos_in_txn<C>(
1912 &self,
1913 txn: &C,
1914 target_table: &PbTable,
1915 target_fragment_id: FragmentId,
1916 ) -> MetaResult<Vec<UpstreamSinkInfo>>
1917 where
1918 C: ConnectionTrait,
1919 {
1920 let incoming_sinks = Sink::find()
1921 .filter(sink::Column::TargetTable.eq(target_table.id))
1922 .all(txn)
1923 .await?;
1924
1925 let sink_ids = incoming_sinks.iter().map(|s| s.sink_id).collect_vec();
1926 let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1927
1928 let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1929 for sink in &incoming_sinks {
1930 let sink_fragment_id =
1931 sink_fragment_ids
1932 .get(&sink.sink_id)
1933 .cloned()
1934 .ok_or(anyhow::anyhow!(
1935 "sink fragment not found for sink id {}",
1936 sink.sink_id
1937 ))?;
1938 let upstream_info = build_upstream_sink_info(
1939 sink.sink_id,
1940 sink.original_target_columns
1941 .as_ref()
1942 .map(|cols| cols.to_protobuf())
1943 .unwrap_or_default(),
1944 sink_fragment_id,
1945 target_table,
1946 target_fragment_id,
1947 )?;
1948 upstream_sink_infos.push(upstream_info);
1949 }
1950
1951 Ok(upstream_sink_infos)
1952 }
1953
1954 pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1955 let inner = self.inner.read().await;
1956 let txn = inner.db.begin().await?;
1957
1958 let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1959 .select_only()
1960 .column(fragment::Column::FragmentId)
1961 .filter(
1962 fragment::Column::JobId
1963 .eq(job_id)
1964 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1965 )
1966 .into_tuple()
1967 .all(&txn)
1968 .await?;
1969
1970 if mview_fragment.len() != 1 {
1971 return Err(anyhow::anyhow!(
1972 "expected exactly one mview fragment for job {}, found {}",
1973 job_id,
1974 mview_fragment.len()
1975 )
1976 .into());
1977 }
1978
1979 Ok(mview_fragment.into_iter().next().unwrap())
1980 }
1981
1982 pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
1983 let inner = self.inner.read().await;
1984 let txn = inner.db.begin().await?;
1985 has_table_been_migrated(&txn, table_id).await
1986 }
1987
1988 pub async fn update_fragment_splits<C>(
1989 &self,
1990 txn: &C,
1991 fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
1992 ) -> MetaResult<()>
1993 where
1994 C: ConnectionTrait,
1995 {
1996 if fragment_splits.is_empty() {
1997 return Ok(());
1998 }
1999
2000 let models: Vec<fragment_splits::ActiveModel> = fragment_splits
2001 .iter()
2002 .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
2003 fragment_id: Set(*fragment_id as _),
2004 splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
2005 splits: splits.iter().map(Into::into).collect_vec(),
2006 }))),
2007 })
2008 .collect();
2009
2010 FragmentSplits::insert_many(models)
2011 .on_conflict(
2012 OnConflict::column(fragment_splits::Column::FragmentId)
2013 .update_column(fragment_splits::Column::Splits)
2014 .to_owned(),
2015 )
2016 .exec(txn)
2017 .await?;
2018
2019 Ok(())
2020 }
2021}
2022
2023#[cfg(test)]
2024mod tests {
2025 use std::collections::{BTreeMap, HashMap, HashSet};
2026
2027 use itertools::Itertools;
2028 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
2029 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
2030 use risingwave_common::id::JobId;
2031 use risingwave_common::util::iter_util::ZipEqDebug;
2032 use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
2033 use risingwave_meta_model::fragment::DistributionType;
2034 use risingwave_meta_model::*;
2035 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
2036 use risingwave_pb::plan_common::PbExprContext;
2037 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
2038 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
2039 use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
2040
2041 use super::ActorInfo;
2042 use crate::MetaResult;
2043 use crate::controller::catalog::CatalogController;
2044 use crate::model::{Fragment, StreamActor};
2045
    /// Upstream actors grouped by upstream fragment id, for a single actor.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    /// Per-actor upstream mapping for a whole fragment.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    // Fixture ids shared by the tests below.
    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    const TEST_JOB_ID: JobId = JobId::new(1);

    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
2057
2058 fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
2059 let mut upstream_actor_ids = BTreeMap::new();
2060 upstream_actor_ids.insert(
2061 TEST_UPSTREAM_FRAGMENT_ID,
2062 HashSet::from_iter([(actor_id + 100)]),
2063 );
2064 upstream_actor_ids.insert(
2065 (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
2066 HashSet::from_iter([(actor_id + 200)]),
2067 );
2068 upstream_actor_ids
2069 }
2070
2071 fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
2072 let mut input = vec![];
2073 for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
2074 input.push(PbStreamNode {
2075 node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
2076 upstream_fragment_id,
2077 ..Default::default()
2078 }))),
2079 ..Default::default()
2080 });
2081 }
2082
2083 PbStreamNode {
2084 input,
2085 node_body: Some(PbNodeBody::Union(PbUnionNode {})),
2086 ..Default::default()
2087 }
2088 }
2089
    /// Round-trip check: build an in-memory `Fragment` with three hash-
    /// distributed actors, convert it into its DB model via
    /// `prepare_fragment_model_for_new_job`, and verify all fields survive.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        // One upstream map per actor (two upstream fragments each).
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // Evenly distribute vnodes over the three actors.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // All actors of a fragment share one stream-node template, so any
        // actor's upstream map works here.
        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id.into(),
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id.into()).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
                config_override: "".into(),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }
2141
2142 #[tokio::test]
2143 async fn test_compose_fragment() -> MetaResult<()> {
2144 let actor_count = 3u32;
2145
2146 let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
2147 .map(|actor_id| {
2148 (
2149 actor_id.into(),
2150 generate_upstream_actor_ids_for_actor(actor_id.into()),
2151 )
2152 })
2153 .collect();
2154
2155 let mut actor_bitmaps = ActorMapping::new_uniform(
2156 (0..actor_count).map(|i| i.into()),
2157 VirtualNode::COUNT_FOR_TEST,
2158 )
2159 .to_bitmaps();
2160
2161 let actors = (0..actor_count)
2162 .map(|actor_id| {
2163 let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
2164 splits: vec![PbConnectorSplit {
2165 split_type: "dummy".to_owned(),
2166 ..Default::default()
2167 }],
2168 });
2169
2170 ActorInfo {
2171 actor_id: actor_id.into(),
2172 fragment_id: TEST_FRAGMENT_ID,
2173 splits: actor_splits,
2174 worker_id: 0.into(),
2175 vnode_bitmap: actor_bitmaps
2176 .remove(&actor_id.into())
2177 .map(|bitmap| bitmap.to_protobuf())
2178 .as_ref()
2179 .map(VnodeBitmap::from),
2180 expr_context: ExprContext::from(&PbExprContext {
2181 time_zone: String::from("America/New_York"),
2182 strict_mode: false,
2183 }),
2184 config_override: "a.b.c = true".into(),
2185 }
2186 })
2187 .collect_vec();
2188
2189 let stream_node = {
2190 let template_actor = actors.first().cloned().unwrap();
2191
2192 let template_upstream_actor_ids = upstream_actor_ids
2193 .get(&(template_actor.actor_id as _))
2194 .unwrap();
2195
2196 generate_merger_stream_node(template_upstream_actor_ids)
2197 };
2198
2199 #[expect(deprecated)]
2200 let fragment = fragment::Model {
2201 fragment_id: TEST_FRAGMENT_ID,
2202 job_id: TEST_JOB_ID,
2203 fragment_type_mask: 0,
2204 distribution_type: DistributionType::Hash,
2205 stream_node: StreamNode::from(&stream_node),
2206 state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
2207 upstream_fragment_id: Default::default(),
2208 vnode_count: VirtualNode::COUNT_FOR_TEST as _,
2209 parallelism: None,
2210 };
2211
2212 let (pb_fragment, pb_actor_status, pb_actor_splits) =
2213 CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();
2214
2215 assert_eq!(pb_actor_status.len(), actor_count as usize);
2216 assert!(
2217 pb_actor_status
2218 .values()
2219 .all(|actor_status| actor_status.location.is_some())
2220 );
2221 assert_eq!(pb_actor_splits.len(), actor_count as usize);
2222
2223 let pb_actors = pb_fragment.actors.clone();
2224
2225 check_fragment(fragment, pb_fragment);
2226 check_actors(
2227 actors,
2228 &upstream_actor_ids,
2229 pb_actors,
2230 pb_actor_splits,
2231 &stream_node,
2232 );
2233
2234 Ok(())
2235 }
2236
2237 fn check_actors(
2238 actors: Vec<ActorInfo>,
2239 actor_upstreams: &FragmentActorUpstreams,
2240 pb_actors: Vec<StreamActor>,
2241 pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
2242 stream_node: &PbStreamNode,
2243 ) {
2244 for (
2245 ActorInfo {
2246 actor_id,
2247 fragment_id,
2248 splits,
2249 worker_id: _,
2250 vnode_bitmap,
2251 expr_context,
2252 ..
2253 },
2254 StreamActor {
2255 actor_id: pb_actor_id,
2256 fragment_id: pb_fragment_id,
2257 vnode_bitmap: pb_vnode_bitmap,
2258 mview_definition,
2259 expr_context: pb_expr_context,
2260 ..
2261 },
2262 ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
2263 {
2264 assert_eq!(actor_id, pb_actor_id as ActorId);
2265 assert_eq!(fragment_id, pb_fragment_id as FragmentId);
2266
2267 assert_eq!(
2268 vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
2269 pb_vnode_bitmap,
2270 );
2271
2272 assert_eq!(mview_definition, "");
2273
2274 visit_stream_node_body(stream_node, |body| {
2275 if let PbNodeBody::Merge(m) = body {
2276 assert!(
2277 actor_upstreams
2278 .get(&(actor_id as _))
2279 .unwrap()
2280 .contains_key(&m.upstream_fragment_id)
2281 );
2282 }
2283 });
2284
2285 assert_eq!(
2286 splits,
2287 pb_actor_splits
2288 .get(&pb_actor_id)
2289 .map(ConnectorSplits::from)
2290 .unwrap_or_default()
2291 );
2292
2293 assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
2294 }
2295 }
2296
2297 fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2298 let Fragment {
2299 fragment_id,
2300 fragment_type_mask,
2301 distribution_type: pb_distribution_type,
2302 actors: _,
2303 state_table_ids: pb_state_table_ids,
2304 maybe_vnode_count: _,
2305 nodes,
2306 } = pb_fragment;
2307
2308 assert_eq!(fragment_id, TEST_FRAGMENT_ID);
2309 assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2310 assert_eq!(
2311 pb_distribution_type,
2312 PbFragmentDistributionType::from(fragment.distribution_type)
2313 );
2314
2315 assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
2316 assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2317 }
2318
2319 #[test]
2320 fn test_parallelism_policy_with_root_fragments() {
2321 #[expect(deprecated)]
2322 let fragment = fragment::Model {
2323 fragment_id: 3.into(),
2324 job_id: TEST_JOB_ID,
2325 fragment_type_mask: 0,
2326 distribution_type: DistributionType::Hash,
2327 stream_node: StreamNode::from(&PbStreamNode::default()),
2328 state_table_ids: TableIdArray::default(),
2329 upstream_fragment_id: Default::default(),
2330 vnode_count: 0,
2331 parallelism: None,
2332 };
2333
2334 let job_parallelism = StreamingParallelism::Fixed(4);
2335
2336 let policy = super::CatalogController::format_fragment_parallelism_policy(
2337 fragment.distribution_type,
2338 fragment.parallelism.as_ref(),
2339 Some(&job_parallelism),
2340 &[],
2341 );
2342
2343 assert_eq!(policy, "inherit(fixed(4))");
2344 }
2345
2346 #[test]
2347 fn test_parallelism_policy_with_upstream_roots() {
2348 #[expect(deprecated)]
2349 let fragment = fragment::Model {
2350 fragment_id: 5.into(),
2351 job_id: TEST_JOB_ID,
2352 fragment_type_mask: 0,
2353 distribution_type: DistributionType::Hash,
2354 stream_node: StreamNode::from(&PbStreamNode::default()),
2355 state_table_ids: TableIdArray::default(),
2356 upstream_fragment_id: Default::default(),
2357 vnode_count: 0,
2358 parallelism: None,
2359 };
2360
2361 let policy = super::CatalogController::format_fragment_parallelism_policy(
2362 fragment.distribution_type,
2363 fragment.parallelism.as_ref(),
2364 None,
2365 &[3.into(), 1.into(), 2.into(), 1.into()],
2366 );
2367
2368 assert_eq!(policy, "upstream_fragment([1, 2, 3])");
2369 }
2370}