1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::{Either, Itertools};
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::CdcScanOptions;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34 Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37 ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38 JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39 TableIdArray, VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits,
40 object, sink, source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46 Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::fragment::{
49 FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan;
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57 PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, SinkLogStoreType,
58 StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63 ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, FromQueryResult, JoinType,
64 PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
65};
66use serde::{Deserialize, Serialize};
67
68use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
69use crate::controller::catalog::CatalogController;
70use crate::controller::scale::{
71 FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph,
72 find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
73 render_actor_assignments, resolve_streaming_job_definition,
74};
75use crate::controller::utils::{
76 FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
77 get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
78 resolve_no_shuffle_actor_mapping,
79};
80use crate::error::MetaError;
81use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
82use crate::model::{
83 DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
84 StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
85 TableParallelism,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
/// Live (in-flight) state of a single actor as tracked in the shared actor
/// info, independent of what is persisted in the metadata store.
#[derive(Debug)]
pub struct InflightActorInfo {
    // Worker node the actor is currently placed on.
    pub worker_id: WorkerId,
    // Vnode ownership bitmap; `None` when the actor has no vnode bitmap
    // (presumably singleton-distributed fragments — confirm with callers).
    pub vnode_bitmap: Option<Bitmap>,
    // Source splits currently assigned to this actor.
    pub splits: Vec<SplitImpl>,
}
98
/// Internal per-actor record assembled from the shared actor info (see
/// `collect_fragment_actor_map`) and used when composing fragments.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    // Fragment the actor belongs to; validated against the fragment row in
    // `compose_fragment`.
    pub fragment_id: FragmentId,
    // Connector splits assigned to the actor, in model (serialized) form.
    pub splits: ConnectorSplits,
    pub worker_id: WorkerId,
    // `None` when the actor carries no vnode bitmap.
    pub vnode_bitmap: Option<VnodeBitmap>,
    // Expression-evaluation context derived from the job's stream context.
    pub expr_context: ExprContext,
    // Per-job config override string (shared, hence `Arc<str>`).
    pub config_override: Arc<str>,
}
109
/// Raw row shape for fragment-description queries.
// NOTE(review): not referenced in this chunk — presumably filled via a
// custom select further down the file; confirm before removing.
#[derive(Debug)]
struct FragmentDescRow {
    fragment_id: FragmentId,
    job_id: JobId,
    // Raw bit mask; interpret via `FragmentTypeMask`/`FragmentTypeFlag`.
    fragment_type_mask: i32,
    distribution_type: DistributionType,
    state_table_ids: TableIdArray,
    vnode_count: i32,
    // Per-fragment parallelism override, if any (job-level otherwise).
    parallelism_override: Option<StreamingParallelism>,
    // Stream plan; optional since some queries omit the (large) column.
    stream_node: Option<StreamNode>,
}
121
/// Live state of a whole fragment: its plan, distribution and all of its
/// in-flight actors.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    // Total number of vnodes of the fragment.
    pub vnode_count: usize,
    // The fragment's stream plan in protobuf form.
    pub nodes: PbStreamNode,
    pub actors: HashMap<ActorId, InflightActorInfo>,
    // State tables owned by this fragment.
    pub state_table_ids: HashSet<TableId>,
}
132
/// Distribution type and vnode count of a fragment, as returned by
/// [`CatalogController::fragment_parallelisms`].
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub vnode_count: usize,
}
138
/// SQL-expression helpers for filtering `fragment` rows by bit flags in the
/// `fragment_type_mask` column.
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    /// `fragment_type_mask & flag != 0` — the fragment has `flag` set.
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    /// `fragment_type_mask & (flags combined) != 0` — at least one of the
    /// given flags is set.
    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    /// `fragment_type_mask & flag == 0` — the fragment does not have `flag` set.
    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}
159
160#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
161#[serde(rename_all = "camelCase")] pub struct StreamingJobInfo {
163 pub job_id: JobId,
164 pub obj_type: ObjectType,
165 pub name: String,
166 pub job_status: JobStatus,
167 pub parallelism: StreamingParallelism,
168 pub max_parallelism: i32,
169 pub resource_group: String,
170 pub config_override: String,
171 pub database_id: DatabaseId,
172 pub schema_id: SchemaId,
173}
174
175impl NotificationManager {
176 pub(crate) fn notify_fragment_mapping(
177 &self,
178 operation: NotificationOperation,
179 fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
180 ) {
181 let fragment_ids = fragment_mappings
182 .iter()
183 .map(|mapping| mapping.fragment_id)
184 .collect_vec();
185 if fragment_ids.is_empty() {
186 return;
187 }
188 for fragment_mapping in fragment_mappings {
190 self.notify_frontend_without_version(
191 operation,
192 NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
193 );
194 }
195
196 match operation {
198 NotificationOperation::Add | NotificationOperation::Update => {
199 self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
200 fragment_ids,
201 ));
202 }
203 NotificationOperation::Delete => {
204 self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
205 fragment_ids,
206 ));
207 }
208 op => {
209 tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
210 }
211 }
212 }
213}
214
215impl CatalogController {
216 pub fn prepare_fragment_models_from_fragments(
217 job_id: JobId,
218 fragments: impl Iterator<Item = &Fragment>,
219 ) -> MetaResult<Vec<fragment::Model>> {
220 fragments
221 .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
222 .try_collect()
223 }
224
    /// Convert an in-memory [`Fragment`] into a `fragment::Model` row for a
    /// newly created streaming job.
    ///
    /// Only parallelized-backfill CDC scan fragments get a per-fragment
    /// `parallelism` override; all other fragments leave it `None`.
    pub fn prepare_fragment_model_for_new_job(
        job_id: JobId,
        fragment: &Fragment,
    ) -> MetaResult<fragment::Model> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        // A StreamCdcScan root configured for parallelized backfill pins the
        // fragment to a fixed parallelism equal to `backfill_parallelism`.
        let fragment_parallelism = nodes
            .node_body
            .as_ref()
            .and_then(|body| match body {
                NodeBody::StreamCdcScan(node) => Some(node),
                _ => None,
            })
            .and_then(|node| node.options.as_ref())
            .map(CdcScanOptions::from_proto)
            .filter(|opts| opts.is_parallelized_backfill())
            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));

        let stream_node = StreamNode::from(nodes);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        // `upstream_fragment_id` is a deprecated column, left at its default;
        // upstream links presumably live in `fragment_relation` now — confirm.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
            parallelism: fragment_parallelism,
        };

        Ok(fragment)
    }
