1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::{Either, Itertools};
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::AdaptiveParallelismStrategy;
28use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
29use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
30use risingwave_connector::source::SplitImpl;
31use risingwave_connector::source::cdc::CdcScanOptions;
32use risingwave_meta_model::fragment::DistributionType;
33use risingwave_meta_model::object::ObjectType;
34use risingwave_meta_model::prelude::{
35 Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
36};
37use risingwave_meta_model::{
38 ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
39 JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
40 TableIdArray, VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits,
41 object, sink, source, streaming_job, table,
42};
43use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
44use risingwave_pb::catalog::PbTable;
45use risingwave_pb::common::PbActorLocation;
46use risingwave_pb::meta::subscribe_response::{
47 Info as NotificationInfo, Operation as NotificationOperation,
48};
49use risingwave_pb::meta::table_fragments::fragment::{
50 FragmentDistributionType, PbFragmentDistributionType,
51};
52use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
53use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
54use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
55use risingwave_pb::stream_plan;
56use risingwave_pb::stream_plan::stream_node::NodeBody;
57use risingwave_pb::stream_plan::{
58 PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, SinkLogStoreType,
59 StreamScanType,
60};
61use sea_orm::ActiveValue::Set;
62use sea_orm::sea_query::Expr;
63use sea_orm::{
64 ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, FromQueryResult, JoinType,
65 PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
66};
67use serde::{Deserialize, Serialize};
68
69use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
70use crate::controller::catalog::CatalogController;
71use crate::controller::scale::{
72 FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph,
73 find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
74 render_actor_assignments, resolve_streaming_job_definition,
75};
76use crate::controller::utils::{
77 FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
78 get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
79 resolve_no_shuffle_actor_mapping,
80};
81use crate::error::MetaError;
82use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
83use crate::model::{
84 DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
85 StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
91#[derive(Debug)]
93pub struct InflightActorInfo {
94 pub worker_id: WorkerId,
95 pub vnode_bitmap: Option<Bitmap>,
96 pub splits: Vec<SplitImpl>,
97}
98
99#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
100struct ActorInfo {
101 pub actor_id: ActorId,
102 pub fragment_id: FragmentId,
103 pub splits: ConnectorSplits,
104 pub worker_id: WorkerId,
105 pub vnode_bitmap: Option<VnodeBitmap>,
106 pub expr_context: ExprContext,
107 pub config_override: Arc<str>,
108}
109
110#[derive(Debug)]
111struct FragmentDescRow {
112 fragment_id: FragmentId,
113 job_id: JobId,
114 fragment_type_mask: i32,
115 distribution_type: DistributionType,
116 state_table_ids: TableIdArray,
117 vnode_count: i32,
118 parallelism_override: Option<StreamingParallelism>,
119 stream_node: Option<StreamNode>,
120}
121
122#[derive(Debug)]
123pub struct InflightFragmentInfo {
124 pub fragment_id: FragmentId,
125 pub distribution_type: DistributionType,
126 pub fragment_type_mask: FragmentTypeMask,
127 pub vnode_count: usize,
128 pub nodes: PbStreamNode,
129 pub actors: HashMap<ActorId, InflightActorInfo>,
130 pub state_table_ids: HashSet<TableId>,
131}
132
133#[derive(Clone, Debug)]
134pub struct FragmentParallelismInfo {
135 pub distribution_type: FragmentDistributionType,
136 pub vnode_count: usize,
137}
138
139#[easy_ext::ext(FragmentTypeMaskExt)]
140pub impl FragmentTypeMask {
141 fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
142 Expr::col(fragment::Column::FragmentTypeMask)
143 .bit_and(Expr::value(flag as i32))
144 .ne(0)
145 }
146
147 fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
148 Expr::col(fragment::Column::FragmentTypeMask)
149 .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
150 .ne(0)
151 }
152
153 fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
154 Expr::col(fragment::Column::FragmentTypeMask)
155 .bit_and(Expr::value(flag as i32))
156 .eq(0)
157 }
158}
159
160#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
161#[serde(rename_all = "camelCase")] pub struct StreamingJobInfo {
163 pub job_id: JobId,
164 pub obj_type: ObjectType,
165 pub name: String,
166 pub job_status: JobStatus,
167 pub parallelism: StreamingParallelism,
168 pub adaptive_parallelism_strategy: Option<String>,
169 #[serde(skip)]
173 pub backfill_parallelism: Option<StreamingParallelism>,
174 #[serde(skip)]
175 pub backfill_adaptive_parallelism_strategy: Option<String>,
176 pub max_parallelism: i32,
177 pub resource_group: String,
178 pub config_override: String,
179 pub database_id: DatabaseId,
180 pub schema_id: SchemaId,
181}
182
183impl NotificationManager {
184 pub(crate) fn notify_streaming_fragment_mapping(
190 &self,
191 operation: NotificationOperation,
192 fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
193 ) {
194 if fragment_mappings.is_empty() {
195 return;
196 }
197 for fragment_mapping in fragment_mappings {
199 self.notify_frontend_without_version(
200 operation,
201 NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
202 );
203 }
204 }
205
206 pub(crate) fn notify_serving_fragment_mapping_update(
211 &self,
212 fragment_ids: Vec<crate::model::FragmentId>,
213 ) {
214 if fragment_ids.is_empty() {
215 return;
216 }
217 self.notify_local_subscribers(LocalNotification::ServingFragmentMappingsUpsert(
218 fragment_ids,
219 ));
220 }
221
222 pub(crate) fn notify_serving_fragment_mapping_delete(
227 &self,
228 fragment_ids: Vec<crate::model::FragmentId>,
229 ) {
230 if fragment_ids.is_empty() {
231 return;
232 }
233 self.notify_local_subscribers(LocalNotification::ServingFragmentMappingsDelete(
234 fragment_ids,
235 ));
236 }
237}
238
239impl CatalogController {
240 pub fn prepare_fragment_models_from_fragments(
241 job_id: JobId,
242 fragments: impl Iterator<Item = &Fragment>,
243 ) -> MetaResult<Vec<fragment::Model>> {
244 fragments
245 .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
246 .try_collect()
247 }
248
249 pub fn prepare_fragment_model_for_new_job(
250 job_id: JobId,
251 fragment: &Fragment,
252 ) -> MetaResult<fragment::Model> {
253 let vnode_count = fragment.vnode_count();
254 let Fragment {
255 fragment_id: pb_fragment_id,
256 fragment_type_mask: pb_fragment_type_mask,
257 distribution_type: pb_distribution_type,
258 state_table_ids: pb_state_table_ids,
259 nodes,
260 ..
261 } = fragment;
262
263 let state_table_ids = pb_state_table_ids.clone().into();
264
265 let fragment_parallelism = nodes
266 .node_body
267 .as_ref()
268 .and_then(|body| match body {
269 NodeBody::StreamCdcScan(node) => Some(node),
270 _ => None,
271 })
272 .and_then(|node| node.options.as_ref())
273 .map(CdcScanOptions::from_proto)
274 .filter(|opts| opts.is_parallelized_backfill())
275 .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));
276
277 let stream_node = StreamNode::from(nodes);
278
279 let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
280 .unwrap()
281 .into();
282
283 #[expect(deprecated)]
284 let fragment = fragment::Model {
285 fragment_id: *pb_fragment_id as _,
286 job_id,
287 fragment_type_mask: (*pb_fragment_type_mask).into(),
288 distribution_type,
289 stream_node,
290 state_table_ids,
291 upstream_fragment_id: Default::default(),
292 vnode_count: vnode_count as _,
293 parallelism: fragment_parallelism,
294 };
295
296 Ok(fragment)
297 }
298
299 #[expect(clippy::type_complexity)]
300 fn compose_table_fragments(
301 job_id: JobId,
302 state: PbState,
303 ctx: StreamContext,
304 fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
305 max_parallelism: usize,
306 job_definition: Option<String>,
307 ) -> MetaResult<(
308 StreamJobFragments,
309 HashMap<FragmentId, Vec<StreamActor>>,
310 HashMap<crate::model::ActorId, PbActorStatus>,
311 )> {
312 let mut pb_fragments = BTreeMap::new();
313 let mut fragment_actors = HashMap::new();
314 let mut all_actor_status = HashMap::new();
315
316 for (fragment, actors) in fragments {
317 let (fragment, actors, actor_status, _) =
318 Self::compose_fragment(fragment, actors, job_definition.clone())?;
319 let fragment_id = fragment.fragment_id;
320 fragment_actors.insert(fragment_id, actors);
321 all_actor_status.extend(actor_status);
322
323 pb_fragments.insert(fragment_id, fragment);
324 }
325
326 let table_fragments = StreamJobFragments {
327 stream_job_id: job_id,
328 state: state as _,
329 fragments: pb_fragments,
330 ctx,
331 max_parallelism,
332 };
333
334 Ok((table_fragments, fragment_actors, all_actor_status))
335 }
336
337 #[expect(clippy::type_complexity)]
338 fn compose_fragment(
339 fragment: fragment::Model,
340 actors: Vec<ActorInfo>,
341 job_definition: Option<String>,
342 ) -> MetaResult<(
343 Fragment,
344 Vec<StreamActor>,
345 HashMap<crate::model::ActorId, PbActorStatus>,
346 HashMap<crate::model::ActorId, PbConnectorSplits>,
347 )> {
348 let fragment::Model {
349 fragment_id,
350 fragment_type_mask,
351 distribution_type,
352 stream_node,
353 state_table_ids,
354 vnode_count,
355 ..
356 } = fragment;
357
358 let stream_node = stream_node.to_protobuf();
359 let mut upstream_fragments = HashSet::new();
360 visit_stream_node_body(&stream_node, |body| {
361 if let NodeBody::Merge(m) = body {
362 assert!(
363 upstream_fragments.insert(m.upstream_fragment_id),
364 "non-duplicate upstream fragment"
365 );
366 }
367 });
368
369 let mut pb_actors = vec![];
370
371 let mut pb_actor_status = HashMap::new();
372 let mut pb_actor_splits = HashMap::new();
373
374 for actor in actors {
375 if actor.fragment_id != fragment_id {
376 bail!(
377 "fragment id {} from actor {} is different from fragment {}",
378 actor.fragment_id,
379 actor.actor_id,
380 fragment_id
381 )
382 }
383
384 let ActorInfo {
385 actor_id,
386 fragment_id,
387 worker_id,
388 splits,
389 vnode_bitmap,
390 expr_context,
391 config_override,
392 ..
393 } = actor;
394
395 let vnode_bitmap =
396 vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
397 let pb_expr_context = Some(expr_context.to_protobuf());
398
399 pb_actor_status.insert(
400 actor_id as _,
401 PbActorStatus {
402 location: PbActorLocation::from_worker(worker_id),
403 },
404 );
405
406 pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
407
408 pb_actors.push(StreamActor {
409 actor_id: actor_id as _,
410 fragment_id: fragment_id as _,
411 vnode_bitmap,
412 mview_definition: job_definition.clone().unwrap_or("".to_owned()),
413 expr_context: pb_expr_context,
414 config_override,
415 })
416 }
417
418 let pb_state_table_ids = state_table_ids.0;
419 let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
420 let pb_fragment = Fragment {
421 fragment_id: fragment_id as _,
422 fragment_type_mask: fragment_type_mask.into(),
423 distribution_type: pb_distribution_type,
424 state_table_ids: pb_state_table_ids,
425 maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
426 nodes: stream_node,
427 };
428
429 Ok((pb_fragment, pb_actors, pb_actor_status, pb_actor_splits))
430 }
431
432 pub async fn fragment_parallelisms(
439 &self,
440 ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
441 let inner = self.inner.read().await;
442 let query = FragmentModel::find().select_only().columns([
443 fragment::Column::FragmentId,
444 fragment::Column::DistributionType,
445 fragment::Column::VnodeCount,
446 ]);
447 let fragments: Vec<(FragmentId, DistributionType, i32)> =
448 query.into_tuple().all(&inner.db).await?;
449
450 Ok(fragments
451 .into_iter()
452 .map(|(fragment_id, distribution_type, vnode_count)| {
453 (
454 fragment_id,
455 FragmentParallelismInfo {
456 distribution_type: PbFragmentDistributionType::from(distribution_type),
457 vnode_count: vnode_count as usize,
458 },
459 )
460 })
461 .collect())
462 }
463
464 pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
465 let inner = self.inner.read().await;
466 let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
467 .select_only()
468 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
469 .into_tuple()
470 .all(&inner.db)
471 .await?;
472 Ok(fragment_jobs.into_iter().collect())
473 }
474
475 pub async fn get_fragment_job_id(
476 &self,
477 fragment_ids: Vec<FragmentId>,
478 ) -> MetaResult<Vec<ObjectId>> {
479 let inner = self.inner.read().await;
480 self.get_fragment_job_id_in_txn(&inner.db, fragment_ids)
481 .await
482 }
483
484 pub async fn get_fragment_job_id_in_txn<C>(
485 &self,
486 txn: &C,
487 fragment_ids: Vec<FragmentId>,
488 ) -> MetaResult<Vec<ObjectId>>
489 where
490 C: ConnectionTrait + Send,
491 {
492 let object_ids: Vec<ObjectId> = FragmentModel::find()
493 .select_only()
494 .column(fragment::Column::JobId)
495 .filter(fragment::Column::FragmentId.is_in(fragment_ids))
496 .into_tuple()
497 .all(txn)
498 .await?;
499
500 Ok(object_ids)
501 }
502
503 pub async fn get_fragment_desc_by_id(
504 &self,
505 fragment_id: FragmentId,
506 ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
507 let inner = self.inner.read().await;
508
509 let fragment_model = match FragmentModel::find_by_id(fragment_id)
510 .one(&inner.db)
511 .await?
512 {
513 Some(fragment) => fragment,
514 None => return Ok(None),
515 };
516
517 let job_parallelism: Option<(StreamingParallelism, Option<String>)> =
518 StreamingJob::find_by_id(fragment_model.job_id)
519 .select_only()
520 .columns([
521 streaming_job::Column::Parallelism,
522 streaming_job::Column::AdaptiveParallelismStrategy,
523 ])
524 .into_tuple::<(StreamingParallelism, Option<String>)>()
525 .one(&inner.db)
526 .await?;
527
528 let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
529 .select_only()
530 .columns([
531 fragment_relation::Column::SourceFragmentId,
532 fragment_relation::Column::DispatcherType,
533 ])
534 .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
535 .into_tuple()
536 .all(&inner.db)
537 .await?;
538
539 let upstreams: Vec<_> = upstream_entries
540 .into_iter()
541 .map(|(source_id, _)| source_id)
542 .collect();
543
544 let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
545 .await
546 .map(Self::collect_root_fragment_mapping)?;
547 let root_fragments = root_fragment_map
548 .get(&fragment_id)
549 .cloned()
550 .unwrap_or_default();
551
552 let info = self.env.shared_actor_infos().read_guard();
553 let SharedFragmentInfo { actors, .. } = info
554 .get_fragment(fragment_model.fragment_id as _)
555 .unwrap_or_else(|| {
556 panic!(
557 "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
558 fragment_model.fragment_id,
559 fragment_model.job_id
560 )
561 });
562
563 let parallelism_policy = Self::format_fragment_parallelism_policy(
564 fragment_model.distribution_type,
565 fragment_model.parallelism.as_ref(),
566 job_parallelism.as_ref().map(|(parallelism, _)| parallelism),
567 job_parallelism
568 .as_ref()
569 .and_then(|(_, strategy)| strategy.as_deref()),
570 &root_fragments,
571 );
572
573 let fragment = FragmentDesc {
574 fragment_id: fragment_model.fragment_id,
575 job_id: fragment_model.job_id,
576 fragment_type_mask: fragment_model.fragment_type_mask,
577 distribution_type: fragment_model.distribution_type,
578 state_table_ids: fragment_model.state_table_ids.clone(),
579 parallelism: actors.len() as _,
580 vnode_count: fragment_model.vnode_count,
581 stream_node: fragment_model.stream_node.clone(),
582 parallelism_policy,
583 };
584
585 Ok(Some((fragment, upstreams)))
586 }
587
588 pub async fn list_fragment_database_ids(
589 &self,
590 select_fragment_ids: Option<Vec<FragmentId>>,
591 ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
592 let inner = self.inner.read().await;
593 let select = FragmentModel::find()
594 .select_only()
595 .column(fragment::Column::FragmentId)
596 .column(object::Column::DatabaseId)
597 .join(JoinType::InnerJoin, fragment::Relation::Object.def());
598 let select = if let Some(select_fragment_ids) = select_fragment_ids {
599 select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
600 } else {
601 select
602 };
603 Ok(select.into_tuple().all(&inner.db).await?)
604 }
605
606 pub async fn get_job_fragments_by_id(
607 &self,
608 job_id: JobId,
609 ) -> MetaResult<(
610 StreamJobFragments,
611 HashMap<FragmentId, Vec<StreamActor>>,
612 HashMap<ActorId, PbActorStatus>,
613 )> {
614 let inner = self.inner.read().await;
615
616 let fragments: Vec<_> = FragmentModel::find()
618 .filter(fragment::Column::JobId.eq(job_id))
619 .all(&inner.db)
620 .await?;
621
622 let job_info = StreamingJob::find_by_id(job_id)
623 .one(&inner.db)
624 .await?
625 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
626
627 let fragment_actors =
628 self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;
629
630 let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
631 .await?
632 .remove(&job_id);
633
634 Self::compose_table_fragments(
635 job_id,
636 job_info.job_status.into(),
637 job_info.stream_context(),
638 fragment_actors,
639 job_info.max_parallelism as _,
640 job_definition,
641 )
642 }
643
644 pub async fn get_fragment_actor_dispatchers(
645 &self,
646 fragment_ids: Vec<FragmentId>,
647 ) -> MetaResult<FragmentActorDispatchers> {
648 let inner = self.inner.read().await;
649
650 self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
651 .await
652 }
653
654 pub async fn get_fragment_actor_dispatchers_txn(
655 &self,
656 c: &impl ConnectionTrait,
657 fragment_ids: Vec<FragmentId>,
658 ) -> MetaResult<FragmentActorDispatchers> {
659 let fragment_relations = FragmentRelation::find()
660 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
661 .all(c)
662 .await?;
663
664 type FragmentActorInfo = (
665 DistributionType,
666 Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
667 );
668
669 let shared_info = self.env.shared_actor_infos();
670 let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
671 let get_fragment_actors = |fragment_id: FragmentId| async move {
672 let result: MetaResult<FragmentActorInfo> = try {
673 let read_guard = shared_info.read_guard();
674
675 let fragment = read_guard.get_fragment(fragment_id as _).unwrap();
676
677 (
678 fragment.distribution_type,
679 Arc::new(
680 fragment
681 .actors
682 .iter()
683 .map(|(actor_id, actor_info)| {
684 (
685 *actor_id,
686 actor_info
687 .vnode_bitmap
688 .as_ref()
689 .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
690 )
691 })
692 .collect(),
693 ),
694 )
695 };
696 result
697 };
698
699 let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
700 for fragment_relation::Model {
701 source_fragment_id,
702 target_fragment_id,
703 dispatcher_type,
704 dist_key_indices,
705 output_indices,
706 output_type_mapping,
707 } in fragment_relations
708 {
709 let (source_fragment_distribution, source_fragment_actors) = {
710 let (distribution, actors) = {
711 match fragment_actor_cache.entry(source_fragment_id) {
712 Entry::Occupied(entry) => entry.into_mut(),
713 Entry::Vacant(entry) => {
714 entry.insert(get_fragment_actors(source_fragment_id).await?)
715 }
716 }
717 };
718 (*distribution, actors.clone())
719 };
720 let (target_fragment_distribution, target_fragment_actors) = {
721 let (distribution, actors) = {
722 match fragment_actor_cache.entry(target_fragment_id) {
723 Entry::Occupied(entry) => entry.into_mut(),
724 Entry::Vacant(entry) => {
725 entry.insert(get_fragment_actors(target_fragment_id).await?)
726 }
727 }
728 };
729 (*distribution, actors.clone())
730 };
731 let output_mapping = PbDispatchOutputMapping {
732 indices: output_indices.into_u32_array(),
733 types: output_type_mapping.unwrap_or_default().to_protobuf(),
734 };
735 let (dispatchers, _) = compose_dispatchers(
736 source_fragment_distribution,
737 &source_fragment_actors,
738 target_fragment_id as _,
739 target_fragment_distribution,
740 &target_fragment_actors,
741 dispatcher_type,
742 dist_key_indices.into_u32_array(),
743 output_mapping,
744 );
745 let actor_dispatchers_map = actor_dispatchers_map
746 .entry(source_fragment_id as _)
747 .or_default();
748 for (actor_id, dispatchers) in dispatchers {
749 actor_dispatchers_map
750 .entry(actor_id as _)
751 .or_default()
752 .push(dispatchers);
753 }
754 }
755 Ok(actor_dispatchers_map)
756 }
757
758 pub async fn get_fragment_downstream_relations(
759 &self,
760 fragment_ids: Vec<FragmentId>,
761 ) -> MetaResult<FragmentDownstreamRelation> {
762 let inner = self.inner.read().await;
763 self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
764 .await
765 }
766
767 pub async fn get_fragment_downstream_relations_in_txn<C>(
768 &self,
769 txn: &C,
770 fragment_ids: Vec<FragmentId>,
771 ) -> MetaResult<FragmentDownstreamRelation>
772 where
773 C: ConnectionTrait + StreamTrait + Send,
774 {
775 let mut stream = FragmentRelation::find()
776 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
777 .stream(txn)
778 .await?;
779 let mut relations = FragmentDownstreamRelation::new();
780 while let Some(relation) = stream.try_next().await? {
781 relations
782 .entry(relation.source_fragment_id as _)
783 .or_default()
784 .push(DownstreamFragmentRelation {
785 downstream_fragment_id: relation.target_fragment_id as _,
786 dispatcher_type: relation.dispatcher_type,
787 dist_key_indices: relation.dist_key_indices.into_u32_array(),
788 output_mapping: PbDispatchOutputMapping {
789 indices: relation.output_indices.into_u32_array(),
790 types: relation
791 .output_type_mapping
792 .unwrap_or_default()
793 .to_protobuf(),
794 },
795 });
796 }
797 Ok(relations)
798 }
799
800 pub async fn get_job_fragment_backfill_scan_type(
801 &self,
802 job_id: JobId,
803 ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
804 let inner = self.inner.read().await;
805 self.get_job_fragment_backfill_scan_type_in_txn(&inner.db, job_id)
806 .await
807 }
808
809 pub async fn get_job_fragment_backfill_scan_type_in_txn<C>(
810 &self,
811 txn: &C,
812 job_id: JobId,
813 ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>>
814 where
815 C: ConnectionTrait + Send,
816 {
817 let fragments: Vec<_> = FragmentModel::find()
818 .filter(fragment::Column::JobId.eq(job_id))
819 .all(txn)
820 .await?;
821
822 let mut result = HashMap::new();
823
824 for fragment::Model {
825 fragment_id,
826 stream_node,
827 ..
828 } in fragments
829 {
830 let stream_node = stream_node.to_protobuf();
831 visit_stream_node_body(&stream_node, |body| {
832 if let NodeBody::StreamScan(node) = body {
833 match node.stream_scan_type() {
834 StreamScanType::Unspecified => {}
835 scan_type => {
836 result.insert(fragment_id as crate::model::FragmentId, scan_type);
837 }
838 }
839 }
840 });
841 }
842
843 Ok(result)
844 }
845
846 pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
847 let inner = self.inner.read().await;
848 let count = StreamingJob::find().count(&inner.db).await?;
849 Ok(usize::try_from(count).context("streaming job count overflow")?)
850 }
851
852 pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
853 let inner = self.inner.read().await;
854 let job_states = StreamingJob::find()
855 .select_only()
856 .column(streaming_job::Column::JobId)
857 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
858 .join(JoinType::InnerJoin, object::Relation::Database2.def())
859 .column(object::Column::ObjType)
860 .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
861 .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
862 .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
863 .column_as(
864 Expr::if_null(
865 Expr::col((table::Entity, table::Column::Name)),
866 Expr::if_null(
867 Expr::col((source::Entity, source::Column::Name)),
868 Expr::if_null(
869 Expr::col((sink::Entity, sink::Column::Name)),
870 Expr::val("<unknown>"),
871 ),
872 ),
873 ),
874 "name",
875 )
876 .columns([
877 streaming_job::Column::JobStatus,
878 streaming_job::Column::Parallelism,
879 streaming_job::Column::AdaptiveParallelismStrategy,
880 streaming_job::Column::BackfillParallelism,
881 streaming_job::Column::BackfillAdaptiveParallelismStrategy,
882 streaming_job::Column::MaxParallelism,
883 ])
884 .column_as(
885 Expr::if_null(
886 Expr::col((
887 streaming_job::Entity,
888 streaming_job::Column::SpecificResourceGroup,
889 )),
890 Expr::col((database::Entity, database::Column::ResourceGroup)),
891 ),
892 "resource_group",
893 )
894 .column_as(
895 Expr::if_null(
896 Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
897 Expr::val(""),
898 ),
899 "config_override",
900 )
901 .column(object::Column::DatabaseId)
902 .column(object::Column::SchemaId)
903 .into_model()
904 .all(&inner.db)
905 .await?;
906 Ok(job_states)
907 }
908
909 pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
910 let inner = self.inner.read().await;
911 let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
912 .select_only()
913 .column(streaming_job::Column::MaxParallelism)
914 .into_tuple()
915 .one(&inner.db)
916 .await?
917 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
918 Ok(max_parallelism as usize)
919 }
920
921 pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
923 if let Ok(inner) = self.inner.try_read()
924 && let Ok(job_state_tables) = FragmentModel::find()
925 .select_only()
926 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
927 .into_tuple::<(JobId, I32Array)>()
928 .all(&inner.db)
929 .await
930 {
931 let mut job_internal_table_ids = HashMap::new();
932 for (job_id, state_table_ids) in job_state_tables {
933 job_internal_table_ids
934 .entry(job_id)
935 .or_insert_with(Vec::new)
936 .extend(
937 state_table_ids
938 .into_inner()
939 .into_iter()
940 .map(|table_id| TableId::new(table_id as _)),
941 );
942 }
943 return Some(job_internal_table_ids.into_iter().collect());
944 }
945 None
946 }
947
948 pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
949 let inner = self.inner.read().await;
950 let count = FragmentModel::find().count(&inner.db).await?;
951 Ok(count > 0)
952 }
953
954 pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
955 let read_guard = self.env.shared_actor_infos().read_guard();
956 let actor_cnt: HashMap<WorkerId, _> = read_guard
957 .iter_over_fragments()
958 .flat_map(|(_, fragment)| {
959 fragment
960 .actors
961 .iter()
962 .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
963 })
964 .into_group_map()
965 .into_iter()
966 .map(|(k, v)| (k, v.len()))
967 .collect();
968
969 Ok(actor_cnt)
970 }
971
972 fn collect_fragment_actor_map(
973 &self,
974 fragment_ids: &[FragmentId],
975 stream_context: StreamContext,
976 ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
977 let guard = self.env.shared_actor_infos().read_guard();
978 let pb_expr_context = stream_context.to_expr_context();
979 let expr_context: ExprContext = (&pb_expr_context).into();
980
981 let mut actor_map = HashMap::with_capacity(fragment_ids.len());
982 for fragment_id in fragment_ids {
983 let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
984 anyhow!("fragment {} not found in shared actor info", fragment_id)
985 })?;
986
987 let actors = fragment_info
988 .actors
989 .iter()
990 .map(|(actor_id, actor_info)| ActorInfo {
991 actor_id: *actor_id as _,
992 fragment_id: *fragment_id,
993 splits: ConnectorSplits::from(&PbConnectorSplits {
994 splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
995 }),
996 worker_id: actor_info.worker_id as _,
997 vnode_bitmap: actor_info
998 .vnode_bitmap
999 .as_ref()
1000 .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
1001 expr_context: expr_context.clone(),
1002 config_override: stream_context.config_override.clone(),
1003 })
1004 .collect();
1005
1006 actor_map.insert(*fragment_id, actors);
1007 }
1008
1009 Ok(actor_map)
1010 }
1011
1012 fn collect_fragment_actor_pairs(
1013 &self,
1014 fragments: Vec<fragment::Model>,
1015 stream_context: StreamContext,
1016 ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
1017 let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
1018 let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
1019 fragments
1020 .into_iter()
1021 .map(|fragment| {
1022 let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
1023 anyhow!(
1024 "fragment {} missing in shared actor info map",
1025 fragment.fragment_id
1026 )
1027 })?;
1028 Ok((fragment, actors))
1029 })
1030 .collect()
1031 }
1032
1033 pub async fn table_fragments(
1035 &self,
1036 ) -> MetaResult<
1037 BTreeMap<
1038 JobId,
1039 (
1040 StreamJobFragments,
1041 HashMap<FragmentId, Vec<StreamActor>>,
1042 HashMap<crate::model::ActorId, PbActorStatus>,
1043 ),
1044 >,
1045 > {
1046 let inner = self.inner.read().await;
1047 let jobs = StreamingJob::find().all(&inner.db).await?;
1048
1049 let mut job_definition = resolve_streaming_job_definition(
1050 &inner.db,
1051 &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
1052 )
1053 .await?;
1054
1055 let mut table_fragments = BTreeMap::new();
1056 for job in jobs {
1057 let fragments = FragmentModel::find()
1058 .filter(fragment::Column::JobId.eq(job.job_id))
1059 .all(&inner.db)
1060 .await?;
1061
1062 let fragment_actors =
1063 self.collect_fragment_actor_pairs(fragments, job.stream_context())?;
1064
1065 table_fragments.insert(
1066 job.job_id,
1067 Self::compose_table_fragments(
1068 job.job_id,
1069 job.job_status.into(),
1070 job.stream_context(),
1071 fragment_actors,
1072 job.max_parallelism as _,
1073 job_definition.remove(&job.job_id),
1074 )?,
1075 );
1076 }
1077
1078 Ok(table_fragments)
1079 }
1080
1081 pub async fn upstream_fragments(
1082 &self,
1083 fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1084 ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
1085 let inner = self.inner.read().await;
1086 self.upstream_fragments_in_txn(&inner.db, fragment_ids)
1087 .await
1088 }
1089
1090 pub async fn upstream_fragments_in_txn<C>(
1091 &self,
1092 txn: &C,
1093 fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1094 ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>>
1095 where
1096 C: ConnectionTrait + StreamTrait + Send,
1097 {
1098 let mut stream = FragmentRelation::find()
1099 .select_only()
1100 .columns([
1101 fragment_relation::Column::SourceFragmentId,
1102 fragment_relation::Column::TargetFragmentId,
1103 ])
1104 .filter(
1105 fragment_relation::Column::TargetFragmentId
1106 .is_in(fragment_ids.map(|id| id as FragmentId)),
1107 )
1108 .into_tuple::<(FragmentId, FragmentId)>()
1109 .stream(txn)
1110 .await?;
1111 let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
1112 while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
1113 upstream_fragments
1114 .entry(downstream_fragment_id as crate::model::FragmentId)
1115 .or_default()
1116 .insert(upstream_fragment_id as crate::model::FragmentId);
1117 }
1118 Ok(upstream_fragments)
1119 }
1120
1121 pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1122 let info = self.env.shared_actor_infos().read_guard();
1123
1124 let actor_locations = info
1125 .iter_over_fragments()
1126 .flat_map(|(fragment_id, fragment)| {
1127 fragment
1128 .actors
1129 .iter()
1130 .map(|(actor_id, actor)| PartialActorLocation {
1131 actor_id: *actor_id as _,
1132 fragment_id: *fragment_id as _,
1133 worker_id: actor.worker_id,
1134 })
1135 })
1136 .collect_vec();
1137
1138 Ok(actor_locations)
1139 }
1140
1141 pub async fn list_actor_info(
1142 &self,
1143 ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
1144 let inner = self.inner.read().await;
1145
1146 let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
1147 FragmentModel::find()
1148 .select_only()
1149 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1150 .column(fragment::Column::FragmentId)
1151 .column_as(object::Column::Oid, "job_id")
1152 .column_as(object::Column::SchemaId, "schema_id")
1153 .column_as(object::Column::ObjType, "type")
1154 .into_tuple()
1155 .all(&inner.db)
1156 .await?;
1157
1158 let actor_infos = {
1159 let info = self.env.shared_actor_infos().read_guard();
1160
1161 let mut result = Vec::new();
1162
1163 for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
1164 let Some(fragment) = info.get_fragment(fragment_id as _) else {
1165 return Err(MetaError::unavailable(format!(
1166 "shared actor info missing for fragment {fragment_id} while listing actors"
1167 )));
1168 };
1169
1170 for actor_id in fragment.actors.keys() {
1171 result.push((
1172 *actor_id as _,
1173 fragment.fragment_id as _,
1174 object_id,
1175 schema_id,
1176 object_type,
1177 ));
1178 }
1179 }
1180
1181 result
1182 };
1183
1184 Ok(actor_infos)
1185 }
1186
1187 pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1188 let guard = self.env.shared_actor_info.read_guard();
1189 guard
1190 .iter_over_fragments()
1191 .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1192 .collect_vec()
1193 }
1194
1195 pub async fn list_fragment_descs_with_node(
1196 &self,
1197 is_creating: bool,
1198 ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1199 let inner = self.inner.read().await;
1200 let txn = inner.db.begin().await?;
1201
1202 let fragments_query = Self::build_fragment_query(is_creating);
1203 let fragments: Vec<fragment::Model> = fragments_query.all(&txn).await?;
1204
1205 let rows = fragments
1206 .into_iter()
1207 .map(|fragment| FragmentDescRow {
1208 fragment_id: fragment.fragment_id,
1209 job_id: fragment.job_id,
1210 fragment_type_mask: fragment.fragment_type_mask,
1211 distribution_type: fragment.distribution_type,
1212 state_table_ids: fragment.state_table_ids,
1213 vnode_count: fragment.vnode_count,
1214 parallelism_override: fragment.parallelism,
1215 stream_node: Some(fragment.stream_node),
1216 })
1217 .collect_vec();
1218
1219 self.build_fragment_distributions(&txn, rows).await
1220 }
1221
1222 pub async fn list_fragment_descs_without_node(
1223 &self,
1224 is_creating: bool,
1225 ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1226 let inner = self.inner.read().await;
1227 let txn = inner.db.begin().await?;
1228
1229 let fragments_query = Self::build_fragment_query(is_creating);
1230 #[expect(clippy::type_complexity)]
1231 let fragments: Vec<(
1232 FragmentId,
1233 JobId,
1234 i32,
1235 DistributionType,
1236 TableIdArray,
1237 i32,
1238 Option<StreamingParallelism>,
1239 )> = fragments_query
1240 .select_only()
1241 .columns([
1242 fragment::Column::FragmentId,
1243 fragment::Column::JobId,
1244 fragment::Column::FragmentTypeMask,
1245 fragment::Column::DistributionType,
1246 fragment::Column::StateTableIds,
1247 fragment::Column::VnodeCount,
1248 fragment::Column::Parallelism,
1249 ])
1250 .into_tuple()
1251 .all(&txn)
1252 .await?;
1253
1254 let rows = fragments
1255 .into_iter()
1256 .map(
1257 |(
1258 fragment_id,
1259 job_id,
1260 fragment_type_mask,
1261 distribution_type,
1262 state_table_ids,
1263 vnode_count,
1264 parallelism_override,
1265 )| FragmentDescRow {
1266 fragment_id,
1267 job_id,
1268 fragment_type_mask,
1269 distribution_type,
1270 state_table_ids,
1271 vnode_count,
1272 parallelism_override,
1273 stream_node: None,
1274 },
1275 )
1276 .collect_vec();
1277
1278 self.build_fragment_distributions(&txn, rows).await
1279 }
1280
1281 fn build_fragment_query(is_creating: bool) -> sea_orm::Select<fragment::Entity> {
1282 if is_creating {
1283 FragmentModel::find()
1284 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1285 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1286 .filter(
1287 streaming_job::Column::JobStatus
1288 .eq(JobStatus::Initial)
1289 .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1290 )
1291 } else {
1292 FragmentModel::find()
1293 }
1294 }
1295
1296 async fn build_fragment_distributions(
1297 &self,
1298 txn: &DatabaseTransaction,
1299 rows: Vec<FragmentDescRow>,
1300 ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1301 let fragment_ids = rows.iter().map(|row| row.fragment_id).collect_vec();
1302 let job_ids = rows.iter().map(|row| row.job_id).unique().collect_vec();
1303
1304 let job_parallelisms: HashMap<JobId, (StreamingParallelism, Option<String>)> =
1305 if fragment_ids.is_empty() {
1306 HashMap::new()
1307 } else {
1308 StreamingJob::find()
1309 .select_only()
1310 .columns([
1311 streaming_job::Column::JobId,
1312 streaming_job::Column::Parallelism,
1313 streaming_job::Column::AdaptiveParallelismStrategy,
1314 ])
1315 .filter(streaming_job::Column::JobId.is_in(job_ids))
1316 .into_tuple::<(JobId, StreamingParallelism, Option<String>)>()
1317 .all(txn)
1318 .await?
1319 .into_iter()
1320 .map(|(job_id, parallelism, strategy)| (job_id, (parallelism, strategy)))
1321 .collect()
1322 };
1323
1324 let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
1325 if fragment_ids.is_empty() {
1326 Vec::new()
1327 } else {
1328 FragmentRelation::find()
1329 .select_only()
1330 .columns([
1331 fragment_relation::Column::TargetFragmentId,
1332 fragment_relation::Column::SourceFragmentId,
1333 fragment_relation::Column::DispatcherType,
1334 ])
1335 .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
1336 .into_tuple()
1337 .all(txn)
1338 .await?
1339 };
1340
1341 let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1342 for (target_id, source_id, _) in upstream_entries {
1343 all_upstreams.entry(target_id).or_default().push(source_id);
1344 }
1345
1346 let root_fragment_map = if fragment_ids.is_empty() {
1347 HashMap::new()
1348 } else {
1349 let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_ids).await?;
1350 Self::collect_root_fragment_mapping(ensembles)
1351 };
1352
1353 let guard = self.env.shared_actor_info.read_guard();
1354 let mut result = Vec::with_capacity(rows.len());
1355
1356 for row in rows {
1357 let parallelism = guard
1358 .get_fragment(row.fragment_id as _)
1359 .map(|fragment| fragment.actors.len())
1360 .unwrap_or_default();
1361
1362 let root_fragments = root_fragment_map
1363 .get(&row.fragment_id)
1364 .cloned()
1365 .unwrap_or_default();
1366
1367 let upstreams = all_upstreams.remove(&row.fragment_id).unwrap_or_default();
1368
1369 let parallelism_policy = Self::format_fragment_parallelism_policy(
1370 row.distribution_type,
1371 row.parallelism_override.as_ref(),
1372 job_parallelisms
1373 .get(&row.job_id)
1374 .map(|(parallelism, _)| parallelism),
1375 job_parallelisms
1376 .get(&row.job_id)
1377 .and_then(|(_, strategy)| strategy.as_deref()),
1378 &root_fragments,
1379 );
1380
1381 let fragment = FragmentDistribution {
1382 fragment_id: row.fragment_id,
1383 table_id: row.job_id,
1384 distribution_type: PbFragmentDistributionType::from(row.distribution_type) as _,
1385 state_table_ids: row.state_table_ids.0,
1386 upstream_fragment_ids: upstreams.clone(),
1387 fragment_type_mask: row.fragment_type_mask as _,
1388 parallelism: parallelism as _,
1389 vnode_count: row.vnode_count as _,
1390 node: row.stream_node.map(|node| node.to_protobuf()),
1391 parallelism_policy,
1392 };
1393
1394 result.push((fragment, upstreams));
1395 }
1396
1397 Ok(result)
1398 }
1399
1400 pub async fn list_sink_log_store_tables(&self) -> MetaResult<Vec<(SinkId, TableId)>> {
1401 let inner = self.inner.read().await;
1402 let txn = inner.db.begin().await?;
1403
1404 let fragments: Vec<(JobId, StreamNode)> = FragmentModel::find()
1405 .select_only()
1406 .columns([fragment::Column::JobId, fragment::Column::StreamNode])
1407 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Sink))
1408 .into_tuple()
1409 .all(&txn)
1410 .await?;
1411
1412 let mut mapping: HashMap<SinkId, TableId> = HashMap::new();
1413
1414 for (job_id, stream_node) in fragments {
1415 let sink_id = SinkId::new(job_id.as_raw_id());
1416 let stream_node = stream_node.to_protobuf();
1417 let mut internal_table_id: Option<TableId> = None;
1418
1419 visit_stream_node_body(&stream_node, |body| {
1420 if let NodeBody::Sink(node) = body
1421 && node.log_store_type == SinkLogStoreType::KvLogStore as i32
1422 && let Some(table) = &node.table
1423 {
1424 internal_table_id = Some(TableId::new(table.id.as_raw_id()));
1425 }
1426 });
1427
1428 if let Some(table_id) = internal_table_id
1429 && let Some(existing) = mapping.insert(sink_id, table_id)
1430 && existing != table_id
1431 {
1432 tracing::warn!(
1433 "sink {sink_id:?} has multiple log store tables: {existing:?} vs {table_id:?}"
1434 );
1435 }
1436 }
1437
1438 Ok(mapping.into_iter().collect())
1439 }
1440
1441 fn collect_root_fragment_mapping(
1443 ensembles: Vec<NoShuffleEnsemble>,
1444 ) -> HashMap<FragmentId, Vec<FragmentId>> {
1445 let mut mapping = HashMap::new();
1446
1447 for ensemble in ensembles {
1448 let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1449 roots.sort_unstable();
1450 roots.dedup();
1451
1452 if roots.is_empty() {
1453 continue;
1454 }
1455
1456 let root_set: HashSet<_> = roots.iter().copied().collect();
1457
1458 for fragment_id in ensemble.component_fragments() {
1459 if root_set.contains(&fragment_id) {
1460 mapping.insert(fragment_id, Vec::new());
1461 } else {
1462 mapping.insert(fragment_id, roots.clone());
1463 }
1464 }
1465 }
1466
1467 mapping
1468 }
1469
1470 fn format_fragment_parallelism_policy(
1471 distribution_type: DistributionType,
1472 fragment_parallelism: Option<&StreamingParallelism>,
1473 job_parallelism: Option<&StreamingParallelism>,
1474 job_adaptive_parallelism_strategy: Option<&str>,
1475 root_fragments: &[FragmentId],
1476 ) -> String {
1477 if distribution_type == DistributionType::Single {
1478 return "single".to_owned();
1479 }
1480
1481 if let Some(parallelism) = fragment_parallelism {
1482 return format!(
1483 "override({})",
1484 Self::format_streaming_parallelism(parallelism, job_adaptive_parallelism_strategy)
1485 );
1486 }
1487
1488 if !root_fragments.is_empty() {
1489 let mut upstreams = root_fragments.to_vec();
1490 upstreams.sort_unstable();
1491 upstreams.dedup();
1492
1493 return format!("upstream_fragment({upstreams:?})");
1494 }
1495
1496 let inherited = job_parallelism
1497 .map(|parallelism| {
1498 Self::format_streaming_parallelism(parallelism, job_adaptive_parallelism_strategy)
1499 })
1500 .unwrap_or_else(|| "unknown".to_owned());
1501 format!("inherit({inherited})")
1502 }
1503
1504 fn format_streaming_parallelism(
1505 parallelism: &StreamingParallelism,
1506 adaptive_parallelism_strategy: Option<&str>,
1507 ) -> String {
1508 match parallelism {
1509 StreamingParallelism::Adaptive => adaptive_parallelism_strategy
1510 .and_then(Self::format_adaptive_parallelism_strategy)
1511 .unwrap_or_else(|| "adaptive".to_owned()),
1512 StreamingParallelism::Fixed(n) => n.to_string(),
1513 StreamingParallelism::Custom => adaptive_parallelism_strategy
1514 .and_then(Self::format_adaptive_parallelism_strategy)
1515 .unwrap_or_else(|| "custom".to_owned()),
1516 }
1517 }
1518
1519 fn format_adaptive_parallelism_strategy(strategy: &str) -> Option<String> {
1520 parse_strategy(strategy)
1521 .ok()
1522 .map(|strategy| match strategy {
1523 AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full => {
1524 "adaptive".to_owned()
1525 }
1526 AdaptiveParallelismStrategy::Bounded(n) => format!("bounded({n})"),
1527 AdaptiveParallelismStrategy::Ratio(r) => format!("ratio({r})"),
1528 })
1529 }
1530
1531 pub async fn list_sink_actor_mapping(
1532 &self,
1533 ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1534 let inner = self.inner.read().await;
1535 let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1536 .select_only()
1537 .columns([sink::Column::SinkId, sink::Column::Name])
1538 .into_tuple()
1539 .all(&inner.db)
1540 .await?;
1541 let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1542
1543 let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1544
1545 let actor_with_type: Vec<(ActorId, SinkId)> = {
1546 let info = self.env.shared_actor_infos().read_guard();
1547
1548 info.iter_over_fragments()
1549 .filter(|(_, fragment)| {
1550 sink_ids.contains(&fragment.job_id.as_sink_id())
1551 && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1552 })
1553 .flat_map(|(_, fragment)| {
1554 fragment
1555 .actors
1556 .keys()
1557 .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1558 })
1559 .collect()
1560 };
1561
1562 let mut sink_actor_mapping = HashMap::new();
1563 for (actor_id, sink_id) in actor_with_type {
1564 sink_actor_mapping
1565 .entry(sink_id)
1566 .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1567 .1
1568 .push(actor_id);
1569 }
1570
1571 Ok(sink_actor_mapping)
1572 }
1573
1574 pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1575 let inner = self.inner.read().await;
1576 let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1577 .select_only()
1578 .columns([
1579 fragment::Column::FragmentId,
1580 fragment::Column::JobId,
1581 fragment::Column::StateTableIds,
1582 ])
1583 .into_partial_model()
1584 .all(&inner.db)
1585 .await?;
1586 Ok(fragment_state_tables)
1587 }
1588
1589 pub async fn load_all_actors_dynamic(
1592 &self,
1593 database_id: Option<DatabaseId>,
1594 worker_nodes: &ActiveStreamingWorkerNodes,
1595 ) -> MetaResult<FragmentRenderMap> {
1596 let loaded = self.load_fragment_context(database_id).await?;
1597
1598 if loaded.is_empty() {
1599 return Ok(HashMap::new());
1600 }
1601
1602 let RenderedGraph { fragments, .. } = render_actor_assignments(
1603 self.env.actor_id_generator(),
1604 worker_nodes.current(),
1605 &loaded,
1606 )?;
1607
1608 tracing::trace!(?fragments, "reload all actors");
1609
1610 Ok(fragments)
1611 }
1612
1613 pub async fn load_fragment_context(
1615 &self,
1616 database_id: Option<DatabaseId>,
1617 ) -> MetaResult<LoadedFragmentContext> {
1618 let inner = self.inner.read().await;
1619 let txn = inner.db.begin().await?;
1620
1621 self.load_fragment_context_in_txn(&txn, database_id).await
1622 }
1623
1624 pub async fn load_fragment_context_in_txn<C>(
1625 &self,
1626 txn: &C,
1627 database_id: Option<DatabaseId>,
1628 ) -> MetaResult<LoadedFragmentContext>
1629 where
1630 C: ConnectionTrait,
1631 {
1632 let mut query = StreamingJob::find()
1633 .select_only()
1634 .column(streaming_job::Column::JobId);
1635
1636 if let Some(database_id) = database_id {
1637 query = query
1638 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1639 .filter(object::Column::DatabaseId.eq(database_id));
1640 }
1641
1642 let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
1643
1644 let jobs: HashSet<JobId> = jobs.into_iter().collect();
1645
1646 if jobs.is_empty() {
1647 return Ok(LoadedFragmentContext::default());
1648 }
1649
1650 load_fragment_context_for_jobs(txn, jobs).await
1651 }
1652
1653 #[await_tree::instrument]
1654 pub async fn fill_snapshot_backfill_epoch(
1655 &self,
1656 fragment_ids: impl Iterator<Item = FragmentId>,
1657 snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1658 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1659 ) -> MetaResult<()> {
1660 let inner = self.inner.write().await;
1661 let txn = inner.db.begin().await?;
1662 for fragment_id in fragment_ids {
1663 let fragment = FragmentModel::find_by_id(fragment_id)
1664 .one(&txn)
1665 .await?
1666 .context(format!("fragment {} not found", fragment_id))?;
1667 let mut node = fragment.stream_node.to_protobuf();
1668 if crate::stream::fill_snapshot_backfill_epoch(
1669 &mut node,
1670 snapshot_backfill_info,
1671 cross_db_snapshot_backfill_info,
1672 )? {
1673 let node = StreamNode::from(&node);
1674 FragmentModel::update(fragment::ActiveModel {
1675 fragment_id: Set(fragment_id),
1676 stream_node: Set(node),
1677 ..Default::default()
1678 })
1679 .exec(&txn)
1680 .await?;
1681 }
1682 }
1683 txn.commit().await?;
1684 Ok(())
1685 }
1686
1687 pub fn get_running_actors_of_fragment(
1689 &self,
1690 fragment_id: FragmentId,
1691 ) -> MetaResult<HashSet<model::ActorId>> {
1692 let info = self.env.shared_actor_infos().read_guard();
1693
1694 let actors = info
1695 .get_fragment(fragment_id as _)
1696 .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1697 .unwrap_or_default();
1698
1699 Ok(actors)
1700 }
1701
    /// Resolves the actor pairing between a source backfill fragment and its upstream source
    /// fragment.
    ///
    /// The relation between the two fragments and their distribution types are read from the
    /// database; the actors and their vnode bitmaps are read from the shared actor info.
    ///
    /// Each returned tuple is `(source_backfill_actor, source_actor)`, following the names
    /// bound in the final mapping closure.
    ///
    /// # Errors
    ///
    /// Fails if there is no relation from `source_fragment_id` to
    /// `source_backfill_fragment_id`, if that relation's dispatcher type is not `NoShuffle`,
    /// or if either fragment cannot be found in the database.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Look up the dispatcher type of the edge source -> source backfill.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // Only a no-shuffle edge is accepted for the pairing computed below.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: fetch only the distribution type column of one fragment.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await
                    .map_err(MetaError::from)?
                    .ok_or_else(|| {
                        MetaError::from(anyhow!("failed to find fragment: {}", fragment_id))
                    })?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: collect `actor id -> optional vnode bitmap` for one fragment from the shared
        // actor info. A fragment that is absent there yields an empty map.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // The source fragment is passed first and the backfill fragment second; the pairs
        // that come back are then flipped so the backfill actor comes first.
        Ok(resolve_no_shuffle_actor_mapping(
            source_distribution_type,
            source_actors.iter().map(|(&id, bitmap)| (id, bitmap)),
            source_backfill_distribution_type,
            source_backfill_actors
                .iter()
                .map(|(&id, bitmap)| (id, bitmap)),
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1802
1803 pub async fn get_root_fragments(
1816 &self,
1817 job_ids: Vec<JobId>,
1818 ) -> MetaResult<HashMap<JobId, Fragment>> {
1819 let inner = self.inner.read().await;
1820
1821 let all_fragments = FragmentModel::find()
1822 .filter(fragment::Column::JobId.is_in(job_ids))
1823 .all(&inner.db)
1824 .await?;
1825 let mut root_fragments = HashMap::<JobId, Fragment>::new();
1827 for fragment in all_fragments {
1828 let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1829 if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1830 _ = root_fragments.insert(fragment.job_id, fragment.into());
1831 } else if mask.contains(FragmentTypeFlag::Source) {
1832 _ = root_fragments.try_insert(fragment.job_id, fragment.into());
1835 }
1836 }
1837
1838 Ok(root_fragments)
1839 }
1840
1841 pub async fn get_root_fragment(&self, job_id: JobId) -> MetaResult<Fragment> {
1842 let mut root_fragments = self.get_root_fragments(vec![job_id]).await?;
1843 let root_fragment = root_fragments
1844 .remove(&job_id)
1845 .context(format!("root fragment for job {} not found", job_id))?;
1846
1847 Ok(root_fragment)
1848 }
1849
1850 pub async fn get_downstream_fragments(
1852 &self,
1853 job_id: JobId,
1854 ) -> MetaResult<Vec<(stream_plan::DispatcherType, Fragment)>> {
1855 let root_fragment = self.get_root_fragment(job_id).await?;
1856
1857 let inner = self.inner.read().await;
1858 let txn = inner.db.begin().await?;
1859 let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1860 .filter(
1861 fragment_relation::Column::SourceFragmentId
1862 .eq(root_fragment.fragment_id as FragmentId),
1863 )
1864 .all(&txn)
1865 .await?;
1866
1867 let downstream_fragment_ids = downstream_fragment_relations
1868 .iter()
1869 .map(|model| model.target_fragment_id as FragmentId)
1870 .collect::<HashSet<_>>();
1871
1872 let downstream_fragments: Vec<fragment::Model> = FragmentModel::find()
1873 .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1874 .all(&txn)
1875 .await?;
1876
1877 let mut downstream_fragments_map: HashMap<_, _> = downstream_fragments
1878 .into_iter()
1879 .map(|fragment| (fragment.fragment_id, fragment))
1880 .collect();
1881
1882 let mut downstream_fragments = vec![];
1883
1884 let fragment_map: HashMap<_, _> = downstream_fragment_relations
1885 .iter()
1886 .map(|model| (model.target_fragment_id, model.dispatcher_type))
1887 .collect();
1888
1889 for (fragment_id, dispatcher_type) in fragment_map {
1890 let dispatch_type = PbDispatcherType::from(dispatcher_type);
1891
1892 let fragment = downstream_fragments_map
1893 .remove(&fragment_id)
1894 .context(format!(
1895 "downstream fragment node for id {} not found",
1896 fragment_id
1897 ))?
1898 .into();
1899
1900 downstream_fragments.push((dispatch_type, fragment));
1901 }
1902 Ok(downstream_fragments)
1903 }
1904
1905 pub async fn load_source_fragment_ids(
1906 &self,
1907 ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1908 let inner = self.inner.read().await;
1909 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1910 .select_only()
1911 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1912 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1913 .into_tuple()
1914 .all(&inner.db)
1915 .await?;
1916
1917 let mut source_fragment_ids = HashMap::new();
1918 for (fragment_id, stream_node) in fragments {
1919 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1920 source_fragment_ids
1921 .entry(source_id)
1922 .or_insert_with(BTreeSet::new)
1923 .insert(fragment_id);
1924 }
1925 }
1926 Ok(source_fragment_ids)
1927 }
1928
1929 pub async fn load_backfill_fragment_ids(
1930 &self,
1931 ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1932 let inner = self.inner.read().await;
1933 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1934 .select_only()
1935 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1936 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1937 .into_tuple()
1938 .all(&inner.db)
1939 .await?;
1940
1941 let mut source_fragment_ids = HashMap::new();
1942 for (fragment_id, stream_node) in fragments {
1943 if let Some((source_id, upstream_source_fragment_id)) =
1944 stream_node.to_protobuf().find_source_backfill()
1945 {
1946 source_fragment_ids
1947 .entry(source_id)
1948 .or_insert_with(BTreeSet::new)
1949 .insert((fragment_id, upstream_source_fragment_id));
1950 }
1951 }
1952 Ok(source_fragment_ids)
1953 }
1954
1955 pub async fn get_all_upstream_sink_infos(
1956 &self,
1957 target_table: &PbTable,
1958 target_fragment_id: FragmentId,
1959 ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1960 let inner = self.inner.read().await;
1961 let txn = inner.db.begin().await?;
1962
1963 self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
1964 .await
1965 }
1966
1967 pub async fn get_all_upstream_sink_infos_in_txn<C>(
1968 &self,
1969 txn: &C,
1970 target_table: &PbTable,
1971 target_fragment_id: FragmentId,
1972 ) -> MetaResult<Vec<UpstreamSinkInfo>>
1973 where
1974 C: ConnectionTrait,
1975 {
1976 let incoming_sinks = Sink::find()
1977 .filter(sink::Column::TargetTable.eq(target_table.id))
1978 .all(txn)
1979 .await?;
1980
1981 let sink_ids = incoming_sinks.iter().map(|s| s.sink_id).collect_vec();
1982 let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1983
1984 let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1985 for sink in &incoming_sinks {
1986 let sink_fragment_id =
1987 sink_fragment_ids
1988 .get(&sink.sink_id)
1989 .cloned()
1990 .ok_or(anyhow::anyhow!(
1991 "sink fragment not found for sink id {}",
1992 sink.sink_id
1993 ))?;
1994 let upstream_info = build_upstream_sink_info(
1995 sink.sink_id,
1996 sink.original_target_columns
1997 .as_ref()
1998 .map(|cols| cols.to_protobuf())
1999 .unwrap_or_default(),
2000 sink_fragment_id,
2001 target_table,
2002 target_fragment_id,
2003 )?;
2004 upstream_sink_infos.push(upstream_info);
2005 }
2006
2007 Ok(upstream_sink_infos)
2008 }
2009
2010 pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
2011 let inner = self.inner.read().await;
2012 let txn = inner.db.begin().await?;
2013
2014 let mview_fragment: Vec<FragmentId> = FragmentModel::find()
2015 .select_only()
2016 .column(fragment::Column::FragmentId)
2017 .filter(
2018 fragment::Column::JobId
2019 .eq(job_id)
2020 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2021 )
2022 .into_tuple()
2023 .all(&txn)
2024 .await?;
2025
2026 if mview_fragment.len() != 1 {
2027 return Err(anyhow::anyhow!(
2028 "expected exactly one mview fragment for job {}, found {}",
2029 job_id,
2030 mview_fragment.len()
2031 )
2032 .into());
2033 }
2034
2035 Ok(mview_fragment.into_iter().next().unwrap())
2036 }
2037
2038 pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
2039 let inner = self.inner.read().await;
2040 let txn = inner.db.begin().await?;
2041 has_table_been_migrated(&txn, table_id).await
2042 }
2043
2044 pub async fn update_fragment_splits<C>(
2045 &self,
2046 txn: &C,
2047 fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
2048 ) -> MetaResult<()>
2049 where
2050 C: ConnectionTrait,
2051 {
2052 if fragment_splits.is_empty() {
2053 return Ok(());
2054 }
2055
2056 let existing_fragment_ids: HashSet<FragmentId> = FragmentModel::find()
2057 .select_only()
2058 .column(fragment::Column::FragmentId)
2059 .filter(fragment::Column::FragmentId.is_in(fragment_splits.keys().copied()))
2060 .into_tuple()
2061 .all(txn)
2062 .await?
2063 .into_iter()
2064 .collect();
2065
2066 let (models, skipped_fragment_ids): (Vec<_>, Vec<_>) = fragment_splits
2068 .iter()
2069 .partition_map(|(fragment_id, splits)| {
2070 if existing_fragment_ids.contains(fragment_id) {
2071 Either::Left(fragment_splits::ActiveModel {
2072 fragment_id: Set(*fragment_id as _),
2073 splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
2074 splits: splits.iter().map(Into::into).collect_vec(),
2075 }))),
2076 })
2077 } else {
2078 Either::Right(*fragment_id)
2079 }
2080 });
2081
2082 if !skipped_fragment_ids.is_empty() {
2083 tracing::warn!(
2084 skipped_fragment_ids = ?skipped_fragment_ids,
2085 total_fragment_ids = fragment_splits.len(),
2086 "skipping stale fragment split updates for missing fragments"
2087 );
2088 }
2089
2090 if models.is_empty() {
2091 return Ok(());
2092 }
2093
2094 FragmentSplits::insert_many(models)
2095 .on_conflict(
2096 OnConflict::column(fragment_splits::Column::FragmentId)
2097 .update_column(fragment_splits::Column::Splits)
2098 .to_owned(),
2099 )
2100 .exec(txn)
2101 .await?;
2102
2103 Ok(())
2104 }
2105}
2106
2107#[cfg(test)]
2108mod tests {
2109 use std::collections::{BTreeMap, HashMap, HashSet};
2110
2111 use itertools::Itertools;
2112 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
2113 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
2114 use risingwave_common::id::JobId;
2115 use risingwave_common::util::iter_util::ZipEqDebug;
2116 use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
2117 use risingwave_meta_model::fragment::DistributionType;
2118 use risingwave_meta_model::*;
2119 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
2120 use risingwave_pb::plan_common::PbExprContext;
2121 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
2122 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
2123 use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
2124
2125 use super::ActorInfo;
2126 use crate::MetaResult;
2127 use crate::controller::catalog::CatalogController;
2128 use crate::model::{Fragment, StreamActor};
2129
2130 type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
2131
2132 type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
2133
2134 const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);
2135
2136 const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);
2137
2138 const TEST_JOB_ID: JobId = JobId::new(1);
2139
2140 const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
2141
2142 fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
2143 let mut upstream_actor_ids = BTreeMap::new();
2144 upstream_actor_ids.insert(
2145 TEST_UPSTREAM_FRAGMENT_ID,
2146 HashSet::from_iter([(actor_id + 100)]),
2147 );
2148 upstream_actor_ids.insert(
2149 (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
2150 HashSet::from_iter([(actor_id + 200)]),
2151 );
2152 upstream_actor_ids
2153 }
2154
2155 fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
2156 let mut input = vec![];
2157 for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
2158 input.push(PbStreamNode {
2159 node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
2160 upstream_fragment_id,
2161 ..Default::default()
2162 }))),
2163 ..Default::default()
2164 });
2165 }
2166
2167 PbStreamNode {
2168 input,
2169 node_body: Some(PbNodeBody::Union(PbUnionNode {})),
2170 ..Default::default()
2171 }
2172 }
2173
2174 #[tokio::test]
2175 async fn test_extract_fragment() -> MetaResult<()> {
2176 let actor_count = 3u32;
2177 let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
2178 .map(|actor_id| {
2179 (
2180 actor_id.into(),
2181 generate_upstream_actor_ids_for_actor(actor_id.into()),
2182 )
2183 })
2184 .collect();
2185
2186 let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());
2187
2188 let pb_fragment = Fragment {
2189 fragment_id: TEST_FRAGMENT_ID as _,
2190 fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
2191 distribution_type: PbFragmentDistributionType::Hash as _,
2192 state_table_ids: vec![TEST_STATE_TABLE_ID as _],
2193 maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
2194 nodes: stream_node,
2195 };
2196
2197 let fragment =
2198 CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;
2199
2200 check_fragment(fragment, pb_fragment);
2201
2202 Ok(())
2203 }
2204
2205 #[tokio::test]
2206 async fn test_compose_fragment() -> MetaResult<()> {
2207 let actor_count = 3u32;
2208
2209 let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
2210 .map(|actor_id| {
2211 (
2212 actor_id.into(),
2213 generate_upstream_actor_ids_for_actor(actor_id.into()),
2214 )
2215 })
2216 .collect();
2217
2218 let mut actor_bitmaps = ActorMapping::new_uniform(
2219 (0..actor_count).map(|i| i.into()),
2220 VirtualNode::COUNT_FOR_TEST,
2221 )
2222 .to_bitmaps();
2223
2224 let actors = (0..actor_count)
2225 .map(|actor_id| {
2226 let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
2227 splits: vec![PbConnectorSplit {
2228 split_type: "dummy".to_owned(),
2229 ..Default::default()
2230 }],
2231 });
2232
2233 ActorInfo {
2234 actor_id: actor_id.into(),
2235 fragment_id: TEST_FRAGMENT_ID,
2236 splits: actor_splits,
2237 worker_id: 0.into(),
2238 vnode_bitmap: actor_bitmaps
2239 .remove(&actor_id)
2240 .map(|bitmap| bitmap.to_protobuf())
2241 .as_ref()
2242 .map(VnodeBitmap::from),
2243 expr_context: ExprContext::from(&PbExprContext {
2244 time_zone: String::from("America/New_York"),
2245 strict_mode: false,
2246 }),
2247 config_override: "a.b.c = true".into(),
2248 }
2249 })
2250 .collect_vec();
2251
2252 let stream_node = {
2253 let template_actor = actors.first().cloned().unwrap();
2254
2255 let template_upstream_actor_ids =
2256 upstream_actor_ids.get(&template_actor.actor_id).unwrap();
2257
2258 generate_merger_stream_node(template_upstream_actor_ids)
2259 };
2260
2261 #[expect(deprecated)]
2262 let fragment = fragment::Model {
2263 fragment_id: TEST_FRAGMENT_ID,
2264 job_id: TEST_JOB_ID,
2265 fragment_type_mask: 0,
2266 distribution_type: DistributionType::Hash,
2267 stream_node: StreamNode::from(&stream_node),
2268 state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
2269 upstream_fragment_id: Default::default(),
2270 vnode_count: VirtualNode::COUNT_FOR_TEST as _,
2271 parallelism: None,
2272 };
2273
2274 let (pb_fragment, pb_actors, pb_actor_status, pb_actor_splits) =
2275 CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();
2276
2277 assert_eq!(pb_actor_status.len(), actor_count as usize);
2278 assert!(
2279 pb_actor_status
2280 .values()
2281 .all(|actor_status| actor_status.location.is_some())
2282 );
2283 assert_eq!(pb_actor_splits.len(), actor_count as usize);
2284
2285 check_fragment(fragment, pb_fragment);
2286 check_actors(
2287 actors,
2288 &upstream_actor_ids,
2289 pb_actors,
2290 pb_actor_splits,
2291 &stream_node,
2292 );
2293
2294 Ok(())
2295 }
2296
2297 fn check_actors(
2298 actors: Vec<ActorInfo>,
2299 actor_upstreams: &FragmentActorUpstreams,
2300 pb_actors: Vec<StreamActor>,
2301 pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
2302 stream_node: &PbStreamNode,
2303 ) {
2304 for (
2305 ActorInfo {
2306 actor_id,
2307 fragment_id,
2308 splits,
2309 worker_id: _,
2310 vnode_bitmap,
2311 expr_context,
2312 ..
2313 },
2314 StreamActor {
2315 actor_id: pb_actor_id,
2316 fragment_id: pb_fragment_id,
2317 vnode_bitmap: pb_vnode_bitmap,
2318 mview_definition,
2319 expr_context: pb_expr_context,
2320 ..
2321 },
2322 ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
2323 {
2324 assert_eq!(actor_id, pb_actor_id as ActorId);
2325 assert_eq!(fragment_id, pb_fragment_id as FragmentId);
2326
2327 assert_eq!(
2328 vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
2329 pb_vnode_bitmap,
2330 );
2331
2332 assert_eq!(mview_definition, "");
2333
2334 visit_stream_node_body(stream_node, |body| {
2335 if let PbNodeBody::Merge(m) = body {
2336 assert!(
2337 actor_upstreams
2338 .get(&actor_id)
2339 .unwrap()
2340 .contains_key(&m.upstream_fragment_id)
2341 );
2342 }
2343 });
2344
2345 assert_eq!(
2346 splits,
2347 pb_actor_splits
2348 .get(&pb_actor_id)
2349 .map(ConnectorSplits::from)
2350 .unwrap_or_default()
2351 );
2352
2353 assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
2354 }
2355 }
2356
2357 fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2358 let Fragment {
2359 fragment_id,
2360 fragment_type_mask,
2361 distribution_type: pb_distribution_type,
2362 state_table_ids: pb_state_table_ids,
2363 maybe_vnode_count: _,
2364 nodes,
2365 } = pb_fragment;
2366
2367 assert_eq!(fragment_id, TEST_FRAGMENT_ID);
2368 assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2369 assert_eq!(
2370 pb_distribution_type,
2371 PbFragmentDistributionType::from(fragment.distribution_type)
2372 );
2373
2374 assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
2375 assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2376 }
2377
2378 #[test]
2379 fn test_parallelism_policy_with_root_fragments() {
2380 #[expect(deprecated)]
2381 let fragment = fragment::Model {
2382 fragment_id: 3.into(),
2383 job_id: TEST_JOB_ID,
2384 fragment_type_mask: 0,
2385 distribution_type: DistributionType::Hash,
2386 stream_node: StreamNode::from(&PbStreamNode::default()),
2387 state_table_ids: TableIdArray::default(),
2388 upstream_fragment_id: Default::default(),
2389 vnode_count: 0,
2390 parallelism: None,
2391 };
2392
2393 let job_parallelism = StreamingParallelism::Fixed(4);
2394
2395 let policy = super::CatalogController::format_fragment_parallelism_policy(
2396 fragment.distribution_type,
2397 fragment.parallelism.as_ref(),
2398 Some(&job_parallelism),
2399 None,
2400 &[],
2401 );
2402
2403 assert_eq!(policy, "inherit(4)");
2404 }
2405
2406 #[test]
2407 fn test_parallelism_policy_with_adaptive_strategy() {
2408 #[expect(deprecated)]
2409 let fragment = fragment::Model {
2410 fragment_id: 4.into(),
2411 job_id: TEST_JOB_ID,
2412 fragment_type_mask: 0,
2413 distribution_type: DistributionType::Hash,
2414 stream_node: StreamNode::from(&PbStreamNode::default()),
2415 state_table_ids: TableIdArray::default(),
2416 upstream_fragment_id: Default::default(),
2417 vnode_count: 0,
2418 parallelism: None,
2419 };
2420
2421 let job_parallelism = StreamingParallelism::Adaptive;
2422
2423 let policy = super::CatalogController::format_fragment_parallelism_policy(
2424 fragment.distribution_type,
2425 fragment.parallelism.as_ref(),
2426 Some(&job_parallelism),
2427 Some("RATIO(0.5)"),
2428 &[],
2429 );
2430
2431 assert_eq!(policy, "inherit(ratio(0.5))");
2432 }
2433
2434 #[test]
2435 fn test_parallelism_policy_with_custom_strategy() {
2436 #[expect(deprecated)]
2437 let fragment = fragment::Model {
2438 fragment_id: 6.into(),
2439 job_id: TEST_JOB_ID,
2440 fragment_type_mask: 0,
2441 distribution_type: DistributionType::Hash,
2442 stream_node: StreamNode::from(&PbStreamNode::default()),
2443 state_table_ids: TableIdArray::default(),
2444 upstream_fragment_id: Default::default(),
2445 vnode_count: 0,
2446 parallelism: None,
2447 };
2448
2449 let job_parallelism = StreamingParallelism::Custom;
2450
2451 let policy = super::CatalogController::format_fragment_parallelism_policy(
2452 fragment.distribution_type,
2453 fragment.parallelism.as_ref(),
2454 Some(&job_parallelism),
2455 Some("BOUNDED(8)"),
2456 &[],
2457 );
2458
2459 assert_eq!(policy, "inherit(bounded(8))");
2460 }
2461
2462 #[test]
2463 fn test_parallelism_policy_with_invalid_adaptive_strategy_falls_back() {
2464 #[expect(deprecated)]
2465 let fragment = fragment::Model {
2466 fragment_id: 7.into(),
2467 job_id: TEST_JOB_ID,
2468 fragment_type_mask: 0,
2469 distribution_type: DistributionType::Hash,
2470 stream_node: StreamNode::from(&PbStreamNode::default()),
2471 state_table_ids: TableIdArray::default(),
2472 upstream_fragment_id: Default::default(),
2473 vnode_count: 0,
2474 parallelism: None,
2475 };
2476
2477 let job_parallelism = StreamingParallelism::Adaptive;
2478
2479 let policy = super::CatalogController::format_fragment_parallelism_policy(
2480 fragment.distribution_type,
2481 fragment.parallelism.as_ref(),
2482 Some(&job_parallelism),
2483 Some("NOT_A_STRATEGY"),
2484 &[],
2485 );
2486
2487 assert_eq!(policy, "inherit(adaptive)");
2488 }
2489
2490 #[test]
2491 fn test_parallelism_policy_with_upstream_roots() {
2492 #[expect(deprecated)]
2493 let fragment = fragment::Model {
2494 fragment_id: 5.into(),
2495 job_id: TEST_JOB_ID,
2496 fragment_type_mask: 0,
2497 distribution_type: DistributionType::Hash,
2498 stream_node: StreamNode::from(&PbStreamNode::default()),
2499 state_table_ids: TableIdArray::default(),
2500 upstream_fragment_id: Default::default(),
2501 vnode_count: 0,
2502 parallelism: None,
2503 };
2504
2505 let policy = super::CatalogController::format_fragment_parallelism_policy(
2506 fragment.distribution_type,
2507 fragment.parallelism.as_ref(),
2508 None,
2509 None,
2510 &[3.into(), 1.into(), 2.into(), 1.into()],
2511 );
2512
2513 assert_eq!(policy, "upstream_fragment([1, 2, 3])");
2514 }
2515}