274
275 #[expect(clippy::type_complexity)]
276 fn compose_table_fragments(
277 job_id: JobId,
278 state: PbState,
279 ctx: StreamContext,
280 fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
281 parallelism: StreamingParallelism,
282 max_parallelism: usize,
283 job_definition: Option<String>,
284 ) -> MetaResult<(
285 StreamJobFragments,
286 HashMap<FragmentId, Vec<StreamActor>>,
287 HashMap<crate::model::ActorId, PbActorStatus>,
288 )> {
289 let mut pb_fragments = BTreeMap::new();
290 let mut fragment_actors = HashMap::new();
291 let mut all_actor_status = HashMap::new();
292
293 for (fragment, actors) in fragments {
294 let (fragment, actors, actor_status, _) =
295 Self::compose_fragment(fragment, actors, job_definition.clone())?;
296 let fragment_id = fragment.fragment_id;
297 fragment_actors.insert(fragment_id, actors);
298 all_actor_status.extend(actor_status);
299
300 pb_fragments.insert(fragment_id, fragment);
301 }
302
303 let table_fragments = StreamJobFragments {
304 stream_job_id: job_id,
305 state: state as _,
306 fragments: pb_fragments,
307 ctx,
308 assigned_parallelism: match parallelism {
309 StreamingParallelism::Custom => TableParallelism::Custom,
310 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
311 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
312 },
313 max_parallelism,
314 };
315
316 Ok((table_fragments, fragment_actors, all_actor_status))
317 }
318
    /// Convert a stored `fragment::Model` plus its live [`ActorInfo`]s into
    /// the in-memory [`Fragment`] representation.
    ///
    /// Returns `(fragment, actors, status_by_actor, splits_by_actor)`.
    /// Errors if any actor claims a different `fragment_id` than the row.
    #[expect(clippy::type_complexity)]
    fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<ActorInfo>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        Vec<StreamActor>,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity check: each upstream fragment should appear in at most one
        // Merge node of the plan; panics (assert) otherwise.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            // Defensive consistency check between actor record and fragment row.
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let ActorInfo {
                actor_id,
                fragment_id,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                config_override,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id),
                },
            );

            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                // Missing job definition degrades to an empty mview definition.
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
                config_override,
            })
        }

        let pb_state_table_ids = state_table_ids.0;
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actors, pb_actor_status, pb_actor_splits))
    }
413
414 pub async fn fragment_parallelisms(
421 &self,
422 ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
423 let inner = self.inner.read().await;
424 let query = FragmentModel::find().select_only().columns([
425 fragment::Column::FragmentId,
426 fragment::Column::DistributionType,
427 fragment::Column::VnodeCount,
428 ]);
429 let fragments: Vec<(FragmentId, DistributionType, i32)> =
430 query.into_tuple().all(&inner.db).await?;
431
432 Ok(fragments
433 .into_iter()
434 .map(|(fragment_id, distribution_type, vnode_count)| {
435 (
436 fragment_id,
437 FragmentParallelismInfo {
438 distribution_type: PbFragmentDistributionType::from(distribution_type),
439 vnode_count: vnode_count as usize,
440 },
441 )
442 })
443 .collect())
444 }
445
446 pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
447 let inner = self.inner.read().await;
448 let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
449 .select_only()
450 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
451 .into_tuple()
452 .all(&inner.db)
453 .await?;
454 Ok(fragment_jobs.into_iter().collect())
455 }
456
    /// Convenience wrapper around [`Self::get_fragment_job_id_in_txn`] using
    /// the controller's own database connection.
    pub async fn get_fragment_job_id(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;
        self.get_fragment_job_id_in_txn(&inner.db, fragment_ids)
            .await
    }
465
466 pub async fn get_fragment_job_id_in_txn<C>(
467 &self,
468 txn: &C,
469 fragment_ids: Vec<FragmentId>,
470 ) -> MetaResult<Vec<ObjectId>>
471 where
472 C: ConnectionTrait + Send,
473 {
474 let object_ids: Vec<ObjectId> = FragmentModel::find()
475 .select_only()
476 .column(fragment::Column::JobId)
477 .filter(fragment::Column::FragmentId.is_in(fragment_ids))
478 .into_tuple()
479 .all(txn)
480 .await?;
481
482 Ok(object_ids)
483 }
484
    /// Build a [`FragmentDesc`] for one fragment, plus the ids of its direct
    /// upstream fragments.
    ///
    /// Returns `Ok(None)` when the fragment does not exist in the metadata
    /// store. Panics if the fragment exists in the database but is missing
    /// from the shared actor info (treated as an invariant violation).
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model = match FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?
        {
            Some(fragment) => fragment,
            None => return Ok(None),
        };

        // Job-level parallelism, used as fallback when the fragment has no
        // per-fragment override.
        let job_parallelism: Option<StreamingParallelism> =
            StreamingJob::find_by_id(fragment_model.job_id)
                .select_only()
                .column(streaming_job::Column::Parallelism)
                .into_tuple()
                .one(&inner.db)
                .await?;

        // Direct upstreams from `fragment_relation` (dispatcher type fetched
        // but unused here).
        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::DispatcherType,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let upstreams: Vec<_> = upstream_entries
            .into_iter()
            .map(|(source_id, _)| source_id)
            .collect();

        // Roots of the no-shuffle DAG containing this fragment; feed into the
        // parallelism-policy description.
        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
            .await
            .map(Self::collect_root_fragment_mapping)?;
        let root_fragments = root_fragment_map
            .get(&fragment_id)
            .cloned()
            .unwrap_or_default();

        // Actor count comes from the live shared actor info, not the database.
        let info = self.env.shared_actor_infos().read_guard();
        let SharedFragmentInfo { actors, .. } = info
            .get_fragment(fragment_model.fragment_id as _)
            .unwrap_or_else(|| {
                panic!(
                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                    fragment_model.fragment_id,
                    fragment_model.job_id
                )
            });

        let parallelism_policy = Self::format_fragment_parallelism_policy(
            fragment_model.distribution_type,
            fragment_model.parallelism.as_ref(),
            job_parallelism.as_ref(),
            &root_fragments,
        );

        let fragment = FragmentDesc {
            fragment_id: fragment_model.fragment_id,
            job_id: fragment_model.job_id,
            fragment_type_mask: fragment_model.fragment_type_mask,
            distribution_type: fragment_model.distribution_type,
            state_table_ids: fragment_model.state_table_ids.clone(),
            parallelism: actors.len() as _,
            vnode_count: fragment_model.vnode_count,
            stream_node: fragment_model.stream_node.clone(),
            parallelism_policy,
        };

        Ok(Some((fragment, upstreams)))
    }
563
564 pub async fn list_fragment_database_ids(
565 &self,
566 select_fragment_ids: Option<Vec<FragmentId>>,
567 ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
568 let inner = self.inner.read().await;
569 let select = FragmentModel::find()
570 .select_only()
571 .column(fragment::Column::FragmentId)
572 .column(object::Column::DatabaseId)
573 .join(JoinType::InnerJoin, fragment::Relation::Object.def());
574 let select = if let Some(select_fragment_ids) = select_fragment_ids {
575 select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
576 } else {
577 select
578 };
579 Ok(select.into_tuple().all(&inner.db).await?)
580 }
581
    /// Load a job's fragments from the metadata store, pair them with live
    /// actors, and compose the full [`StreamJobFragments`] snapshot.
    ///
    /// Errors if the job does not exist or any fragment is missing from the
    /// shared actor info.
    pub async fn get_job_fragments_by_id(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        StreamJobFragments,
        HashMap<FragmentId, Vec<StreamActor>>,
        HashMap<ActorId, PbActorStatus>,
    )> {
        let inner = self.inner.read().await;

        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        // Live actors come from the shared actor info, not the database.
        let fragment_actors =
            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;

        // Definition (SQL text) is resolved separately; may be absent.
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id,
            job_info.job_status.into(),
            job_info.stream_context(),
            fragment_actors,
            job_info.parallelism,
            job_info.max_parallelism as _,
            job_definition,
        )
    }
620
    /// Convenience wrapper around [`Self::get_fragment_actor_dispatchers_txn`]
    /// using the controller's own database connection.
    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;

        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
            .await
    }
630
    /// Compose, for each given source fragment, the per-actor dispatchers
    /// towards all of its downstream fragments.
    ///
    /// Relations are read from `fragment_relation`; actor placement and vnode
    /// bitmaps come from the live shared actor info and are memoized per
    /// fragment to avoid repeated read-guard lookups.
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        // (distribution type, actor -> optional vnode bitmap) of one fragment.
        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        // Memoizes fragment lookups; both endpoints of every relation go
        // through this cache.
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                // Invariant: any fragment referenced by a relation must be
                // present in the shared actor info.
                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            // Resolve (and cache) actor info for both relation endpoints.
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let (dispatchers, _) = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            // Accumulate per source fragment, then per source actor.
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }
734
    /// Convenience wrapper around
    /// [`Self::get_fragment_downstream_relations_in_txn`] using the
    /// controller's own database connection.
    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
            .await
    }
743
744 pub async fn get_fragment_downstream_relations_in_txn<C>(
745 &self,
746 txn: &C,
747 fragment_ids: Vec<FragmentId>,
748 ) -> MetaResult<FragmentDownstreamRelation>
749 where
750 C: ConnectionTrait + StreamTrait + Send,
751 {
752 let mut stream = FragmentRelation::find()
753 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
754 .stream(txn)
755 .await?;
756 let mut relations = FragmentDownstreamRelation::new();
757 while let Some(relation) = stream.try_next().await? {
758 relations
759 .entry(relation.source_fragment_id as _)
760 .or_default()
761 .push(DownstreamFragmentRelation {
762 downstream_fragment_id: relation.target_fragment_id as _,
763 dispatcher_type: relation.dispatcher_type,
764 dist_key_indices: relation.dist_key_indices.into_u32_array(),
765 output_mapping: PbDispatchOutputMapping {
766 indices: relation.output_indices.into_u32_array(),
767 types: relation
768 .output_type_mapping
769 .unwrap_or_default()
770 .to_protobuf(),
771 },
772 });
773 }
774 Ok(relations)
775 }
776
    /// Convenience wrapper around
    /// [`Self::get_job_fragment_backfill_scan_type_in_txn`] using the
    /// controller's own database connection.
    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        self.get_job_fragment_backfill_scan_type_in_txn(&inner.db, job_id)
            .await
    }
785
786 pub async fn get_job_fragment_backfill_scan_type_in_txn<C>(
787 &self,
788 txn: &C,
789 job_id: JobId,
790 ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>>
791 where
792 C: ConnectionTrait + Send,
793 {
794 let fragments: Vec<_> = FragmentModel::find()
795 .filter(fragment::Column::JobId.eq(job_id))
796 .all(txn)
797 .await?;
798
799 let mut result = HashMap::new();
800
801 for fragment::Model {
802 fragment_id,
803 stream_node,
804 ..
805 } in fragments
806 {
807 let stream_node = stream_node.to_protobuf();
808 visit_stream_node_body(&stream_node, |body| {
809 if let NodeBody::StreamScan(node) = body {
810 match node.stream_scan_type() {
811 StreamScanType::Unspecified => {}
812 scan_type => {
813 result.insert(fragment_id as crate::model::FragmentId, scan_type);
814 }
815 }
816 }
817 });
818 }
819
820 Ok(result)
821 }
822
823 pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
824 let inner = self.inner.read().await;
825 let count = StreamingJob::find().count(&inner.db).await?;
826 Ok(usize::try_from(count).context("streaming job count overflow")?)
827 }
828
    /// List a [`StreamingJobInfo`] summary for every streaming job by joining
    /// `streaming_job` with `object`, `database` and the per-kind catalogs
    /// (`table`/`source`/`sink`).
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // Left-join all three catalogs; at most one will match per job.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // COALESCE(table.name, source.name, sink.name, '<unknown>') AS name
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // Job-specific resource group, falling back to the database's.
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            // NULL config override surfaces as an empty string.
            .column_as(
                Expr::if_null(
                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
                    Expr::val(""),
                ),
                "config_override",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
882
883 pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
884 let inner = self.inner.read().await;
885 let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
886 .select_only()
887 .column(streaming_job::Column::MaxParallelism)
888 .into_tuple()
889 .one(&inner.db)
890 .await?
891 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
892 Ok(max_parallelism as usize)
893 }
894
    /// Best-effort listing of each job's internal (state) table ids.
    ///
    /// Returns `None` instead of an error when the inner lock is contended
    /// (`try_read` fails) or the query fails — deliberately non-blocking and
    /// non-failing for callers that can tolerate a missing answer.
    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
        if let Ok(inner) = self.inner.try_read()
            && let Ok(job_state_tables) = FragmentModel::find()
                .select_only()
                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
                .into_tuple::<(JobId, I32Array)>()
                .all(&inner.db)
                .await
        {
            // Merge per-fragment state-table lists by job.
            let mut job_internal_table_ids = HashMap::new();
            for (job_id, state_table_ids) in job_state_tables {
                job_internal_table_ids
                    .entry(job_id)
                    .or_insert_with(Vec::new)
                    .extend(
                        state_table_ids
                            .into_inner()
                            .into_iter()
                            .map(|table_id| TableId::new(table_id as _)),
                    );
            }
            return Some(job_internal_table_ids.into_iter().collect());
        }
        None
    }
921
922 pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
923 let inner = self.inner.read().await;
924 let count = FragmentModel::find().count(&inner.db).await?;
925 Ok(count > 0)
926 }
927
928 pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
929 let read_guard = self.env.shared_actor_infos().read_guard();
930 let actor_cnt: HashMap<WorkerId, _> = read_guard
931 .iter_over_fragments()
932 .flat_map(|(_, fragment)| {
933 fragment
934 .actors
935 .iter()
936 .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
937 })
938 .into_group_map()
939 .into_iter()
940 .map(|(k, v)| (k, v.len()))
941 .collect();
942
943 Ok(actor_cnt)
944 }
945
    /// Build an [`ActorInfo`] list for each requested fragment from the live
    /// shared actor info, stamping every actor with the job's expression
    /// context and config override.
    ///
    /// Errors if any fragment id is absent from the shared actor info.
    fn collect_fragment_actor_map(
        &self,
        fragment_ids: &[FragmentId],
        stream_context: StreamContext,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
        let guard = self.env.shared_actor_infos().read_guard();
        // One expr context per job, shared by all of its actors.
        let pb_expr_context = stream_context.to_expr_context();
        let expr_context: ExprContext = (&pb_expr_context).into();

        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
        for fragment_id in fragment_ids {
            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
                anyhow!("fragment {} not found in shared actor info", fragment_id)
            })?;

            let actors = fragment_info
                .actors
                .iter()
                .map(|(actor_id, actor_info)| ActorInfo {
                    actor_id: *actor_id as _,
                    fragment_id: *fragment_id,
                    // Convert live splits into the serialized model form.
                    splits: ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
                    }),
                    worker_id: actor_info.worker_id as _,
                    vnode_bitmap: actor_info
                        .vnode_bitmap
                        .as_ref()
                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                    expr_context: expr_context.clone(),
                    config_override: stream_context.config_override.clone(),
                })
                .collect();

            actor_map.insert(*fragment_id, actors);
        }

        Ok(actor_map)
    }
985
986 fn collect_fragment_actor_pairs(
987 &self,
988 fragments: Vec<fragment::Model>,
989 stream_context: StreamContext,
990 ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
991 let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
992 let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
993 fragments
994 .into_iter()
995 .map(|fragment| {
996 let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
997 anyhow!(
998 "fragment {} missing in shared actor info map",
999 fragment.fragment_id
1000 )
1001 })?;
1002 Ok((fragment, actors))
1003 })
1004 .collect()
1005 }
1006
    /// Compose a [`StreamJobFragments`] snapshot (plus actors and statuses)
    /// for every streaming job, keyed and ordered by job id.
    ///
    /// Errors if any job's fragments are missing from the shared actor info.
    pub async fn table_fragments(
        &self,
    ) -> MetaResult<
        BTreeMap<
            JobId,
            (
                StreamJobFragments,
                HashMap<FragmentId, Vec<StreamActor>>,
                HashMap<crate::model::ActorId, PbActorStatus>,
            ),
        >,
    > {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve all job definitions in one query up-front.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragments = FragmentModel::find()
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            let fragment_actors =
                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;

            table_fragments.insert(
                job.job_id,
                Self::compose_table_fragments(
                    job.job_id,
                    job.job_status.into(),
                    job.stream_context(),
                    fragment_actors,
                    job.parallelism,
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
1055
    /// Convenience wrapper around [`Self::upstream_fragments_in_txn`] using
    /// the controller's own database connection.
    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        self.upstream_fragments_in_txn(&inner.db, fragment_ids)
            .await
    }
1064
    /// Same as [`Self::upstream_fragments`], but runs on a caller-supplied
    /// connection so it can participate in a larger transaction.
    ///
    /// Streams `(source, target)` pairs from `fragment_relation` where the
    /// target is one of `fragment_ids`, grouping sources by target. Targets
    /// with no upstream rows are simply absent from the result.
    pub async fn upstream_fragments_in_txn<C>(
        &self,
        txn: &C,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>>
    where
        C: ConnectionTrait + StreamTrait + Send,
    {
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            // Stream rows instead of materializing: the relation table can be
            // large when many fragment ids are queried at once.
            .stream(txn)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }
1095
1096 pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1097 let info = self.env.shared_actor_infos().read_guard();
1098
1099 let actor_locations = info
1100 .iter_over_fragments()
1101 .flat_map(|(fragment_id, fragment)| {
1102 fragment
1103 .actors
1104 .iter()
1105 .map(|(actor_id, actor)| PartialActorLocation {
1106 actor_id: *actor_id as _,
1107 fragment_id: *fragment_id as _,
1108 worker_id: actor.worker_id,
1109 })
1110 })
1111 .collect_vec();
1112
1113 Ok(actor_locations)
1114 }
1115
    /// List `(actor_id, fragment_id, job_id, schema_id, object_type)` tuples
    /// for every actor in the cluster.
    ///
    /// Fragment-to-object metadata comes from the catalog database; the actor
    /// set itself comes from the shared in-memory actor info. Fails with
    /// [`MetaError::unavailable`] if a fragment present in the database has no
    /// entry in the shared actor info (e.g. not yet synced).
    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;

        // Join each fragment to its owning object to recover job/schema/type.
        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
            FragmentModel::find()
                .select_only()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .column(fragment::Column::FragmentId)
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;

        let actor_infos = {
            let info = self.env.shared_actor_infos().read_guard();

            let mut result = Vec::new();

            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
                let Some(fragment) = info.get_fragment(fragment_id as _) else {
                    return Err(MetaError::unavailable(format!(
                        "shared actor info missing for fragment {fragment_id} while listing actors"
                    )));
                };

                for actor_id in fragment.actors.keys() {
                    result.push((
                        *actor_id as _,
                        fragment.fragment_id as _,
                        object_id,
                        schema_id,
                        object_type,
                    ));
                }
            }

            result
        };

        Ok(actor_infos)
    }
1161
1162 pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1163 let guard = self.env.shared_actor_info.read_guard();
1164 guard
1165 .iter_over_fragments()
1166 .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1167 .collect_vec()
1168 }
1169
    /// List fragment descriptors *including* each fragment's stream plan node.
    ///
    /// `is_creating` restricts the listing to fragments of jobs still being
    /// created (see [`Self::build_fragment_query`]); otherwise all fragments
    /// are listed. Each entry pairs the populated [`FragmentDistribution`]
    /// with the ids of its upstream fragments.
    pub async fn list_fragment_descs_with_node(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments_query = Self::build_fragment_query(is_creating);
        let fragments: Vec<fragment::Model> = fragments_query.all(&txn).await?;

        // Full models are loaded here so the (potentially large) stream node
        // can be attached to each row.
        let rows = fragments
            .into_iter()
            .map(|fragment| FragmentDescRow {
                fragment_id: fragment.fragment_id,
                job_id: fragment.job_id,
                fragment_type_mask: fragment.fragment_type_mask,
                distribution_type: fragment.distribution_type,
                state_table_ids: fragment.state_table_ids,
                vnode_count: fragment.vnode_count,
                parallelism_override: fragment.parallelism,
                stream_node: Some(fragment.stream_node),
            })
            .collect_vec();

        self.build_fragment_distributions(&txn, rows).await
    }
1196
    /// List fragment descriptors *without* loading the stream plan nodes.
    ///
    /// Cheaper variant of [`Self::list_fragment_descs_with_node`]: only the
    /// scalar columns are selected, and `stream_node` is left `None` in the
    /// resulting [`FragmentDistribution`]s.
    pub async fn list_fragment_descs_without_node(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments_query = Self::build_fragment_query(is_creating);
        #[allow(clippy::type_complexity)]
        let fragments: Vec<(
            FragmentId,
            JobId,
            i32,
            DistributionType,
            TableIdArray,
            i32,
            Option<StreamingParallelism>,
        )> = fragments_query
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::Parallelism,
            ])
            .into_tuple()
            .all(&txn)
            .await?;

        let rows = fragments
            .into_iter()
            .map(
                |(
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    state_table_ids,
                    vnode_count,
                    parallelism_override,
                )| FragmentDescRow {
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    state_table_ids,
                    vnode_count,
                    parallelism_override,
                    // Plan deliberately omitted; this is the "light" listing.
                    stream_node: None,
                },
            )
            .collect_vec();

        self.build_fragment_distributions(&txn, rows).await
    }
1255
1256 fn build_fragment_query(is_creating: bool) -> sea_orm::Select<fragment::Entity> {
1257 if is_creating {
1258 FragmentModel::find()
1259 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1260 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1261 .filter(
1262 streaming_job::Column::JobStatus
1263 .eq(JobStatus::Initial)
1264 .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1265 )
1266 } else {
1267 FragmentModel::find()
1268 }
1269 }
1270
    /// Turn raw [`FragmentDescRow`]s into [`FragmentDistribution`]s, enriching
    /// each with live parallelism, upstream fragment ids, and a human-readable
    /// parallelism policy string.
    ///
    /// Performs three batched lookups (job parallelism, upstream relations,
    /// no-shuffle root mapping) before assembling the per-row results.
    async fn build_fragment_distributions(
        &self,
        txn: &DatabaseTransaction,
        rows: Vec<FragmentDescRow>,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let fragment_ids = rows.iter().map(|row| row.fragment_id).collect_vec();
        let job_ids = rows.iter().map(|row| row.job_id).unique().collect_vec();

        // Batched: configured parallelism of every involved job.
        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(txn)
                .await?
                .into_iter()
                .collect()
        };

        // Batched: all (target, source, dispatcher) edges into the listed fragments.
        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(txn)
                    .await?
            };

        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        // Batched: for no-shuffle DAG members, which root fragment(s) drive
        // their parallelism.
        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();
        let mut result = Vec::with_capacity(rows.len());

        for row in rows {
            // Live parallelism = number of actors currently running; 0 when
            // the fragment is not (yet) in the shared actor info.
            let parallelism = guard
                .get_fragment(row.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&row.fragment_id)
                .cloned()
                .unwrap_or_default();

            // `remove` is fine: each fragment id appears exactly once in `rows`.
            let upstreams = all_upstreams.remove(&row.fragment_id).unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                row.distribution_type,
                row.parallelism_override.as_ref(),
                job_parallelisms.get(&row.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: row.fragment_id,
                table_id: row.job_id,
                distribution_type: PbFragmentDistributionType::from(row.distribution_type) as _,
                state_table_ids: row.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: row.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: row.vnode_count as _,
                node: row.stream_node.map(|node| node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }

        Ok(result)
    }
1366
    /// Collect `(sink_id, internal_table_id)` pairs for every sink that uses a
    /// kv log store, by scanning the plans of Sink-type fragments.
    pub async fn list_sink_log_store_tables(&self) -> MetaResult<Vec<(SinkId, TableId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(JobId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::JobId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Sink))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut mapping: HashMap<SinkId, TableId> = HashMap::new();

        for (job_id, stream_node) in fragments {
            // For sink jobs the job id doubles as the sink id.
            let sink_id = SinkId::new(job_id.as_raw_id());
            let stream_node = stream_node.to_protobuf();
            let mut internal_table_id: Option<TableId> = None;

            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::Sink(node) = body
                    && node.log_store_type == SinkLogStoreType::KvLogStore as i32
                    && let Some(table) = &node.table
                {
                    internal_table_id = Some(TableId::new(table.id.as_raw_id()));
                }
            });

            // If the same sink yields conflicting tables, the latest one wins;
            // the inconsistency is only logged, not treated as an error.
            if let Some(table_id) = internal_table_id
                && let Some(existing) = mapping.insert(sink_id, table_id)
                && existing != table_id
            {
                tracing::warn!(
                    "sink {sink_id:?} has multiple log store tables: {existing:?} vs {table_id:?}"
                );
            }
        }

        Ok(mapping.into_iter().collect())
    }
1407
1408 fn collect_root_fragment_mapping(
1410 ensembles: Vec<NoShuffleEnsemble>,
1411 ) -> HashMap<FragmentId, Vec<FragmentId>> {
1412 let mut mapping = HashMap::new();
1413
1414 for ensemble in ensembles {
1415 let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1416 roots.sort_unstable();
1417 roots.dedup();
1418
1419 if roots.is_empty() {
1420 continue;
1421 }
1422
1423 let root_set: HashSet<_> = roots.iter().copied().collect();
1424
1425 for fragment_id in ensemble.component_fragments() {
1426 if root_set.contains(&fragment_id) {
1427 mapping.insert(fragment_id, Vec::new());
1428 } else {
1429 mapping.insert(fragment_id, roots.clone());
1430 }
1431 }
1432 }
1433
1434 mapping
1435 }
1436
1437 fn format_fragment_parallelism_policy(
1438 distribution_type: DistributionType,
1439 fragment_parallelism: Option<&StreamingParallelism>,
1440 job_parallelism: Option<&StreamingParallelism>,
1441 root_fragments: &[FragmentId],
1442 ) -> String {
1443 if distribution_type == DistributionType::Single {
1444 return "single".to_owned();
1445 }
1446
1447 if let Some(parallelism) = fragment_parallelism {
1448 return format!(
1449 "override({})",
1450 Self::format_streaming_parallelism(parallelism)
1451 );
1452 }
1453
1454 if !root_fragments.is_empty() {
1455 let mut upstreams = root_fragments.to_vec();
1456 upstreams.sort_unstable();
1457 upstreams.dedup();
1458
1459 return format!("upstream_fragment({upstreams:?})");
1460 }
1461
1462 let inherited = job_parallelism
1463 .map(Self::format_streaming_parallelism)
1464 .unwrap_or_else(|| "unknown".to_owned());
1465 format!("inherit({inherited})")
1466 }
1467
1468 fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1469 match parallelism {
1470 StreamingParallelism::Adaptive => "adaptive".to_owned(),
1471 StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1472 StreamingParallelism::Custom => "custom".to_owned(),
1473 }
1474 }
1475
1476 pub async fn list_sink_actor_mapping(
1477 &self,
1478 ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1479 let inner = self.inner.read().await;
1480 let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1481 .select_only()
1482 .columns([sink::Column::SinkId, sink::Column::Name])
1483 .into_tuple()
1484 .all(&inner.db)
1485 .await?;
1486 let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1487
1488 let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1489
1490 let actor_with_type: Vec<(ActorId, SinkId)> = {
1491 let info = self.env.shared_actor_infos().read_guard();
1492
1493 info.iter_over_fragments()
1494 .filter(|(_, fragment)| {
1495 sink_ids.contains(&fragment.job_id.as_sink_id())
1496 && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1497 })
1498 .flat_map(|(_, fragment)| {
1499 fragment
1500 .actors
1501 .keys()
1502 .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1503 })
1504 .collect()
1505 };
1506
1507 let mut sink_actor_mapping = HashMap::new();
1508 for (actor_id, sink_id) in actor_with_type {
1509 sink_actor_mapping
1510 .entry(sink_id)
1511 .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1512 .1
1513 .push(actor_id);
1514 }
1515
1516 Ok(sink_actor_mapping)
1517 }
1518
    /// List the state table ids of every fragment, as partial models
    /// (`fragment_id`, `job_id`, `state_table_ids` only).
    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
        let inner = self.inner.read().await;
        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::StateTableIds,
            ])
            .into_partial_model()
            .all(&inner.db)
            .await?;
        Ok(fragment_state_tables)
    }
1533
    /// Re-render actor assignments for all streaming jobs (or only those in
    /// `database_id`, if given) against the current active worker set.
    ///
    /// Returns an empty map when there is nothing to load. The adaptive
    /// parallelism strategy is read from the live system params.
    pub async fn load_all_actors_dynamic(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<FragmentRenderMap> {
        let loaded = self.load_fragment_context(database_id).await?;

        if loaded.is_empty() {
            return Ok(HashMap::new());
        }

        // Snapshot the strategy once; scoped so the params reader is dropped
        // before rendering.
        let adaptive_parallelism_strategy = {
            let system_params_reader = self.env.system_params_reader().await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let RenderedGraph { fragments, .. } = render_actor_assignments(
            self.env.actor_id_generator(),
            worker_nodes.current(),
            adaptive_parallelism_strategy,
            &loaded,
        )?;

        tracing::trace!(?fragments, "reload all actors");

        Ok(fragments)
    }
1563
1564 pub async fn load_fragment_context(
1566 &self,
1567 database_id: Option<DatabaseId>,
1568 ) -> MetaResult<LoadedFragmentContext> {
1569 let inner = self.inner.read().await;
1570 let txn = inner.db.begin().await?;
1571
1572 self.load_fragment_context_in_txn(&txn, database_id).await
1573 }
1574
    /// Transaction-scoped variant of [`Self::load_fragment_context`].
    ///
    /// First resolves the set of job ids (optionally restricted to one
    /// database via the `object` join), then loads the fragment context for
    /// those jobs. Short-circuits with a default context when no job matches.
    pub async fn load_fragment_context_in_txn<C>(
        &self,
        txn: &C,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedFragmentContext>
    where
        C: ConnectionTrait,
    {
        let mut query = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId);

        if let Some(database_id) = database_id {
            query = query
                .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
                .filter(object::Column::DatabaseId.eq(database_id));
        }

        let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;

        let jobs: HashSet<JobId> = jobs.into_iter().collect();

        if jobs.is_empty() {
            return Ok(LoadedFragmentContext::default());
        }

        load_fragment_context_for_jobs(txn, jobs).await
    }
1603
    /// Patch the stream plans of the given fragments with resolved snapshot
    /// backfill epochs and persist any modified plan back to the catalog.
    ///
    /// All updates run in one transaction; fragments whose plan is left
    /// unchanged by `crate::stream::fill_snapshot_backfill_epoch` are not
    /// written back. Fails if any fragment id does not exist.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // Returns true only when the plan was actually mutated.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1637
1638 pub fn get_running_actors_of_fragment(
1640 &self,
1641 fragment_id: FragmentId,
1642 ) -> MetaResult<HashSet<model::ActorId>> {
1643 let info = self.env.shared_actor_infos().read_guard();
1644
1645 let actors = info
1646 .get_fragment(fragment_id as _)
1647 .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1648 .unwrap_or_default();
1649
1650 Ok(actors)
1651 }
1652
    /// Pair each source-backfill actor with the source actor it follows.
    ///
    /// Validates that the edge from `source_fragment_id` to
    /// `source_backfill_fragment_id` exists and is `NoShuffle`, then resolves
    /// the 1:1 actor mapping from the two fragments' live actor/vnode
    /// distributions. Returns `(backfill_actor, source_actor)` pairs.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // The dispatcher type on the connecting edge; absence means the two
        // fragments are not directly connected.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Snapshot each fragment's live actor -> vnode-bitmap assignment from
        // the shared in-memory actor info.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // `resolve_no_shuffle_actor_mapping` yields (source, backfill) pairs;
        // swap to the (backfill, source) order this API promises.
        Ok(resolve_no_shuffle_actor_mapping(
            source_distribution_type,
            source_actors.iter().map(|(&id, bitmap)| (id, bitmap)),
            source_backfill_distribution_type,
            source_backfill_actors
                .iter()
                .map(|(&id, bitmap)| (id, bitmap)),
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1750
    /// Find the "root" (output) fragment of each given job.
    ///
    /// Mview and Sink fragments always win: `insert` overwrites any earlier
    /// choice. A Source fragment is only kept via `try_insert` when no root
    /// has been recorded for the job yet — this covers standalone source jobs
    /// without shadowing an Mview/Sink root.
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<JobId>,
    ) -> MetaResult<HashMap<JobId, Fragment>> {
        let inner = self.inner.read().await;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        let mut root_fragments = HashMap::<JobId, Fragment>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                _ = root_fragments.insert(fragment.job_id, fragment.into());
            } else if mask.contains(FragmentTypeFlag::Source) {
                // Keeps the existing entry if one is already present.
                _ = root_fragments.try_insert(fragment.job_id, fragment.into());
            }
        }

        Ok(root_fragments)
    }
1788
1789 pub async fn get_root_fragment(&self, job_id: JobId) -> MetaResult<Fragment> {
1790 let mut root_fragments = self.get_root_fragments(vec![job_id]).await?;
1791 let root_fragment = root_fragments
1792 .remove(&job_id)
1793 .context(format!("root fragment for job {} not found", job_id))?;
1794
1795 Ok(root_fragment)
1796 }
1797
    /// List the fragments directly downstream of a job's root fragment,
    /// together with the dispatcher type of each connecting edge.
    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<Vec<(stream_plan::DispatcherType, Fragment)>> {
        let root_fragment = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&txn)
            .await?;

        let downstream_fragment_ids = downstream_fragment_relations
            .iter()
            .map(|model| model.target_fragment_id as FragmentId)
            .collect::<HashSet<_>>();

        let downstream_fragments: Vec<fragment::Model> = FragmentModel::find()
            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
            .all(&txn)
            .await?;

        let mut downstream_fragments_map: HashMap<_, _> = downstream_fragments
            .into_iter()
            .map(|fragment| (fragment.fragment_id, fragment))
            .collect();

        // Shadows the Vec<Model> above; from here on this is the result list.
        let mut downstream_fragments = vec![];

        // One dispatcher type per target fragment; presumably at most one
        // relation per (source, target) pair exists — TODO confirm schema.
        let fragment_map: HashMap<_, _> = downstream_fragment_relations
            .iter()
            .map(|model| (model.target_fragment_id, model.dispatcher_type))
            .collect();

        for (fragment_id, dispatcher_type) in fragment_map {
            let dispatch_type = PbDispatcherType::from(dispatcher_type);

            let fragment = downstream_fragments_map
                .remove(&fragment_id)
                .context(format!(
                    "downstream fragment node for id {} not found",
                    fragment_id
                ))?
                .into();

            downstream_fragments.push((dispatch_type, fragment));
        }
        Ok(downstream_fragments)
    }
1852
1853 pub async fn load_source_fragment_ids(
1854 &self,
1855 ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1856 let inner = self.inner.read().await;
1857 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1858 .select_only()
1859 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1860 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1861 .into_tuple()
1862 .all(&inner.db)
1863 .await?;
1864
1865 let mut source_fragment_ids = HashMap::new();
1866 for (fragment_id, stream_node) in fragments {
1867 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1868 source_fragment_ids
1869 .entry(source_id)
1870 .or_insert_with(BTreeSet::new)
1871 .insert(fragment_id);
1872 }
1873 }
1874 Ok(source_fragment_ids)
1875 }
1876
1877 pub async fn load_backfill_fragment_ids(
1878 &self,
1879 ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1880 let inner = self.inner.read().await;
1881 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1882 .select_only()
1883 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1884 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1885 .into_tuple()
1886 .all(&inner.db)
1887 .await?;
1888
1889 let mut source_fragment_ids = HashMap::new();
1890 for (fragment_id, stream_node) in fragments {
1891 if let Some((source_id, upstream_source_fragment_id)) =
1892 stream_node.to_protobuf().find_source_backfill()
1893 {
1894 source_fragment_ids
1895 .entry(source_id)
1896 .or_insert_with(BTreeSet::new)
1897 .insert((fragment_id, upstream_source_fragment_id));
1898 }
1899 }
1900 Ok(source_fragment_ids)
1901 }
1902
1903 pub async fn get_all_upstream_sink_infos(
1904 &self,
1905 target_table: &PbTable,
1906 target_fragment_id: FragmentId,
1907 ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1908 let inner = self.inner.read().await;
1909 let txn = inner.db.begin().await?;
1910
1911 self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
1912 .await
1913 }
1914
1915 pub async fn get_all_upstream_sink_infos_in_txn<C>(
1916 &self,
1917 txn: &C,
1918 target_table: &PbTable,
1919 target_fragment_id: FragmentId,
1920 ) -> MetaResult<Vec<UpstreamSinkInfo>>
1921 where
1922 C: ConnectionTrait,
1923 {
1924 let incoming_sinks = Sink::find()
1925 .filter(sink::Column::TargetTable.eq(target_table.id))
1926 .all(txn)
1927 .await?;
1928
1929 let sink_ids = incoming_sinks.iter().map(|s| s.sink_id).collect_vec();
1930 let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1931
1932 let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1933 for sink in &incoming_sinks {
1934 let sink_fragment_id =
1935 sink_fragment_ids
1936 .get(&sink.sink_id)
1937 .cloned()
1938 .ok_or(anyhow::anyhow!(
1939 "sink fragment not found for sink id {}",
1940 sink.sink_id
1941 ))?;
1942 let upstream_info = build_upstream_sink_info(
1943 sink.sink_id,
1944 sink.original_target_columns
1945 .as_ref()
1946 .map(|cols| cols.to_protobuf())
1947 .unwrap_or_default(),
1948 sink_fragment_id,
1949 target_table,
1950 target_fragment_id,
1951 )?;
1952 upstream_sink_infos.push(upstream_info);
1953 }
1954
1955 Ok(upstream_sink_infos)
1956 }
1957
1958 pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1959 let inner = self.inner.read().await;
1960 let txn = inner.db.begin().await?;
1961
1962 let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1963 .select_only()
1964 .column(fragment::Column::FragmentId)
1965 .filter(
1966 fragment::Column::JobId
1967 .eq(job_id)
1968 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1969 )
1970 .into_tuple()
1971 .all(&txn)
1972 .await?;
1973
1974 if mview_fragment.len() != 1 {
1975 return Err(anyhow::anyhow!(
1976 "expected exactly one mview fragment for job {}, found {}",
1977 job_id,
1978 mview_fragment.len()
1979 )
1980 .into());
1981 }
1982
1983 Ok(mview_fragment.into_iter().next().unwrap())
1984 }
1985
1986 pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
1987 let inner = self.inner.read().await;
1988 let txn = inner.db.begin().await?;
1989 has_table_been_migrated(&txn, table_id).await
1990 }
1991
    /// Upsert the connector split assignments for the given fragments.
    ///
    /// Split updates for fragments that no longer exist in the catalog are
    /// skipped with a warning rather than failing the whole batch: split
    /// assignments may arrive after the owning fragment has been dropped.
    /// No-op when `fragment_splits` is empty or all entries are stale.
    pub async fn update_fragment_splits<C>(
        &self,
        txn: &C,
        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
    ) -> MetaResult<()>
    where
        C: ConnectionTrait,
    {
        if fragment_splits.is_empty() {
            return Ok(());
        }

        // Which of the requested fragments still exist in the catalog?
        let existing_fragment_ids: HashSet<FragmentId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::FragmentId.is_in(fragment_splits.keys().copied()))
            .into_tuple()
            .all(txn)
            .await?
            .into_iter()
            .collect();

        // Split into upsert models (existing fragments) and stale ids.
        let (models, skipped_fragment_ids): (Vec<_>, Vec<_>) = fragment_splits
            .iter()
            .partition_map(|(fragment_id, splits)| {
                if existing_fragment_ids.contains(fragment_id) {
                    Either::Left(fragment_splits::ActiveModel {
                        fragment_id: Set(*fragment_id as _),
                        splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                            splits: splits.iter().map(Into::into).collect_vec(),
                        }))),
                    })
                } else {
                    Either::Right(*fragment_id)
                }
            });

        if !skipped_fragment_ids.is_empty() {
            tracing::warn!(
                skipped_fragment_ids = ?skipped_fragment_ids,
                total_fragment_ids = fragment_splits.len(),
                "skipping stale fragment split updates for missing fragments"
            );
        }

        if models.is_empty() {
            return Ok(());
        }

        // Upsert: replace the stored splits when the fragment row already has
        // an entry in `fragment_splits`.
        FragmentSplits::insert_many(models)
            .on_conflict(
                OnConflict::column(fragment_splits::Column::FragmentId)
                    .update_column(fragment_splits::Column::Splits)
                    .to_owned(),
            )
            .exec(txn)
            .await?;

        Ok(())
    }
2053}
2054
2055#[cfg(test)]
2056mod tests {
2057 use std::collections::{BTreeMap, HashMap, HashSet};
2058
2059 use itertools::Itertools;
2060 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
2061 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
2062 use risingwave_common::id::JobId;
2063 use risingwave_common::util::iter_util::ZipEqDebug;
2064 use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
2065 use risingwave_meta_model::fragment::DistributionType;
2066 use risingwave_meta_model::*;
2067 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
2068 use risingwave_pb::plan_common::PbExprContext;
2069 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
2070 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
2071 use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
2072
2073 use super::ActorInfo;
2074 use crate::MetaResult;
2075 use crate::controller::catalog::CatalogController;
2076 use crate::model::{Fragment, StreamActor};
2077
    // Per-actor upstream map: upstream fragment id -> upstream actor ids.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    // All upstreams of a fragment, keyed by its actor ids.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    // Fragment under test.
    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    // First upstream of the fragment under test (a second one is derived as +1).
    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    // Job owning the fragment under test.
    const TEST_JOB_ID: JobId = JobId::new(1);

    // State table attached to the fragment under test.
    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
2089
2090 fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
2091 let mut upstream_actor_ids = BTreeMap::new();
2092 upstream_actor_ids.insert(
2093 TEST_UPSTREAM_FRAGMENT_ID,
2094 HashSet::from_iter([(actor_id + 100)]),
2095 );
2096 upstream_actor_ids.insert(
2097 (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
2098 HashSet::from_iter([(actor_id + 200)]),
2099 );
2100 upstream_actor_ids
2101 }
2102
2103 fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
2104 let mut input = vec![];
2105 for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
2106 input.push(PbStreamNode {
2107 node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
2108 upstream_fragment_id,
2109 ..Default::default()
2110 }))),
2111 ..Default::default()
2112 });
2113 }
2114
2115 PbStreamNode {
2116 input,
2117 node_body: Some(PbNodeBody::Union(PbUnionNode {})),
2118 ..Default::default()
2119 }
2120 }
2121
    /// `prepare_fragment_model_for_new_job` should extract a fragment model
    /// that round-trips against the input `Fragment` (validated by
    /// `check_fragment`).
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        // Each of the 3 actors gets two synthetic upstream fragments.
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }
2152
2153 #[tokio::test]
2154 async fn test_compose_fragment() -> MetaResult<()> {
2155 let actor_count = 3u32;
2156
2157 let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
2158 .map(|actor_id| {
2159 (
2160 actor_id.into(),
2161 generate_upstream_actor_ids_for_actor(actor_id.into()),
2162 )
2163 })
2164 .collect();
2165
2166 let mut actor_bitmaps = ActorMapping::new_uniform(
2167 (0..actor_count).map(|i| i.into()),
2168 VirtualNode::COUNT_FOR_TEST,
2169 )
2170 .to_bitmaps();
2171
2172 let actors = (0..actor_count)
2173 .map(|actor_id| {
2174 let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
2175 splits: vec![PbConnectorSplit {
2176 split_type: "dummy".to_owned(),
2177 ..Default::default()
2178 }],
2179 });
2180
2181 ActorInfo {
2182 actor_id: actor_id.into(),
2183 fragment_id: TEST_FRAGMENT_ID,
2184 splits: actor_splits,
2185 worker_id: 0.into(),
2186 vnode_bitmap: actor_bitmaps
2187 .remove(&actor_id)
2188 .map(|bitmap| bitmap.to_protobuf())
2189 .as_ref()
2190 .map(VnodeBitmap::from),
2191 expr_context: ExprContext::from(&PbExprContext {
2192 time_zone: String::from("America/New_York"),
2193 strict_mode: false,
2194 }),
2195 config_override: "a.b.c = true".into(),
2196 }
2197 })
2198 .collect_vec();
2199
2200 let stream_node = {
2201 let template_actor = actors.first().cloned().unwrap();
2202
2203 let template_upstream_actor_ids =
2204 upstream_actor_ids.get(&template_actor.actor_id).unwrap();
2205
2206 generate_merger_stream_node(template_upstream_actor_ids)
2207 };
2208
2209 #[expect(deprecated)]
2210 let fragment = fragment::Model {
2211 fragment_id: TEST_FRAGMENT_ID,
2212 job_id: TEST_JOB_ID,
2213 fragment_type_mask: 0,
2214 distribution_type: DistributionType::Hash,
2215 stream_node: StreamNode::from(&stream_node),
2216 state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
2217 upstream_fragment_id: Default::default(),
2218 vnode_count: VirtualNode::COUNT_FOR_TEST as _,
2219 parallelism: None,
2220 };
2221
2222 let (pb_fragment, pb_actors, pb_actor_status, pb_actor_splits) =
2223 CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();
2224
2225 assert_eq!(pb_actor_status.len(), actor_count as usize);
2226 assert!(
2227 pb_actor_status
2228 .values()
2229 .all(|actor_status| actor_status.location.is_some())
2230 );
2231 assert_eq!(pb_actor_splits.len(), actor_count as usize);
2232
2233 check_fragment(fragment, pb_fragment);
2234 check_actors(
2235 actors,
2236 &upstream_actor_ids,
2237 pb_actors,
2238 pb_actor_splits,
2239 &stream_node,
2240 );
2241
2242 Ok(())
2243 }
2244
2245 fn check_actors(
2246 actors: Vec<ActorInfo>,
2247 actor_upstreams: &FragmentActorUpstreams,
2248 pb_actors: Vec<StreamActor>,
2249 pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
2250 stream_node: &PbStreamNode,
2251 ) {
2252 for (
2253 ActorInfo {
2254 actor_id,
2255 fragment_id,
2256 splits,
2257 worker_id: _,
2258 vnode_bitmap,
2259 expr_context,
2260 ..
2261 },
2262 StreamActor {
2263 actor_id: pb_actor_id,
2264 fragment_id: pb_fragment_id,
2265 vnode_bitmap: pb_vnode_bitmap,
2266 mview_definition,
2267 expr_context: pb_expr_context,
2268 ..
2269 },
2270 ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
2271 {
2272 assert_eq!(actor_id, pb_actor_id as ActorId);
2273 assert_eq!(fragment_id, pb_fragment_id as FragmentId);
2274
2275 assert_eq!(
2276 vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
2277 pb_vnode_bitmap,
2278 );
2279
2280 assert_eq!(mview_definition, "");
2281
2282 visit_stream_node_body(stream_node, |body| {
2283 if let PbNodeBody::Merge(m) = body {
2284 assert!(
2285 actor_upstreams
2286 .get(&actor_id)
2287 .unwrap()
2288 .contains_key(&m.upstream_fragment_id)
2289 );
2290 }
2291 });
2292
2293 assert_eq!(
2294 splits,
2295 pb_actor_splits
2296 .get(&pb_actor_id)
2297 .map(ConnectorSplits::from)
2298 .unwrap_or_default()
2299 );
2300
2301 assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
2302 }
2303 }
2304
2305 fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2306 let Fragment {
2307 fragment_id,
2308 fragment_type_mask,
2309 distribution_type: pb_distribution_type,
2310 state_table_ids: pb_state_table_ids,
2311 maybe_vnode_count: _,
2312 nodes,
2313 } = pb_fragment;
2314
2315 assert_eq!(fragment_id, TEST_FRAGMENT_ID);
2316 assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2317 assert_eq!(
2318 pb_distribution_type,
2319 PbFragmentDistributionType::from(fragment.distribution_type)
2320 );
2321
2322 assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
2323 assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2324 }
2325
2326 #[test]
2327 fn test_parallelism_policy_with_root_fragments() {
2328 #[expect(deprecated)]
2329 let fragment = fragment::Model {
2330 fragment_id: 3.into(),
2331 job_id: TEST_JOB_ID,
2332 fragment_type_mask: 0,
2333 distribution_type: DistributionType::Hash,
2334 stream_node: StreamNode::from(&PbStreamNode::default()),
2335 state_table_ids: TableIdArray::default(),
2336 upstream_fragment_id: Default::default(),
2337 vnode_count: 0,
2338 parallelism: None,
2339 };
2340
2341 let job_parallelism = StreamingParallelism::Fixed(4);
2342
2343 let policy = super::CatalogController::format_fragment_parallelism_policy(
2344 fragment.distribution_type,
2345 fragment.parallelism.as_ref(),
2346 Some(&job_parallelism),
2347 &[],
2348 );
2349
2350 assert_eq!(policy, "inherit(fixed(4))");
2351 }
2352
2353 #[test]
2354 fn test_parallelism_policy_with_upstream_roots() {
2355 #[expect(deprecated)]
2356 let fragment = fragment::Model {
2357 fragment_id: 5.into(),
2358 job_id: TEST_JOB_ID,
2359 fragment_type_mask: 0,
2360 distribution_type: DistributionType::Hash,
2361 stream_node: StreamNode::from(&PbStreamNode::default()),
2362 state_table_ids: TableIdArray::default(),
2363 upstream_fragment_id: Default::default(),
2364 vnode_count: 0,
2365 parallelism: None,
2366 };
2367
2368 let policy = super::CatalogController::format_fragment_parallelism_policy(
2369 fragment.distribution_type,
2370 fragment.parallelism.as_ref(),
2371 None,
2372 &[3.into(), 1.into(), 2.into(), 1.into()],
2373 );
2374
2375 assert_eq!(policy, "upstream_fragment([1, 2, 3])");
2376 }
2377}