1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::{Either, Itertools};
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::AdaptiveParallelismStrategy;
28use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
29use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
30use risingwave_connector::source::SplitImpl;
31use risingwave_connector::source::cdc::CdcScanOptions;
32use risingwave_meta_model::fragment::DistributionType;
33use risingwave_meta_model::object::ObjectType;
34use risingwave_meta_model::prelude::{
35 Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
36};
37use risingwave_meta_model::{
38 ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
39 JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
40 TableIdArray, VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits,
41 object, sink, source, streaming_job, table,
42};
43use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
44use risingwave_pb::catalog::PbTable;
45use risingwave_pb::common::PbActorLocation;
46use risingwave_pb::meta::subscribe_response::{
47 Info as NotificationInfo, Operation as NotificationOperation,
48};
49use risingwave_pb::meta::table_fragments::fragment::{
50 FragmentDistributionType, PbFragmentDistributionType,
51};
52use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
53use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
54use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
55use risingwave_pb::stream_plan;
56use risingwave_pb::stream_plan::stream_node::NodeBody;
57use risingwave_pb::stream_plan::{
58 PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, SinkLogStoreType,
59 StreamScanType,
60};
61use sea_orm::ActiveValue::Set;
62use sea_orm::sea_query::Expr;
63use sea_orm::{
64 ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, FromQueryResult, JoinType,
65 PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
66};
67use serde::{Deserialize, Serialize};
68
69use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
70use crate::controller::catalog::CatalogController;
71use crate::controller::scale::{
72 FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph,
73 find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
74 render_actor_assignments, resolve_streaming_job_definition,
75};
76use crate::controller::utils::{
77 FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
78 get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
79 resolve_no_shuffle_actor_mapping,
80};
81use crate::error::MetaError;
82use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
83use crate::model::{
84 DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
85 StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
/// Per-actor information: where the actor is placed and what it is responsible for.
#[derive(Debug)]
pub struct InflightActorInfo {
    /// Worker the actor is located on.
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor, if it has one.
    // NOTE(review): presumably `None` for singleton fragments — confirm.
    pub vnode_bitmap: Option<Bitmap>,
    /// Connector splits attached to the actor.
    pub splits: Vec<SplitImpl>,
}
98
/// Actor description in meta-model types.
///
/// Built by `collect_fragment_actor_map` from the shared actor info plus the job's
/// `StreamContext`, and consumed by `compose_fragment`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    /// Id of the actor.
    pub actor_id: ActorId,
    /// Fragment the actor belongs to; `compose_fragment` rejects a mismatch.
    pub fragment_id: FragmentId,
    /// Connector splits of the actor, in meta-model encoding.
    pub splits: ConnectorSplits,
    /// Worker the actor is located on.
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor, if it has one.
    pub vnode_bitmap: Option<VnodeBitmap>,
    /// Expression context derived from the job's `StreamContext`.
    pub expr_context: ExprContext,
    /// Config override copied from the job's `StreamContext`.
    pub config_override: Arc<str>,
}
109
/// Flat, per-fragment row of fragment metadata.
// NOTE(review): not referenced in the visible part of this file; presumably an
// intermediate row used while building `FragmentDesc` values — confirm at the use site.
#[derive(Debug)]
struct FragmentDescRow {
    /// Id of the fragment.
    fragment_id: FragmentId,
    /// Streaming job owning the fragment.
    job_id: JobId,
    /// Raw fragment type mask bits.
    fragment_type_mask: i32,
    /// Distribution type of the fragment.
    distribution_type: DistributionType,
    /// Ids of the state tables of the fragment.
    state_table_ids: TableIdArray,
    /// Vnode count of the fragment.
    vnode_count: i32,
    /// Fragment-level parallelism override, if any.
    parallelism_override: Option<StreamingParallelism>,
    /// Stream plan of the fragment, if loaded.
    stream_node: Option<StreamNode>,
}
121
/// Per-fragment information together with the actors of the fragment.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Distribution type of the fragment.
    pub distribution_type: DistributionType,
    /// Fragment type mask.
    pub fragment_type_mask: FragmentTypeMask,
    /// Vnode count of the fragment.
    pub vnode_count: usize,
    /// Stream plan of the fragment in protobuf form.
    pub nodes: PbStreamNode,
    /// Actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    /// Ids of the state tables of the fragment.
    pub state_table_ids: HashSet<TableId>,
}
132
/// Distribution type and vnode count of a fragment, as returned by
/// `CatalogController::fragment_parallelisms`.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    /// Distribution type of the fragment, in protobuf form.
    pub distribution_type: FragmentDistributionType,
    /// Vnode count stored for the fragment.
    pub vnode_count: usize,
}
138
139#[easy_ext::ext(FragmentTypeMaskExt)]
140pub impl FragmentTypeMask {
141 fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
142 Expr::col(fragment::Column::FragmentTypeMask)
143 .bit_and(Expr::value(flag as i32))
144 .ne(0)
145 }
146
147 fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
148 Expr::col(fragment::Column::FragmentTypeMask)
149 .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
150 .ne(0)
151 }
152
153 fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
154 Expr::col(fragment::Column::FragmentTypeMask)
155 .bit_and(Expr::value(flag as i32))
156 .eq(0)
157 }
158}
159
/// Summary row of a streaming job, as produced by
/// `CatalogController::list_streaming_job_infos`.
///
/// Serialized with camelCase field names.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    /// Id of the streaming job.
    pub job_id: JobId,
    /// Object type of the job.
    pub obj_type: ObjectType,
    /// Name of the job's table, source or sink (first non-null, in that order);
    /// `"<unknown>"` when none of them exists.
    pub name: String,
    /// Status of the job.
    pub job_status: JobStatus,
    /// Parallelism configured for the job.
    pub parallelism: StreamingParallelism,
    /// Adaptive parallelism strategy stored for the job, if any.
    pub adaptive_parallelism_strategy: Option<String>,
    /// Backfill parallelism stored for the job, if any. Excluded from serde output.
    #[serde(skip)]
    pub backfill_parallelism: Option<StreamingParallelism>,
    /// Backfill adaptive parallelism strategy stored for the job, if any.
    /// Excluded from serde output.
    #[serde(skip)]
    pub backfill_adaptive_parallelism_strategy: Option<String>,
    /// Maximum parallelism of the job.
    pub max_parallelism: i32,
    /// The job's specific resource group, falling back to the resource group of its database.
    pub resource_group: String,
    /// Config override of the job; empty string when none is stored.
    pub config_override: String,
    /// Database the job belongs to.
    pub database_id: DatabaseId,
    /// Schema the job belongs to.
    pub schema_id: SchemaId,
}
182
183impl NotificationManager {
184 pub(crate) fn notify_streaming_fragment_mapping(
190 &self,
191 operation: NotificationOperation,
192 fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
193 ) {
194 if fragment_mappings.is_empty() {
195 return;
196 }
197 for fragment_mapping in fragment_mappings {
199 self.notify_frontend_without_version(
200 operation,
201 NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
202 );
203 }
204 }
205
206 pub(crate) fn notify_serving_fragment_mapping_update(
211 &self,
212 fragment_ids: Vec<crate::model::FragmentId>,
213 ) {
214 if fragment_ids.is_empty() {
215 return;
216 }
217 self.notify_local_subscribers(LocalNotification::ServingFragmentMappingsUpsert(
218 fragment_ids,
219 ));
220 }
221
222 pub(crate) fn notify_serving_fragment_mapping_delete(
227 &self,
228 fragment_ids: Vec<crate::model::FragmentId>,
229 ) {
230 if fragment_ids.is_empty() {
231 return;
232 }
233 self.notify_local_subscribers(LocalNotification::ServingFragmentMappingsDelete(
234 fragment_ids,
235 ));
236 }
237}
238
239impl CatalogController {
240 pub fn prepare_fragment_models_from_fragments(
241 job_id: JobId,
242 fragments: impl Iterator<Item = &Fragment>,
243 ) -> MetaResult<Vec<fragment::Model>> {
244 fragments
245 .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
246 .try_collect()
247 }
248
249 pub fn prepare_fragment_model_for_new_job(
250 job_id: JobId,
251 fragment: &Fragment,
252 ) -> MetaResult<fragment::Model> {
253 let vnode_count = fragment.vnode_count();
254 let Fragment {
255 fragment_id: pb_fragment_id,
256 fragment_type_mask: pb_fragment_type_mask,
257 distribution_type: pb_distribution_type,
258 state_table_ids: pb_state_table_ids,
259 nodes,
260 ..
261 } = fragment;
262
263 let state_table_ids = pb_state_table_ids.clone().into();
264
265 let fragment_parallelism = nodes
266 .node_body
267 .as_ref()
268 .and_then(|body| match body {
269 NodeBody::StreamCdcScan(node) => Some(node),
270 _ => None,
271 })
272 .and_then(|node| node.options.as_ref())
273 .map(CdcScanOptions::from_proto)
274 .filter(|opts| opts.is_parallelized_backfill())
275 .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));
276
277 let stream_node = StreamNode::from(nodes);
278
279 let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
280 .unwrap()
281 .into();
282
283 #[expect(deprecated)]
284 let fragment = fragment::Model {
285 fragment_id: *pb_fragment_id as _,
286 job_id,
287 fragment_type_mask: (*pb_fragment_type_mask).into(),
288 distribution_type,
289 stream_node,
290 state_table_ids,
291 upstream_fragment_id: Default::default(),
292 vnode_count: vnode_count as _,
293 parallelism: fragment_parallelism,
294 };
295
296 Ok(fragment)
297 }
298
299 #[expect(clippy::type_complexity)]
300 fn compose_table_fragments(
301 job_id: JobId,
302 state: PbState,
303 ctx: StreamContext,
304 fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
305 max_parallelism: usize,
306 job_definition: Option<String>,
307 ) -> MetaResult<(
308 StreamJobFragments,
309 HashMap<FragmentId, Vec<StreamActor>>,
310 HashMap<crate::model::ActorId, PbActorStatus>,
311 )> {
312 let mut pb_fragments = BTreeMap::new();
313 let mut fragment_actors = HashMap::new();
314 let mut all_actor_status = HashMap::new();
315
316 for (fragment, actors) in fragments {
317 let (fragment, actors, actor_status, _) =
318 Self::compose_fragment(fragment, actors, job_definition.clone())?;
319 let fragment_id = fragment.fragment_id;
320 fragment_actors.insert(fragment_id, actors);
321 all_actor_status.extend(actor_status);
322
323 pb_fragments.insert(fragment_id, fragment);
324 }
325
326 let table_fragments = StreamJobFragments {
327 stream_job_id: job_id,
328 state: state as _,
329 fragments: pb_fragments,
330 ctx,
331 max_parallelism,
332 };
333
334 Ok((table_fragments, fragment_actors, all_actor_status))
335 }
336
337 #[expect(clippy::type_complexity)]
338 fn compose_fragment(
339 fragment: fragment::Model,
340 actors: Vec<ActorInfo>,
341 job_definition: Option<String>,
342 ) -> MetaResult<(
343 Fragment,
344 Vec<StreamActor>,
345 HashMap<crate::model::ActorId, PbActorStatus>,
346 HashMap<crate::model::ActorId, PbConnectorSplits>,
347 )> {
348 let fragment::Model {
349 fragment_id,
350 fragment_type_mask,
351 distribution_type,
352 stream_node,
353 state_table_ids,
354 vnode_count,
355 ..
356 } = fragment;
357
358 let stream_node = stream_node.to_protobuf();
359 let mut upstream_fragments = HashSet::new();
360 visit_stream_node_body(&stream_node, |body| {
361 if let NodeBody::Merge(m) = body {
362 assert!(
363 upstream_fragments.insert(m.upstream_fragment_id),
364 "non-duplicate upstream fragment"
365 );
366 }
367 });
368
369 let mut pb_actors = vec![];
370
371 let mut pb_actor_status = HashMap::new();
372 let mut pb_actor_splits = HashMap::new();
373
374 for actor in actors {
375 if actor.fragment_id != fragment_id {
376 bail!(
377 "fragment id {} from actor {} is different from fragment {}",
378 actor.fragment_id,
379 actor.actor_id,
380 fragment_id
381 )
382 }
383
384 let ActorInfo {
385 actor_id,
386 fragment_id,
387 worker_id,
388 splits,
389 vnode_bitmap,
390 expr_context,
391 config_override,
392 ..
393 } = actor;
394
395 let vnode_bitmap =
396 vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
397 let pb_expr_context = Some(expr_context.to_protobuf());
398
399 pb_actor_status.insert(
400 actor_id as _,
401 PbActorStatus {
402 location: PbActorLocation::from_worker(worker_id),
403 },
404 );
405
406 pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
407
408 pb_actors.push(StreamActor {
409 actor_id: actor_id as _,
410 fragment_id: fragment_id as _,
411 vnode_bitmap,
412 mview_definition: job_definition.clone().unwrap_or("".to_owned()),
413 expr_context: pb_expr_context,
414 config_override,
415 })
416 }
417
418 let pb_state_table_ids = state_table_ids.0;
419 let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
420 let pb_fragment = Fragment {
421 fragment_id: fragment_id as _,
422 fragment_type_mask: fragment_type_mask.into(),
423 distribution_type: pb_distribution_type,
424 state_table_ids: pb_state_table_ids,
425 maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
426 nodes: stream_node,
427 };
428
429 Ok((pb_fragment, pb_actors, pb_actor_status, pb_actor_splits))
430 }
431
432 pub async fn fragment_parallelisms(
439 &self,
440 ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
441 let inner = self.inner.read().await;
442 let query = FragmentModel::find().select_only().columns([
443 fragment::Column::FragmentId,
444 fragment::Column::DistributionType,
445 fragment::Column::VnodeCount,
446 ]);
447 let fragments: Vec<(FragmentId, DistributionType, i32)> =
448 query.into_tuple().all(&inner.db).await?;
449
450 Ok(fragments
451 .into_iter()
452 .map(|(fragment_id, distribution_type, vnode_count)| {
453 (
454 fragment_id,
455 FragmentParallelismInfo {
456 distribution_type: PbFragmentDistributionType::from(distribution_type),
457 vnode_count: vnode_count as usize,
458 },
459 )
460 })
461 .collect())
462 }
463
464 pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
465 let inner = self.inner.read().await;
466 let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
467 .select_only()
468 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
469 .into_tuple()
470 .all(&inner.db)
471 .await?;
472 Ok(fragment_jobs.into_iter().collect())
473 }
474
475 pub async fn get_fragment_job_id(
476 &self,
477 fragment_ids: Vec<FragmentId>,
478 ) -> MetaResult<Vec<ObjectId>> {
479 let inner = self.inner.read().await;
480 self.get_fragment_job_id_in_txn(&inner.db, fragment_ids)
481 .await
482 }
483
484 pub async fn get_fragment_job_id_in_txn<C>(
485 &self,
486 txn: &C,
487 fragment_ids: Vec<FragmentId>,
488 ) -> MetaResult<Vec<ObjectId>>
489 where
490 C: ConnectionTrait + Send,
491 {
492 let object_ids: Vec<ObjectId> = FragmentModel::find()
493 .select_only()
494 .column(fragment::Column::JobId)
495 .filter(fragment::Column::FragmentId.is_in(fragment_ids))
496 .into_tuple()
497 .all(txn)
498 .await?;
499
500 Ok(object_ids)
501 }
502
503 pub async fn get_fragment_desc_by_id(
504 &self,
505 fragment_id: FragmentId,
506 ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
507 let inner = self.inner.read().await;
508
509 let fragment_model = match FragmentModel::find_by_id(fragment_id)
510 .one(&inner.db)
511 .await?
512 {
513 Some(fragment) => fragment,
514 None => return Ok(None),
515 };
516
517 let job_parallelism: Option<(StreamingParallelism, Option<String>)> =
518 StreamingJob::find_by_id(fragment_model.job_id)
519 .select_only()
520 .columns([
521 streaming_job::Column::Parallelism,
522 streaming_job::Column::AdaptiveParallelismStrategy,
523 ])
524 .into_tuple::<(StreamingParallelism, Option<String>)>()
525 .one(&inner.db)
526 .await?;
527
528 let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
529 .select_only()
530 .columns([
531 fragment_relation::Column::SourceFragmentId,
532 fragment_relation::Column::DispatcherType,
533 ])
534 .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
535 .into_tuple()
536 .all(&inner.db)
537 .await?;
538
539 let upstreams: Vec<_> = upstream_entries
540 .into_iter()
541 .map(|(source_id, _)| source_id)
542 .collect();
543
544 let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
545 .await
546 .map(Self::collect_root_fragment_mapping)?;
547 let root_fragments = root_fragment_map
548 .get(&fragment_id)
549 .cloned()
550 .unwrap_or_default();
551
552 let info = self.env.shared_actor_infos().read_guard();
553 let SharedFragmentInfo { actors, .. } = info
554 .get_fragment(fragment_model.fragment_id as _)
555 .unwrap_or_else(|| {
556 panic!(
557 "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
558 fragment_model.fragment_id,
559 fragment_model.job_id
560 )
561 });
562
563 let parallelism_policy = Self::format_fragment_parallelism_policy(
564 fragment_model.distribution_type,
565 fragment_model.parallelism.as_ref(),
566 job_parallelism.as_ref().map(|(parallelism, _)| parallelism),
567 job_parallelism
568 .as_ref()
569 .and_then(|(_, strategy)| strategy.as_deref()),
570 &root_fragments,
571 );
572
573 let fragment = FragmentDesc {
574 fragment_id: fragment_model.fragment_id,
575 job_id: fragment_model.job_id,
576 fragment_type_mask: fragment_model.fragment_type_mask,
577 distribution_type: fragment_model.distribution_type,
578 state_table_ids: fragment_model.state_table_ids.clone(),
579 parallelism: actors.len() as _,
580 vnode_count: fragment_model.vnode_count,
581 stream_node: fragment_model.stream_node.clone(),
582 parallelism_policy,
583 };
584
585 Ok(Some((fragment, upstreams)))
586 }
587
588 pub async fn list_fragment_database_ids(
589 &self,
590 select_fragment_ids: Option<Vec<FragmentId>>,
591 ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
592 let inner = self.inner.read().await;
593 let select = FragmentModel::find()
594 .select_only()
595 .column(fragment::Column::FragmentId)
596 .column(object::Column::DatabaseId)
597 .join(JoinType::InnerJoin, fragment::Relation::Object.def());
598 let select = if let Some(select_fragment_ids) = select_fragment_ids {
599 select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
600 } else {
601 select
602 };
603 Ok(select.into_tuple().all(&inner.db).await?)
604 }
605
606 pub async fn get_job_fragments_by_id(
607 &self,
608 job_id: JobId,
609 ) -> MetaResult<(
610 StreamJobFragments,
611 HashMap<FragmentId, Vec<StreamActor>>,
612 HashMap<ActorId, PbActorStatus>,
613 )> {
614 let inner = self.inner.read().await;
615
616 let fragments: Vec<_> = FragmentModel::find()
618 .filter(fragment::Column::JobId.eq(job_id))
619 .all(&inner.db)
620 .await?;
621
622 let job_info = StreamingJob::find_by_id(job_id)
623 .one(&inner.db)
624 .await?
625 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
626
627 let fragment_actors =
628 self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;
629
630 let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
631 .await?
632 .remove(&job_id);
633
634 Self::compose_table_fragments(
635 job_id,
636 job_info.job_status.into(),
637 job_info.stream_context(),
638 fragment_actors,
639 job_info.max_parallelism as _,
640 job_definition,
641 )
642 }
643
644 pub async fn get_fragment_actor_dispatchers(
645 &self,
646 fragment_ids: Vec<FragmentId>,
647 ) -> MetaResult<FragmentActorDispatchers> {
648 let inner = self.inner.read().await;
649
650 self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
651 .await
652 }
653
654 pub async fn get_fragment_actor_dispatchers_txn(
655 &self,
656 c: &impl ConnectionTrait,
657 fragment_ids: Vec<FragmentId>,
658 ) -> MetaResult<FragmentActorDispatchers> {
659 let fragment_relations = FragmentRelation::find()
660 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
661 .all(c)
662 .await?;
663
664 type FragmentActorInfo = (
665 DistributionType,
666 Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
667 );
668
669 let shared_info = self.env.shared_actor_infos();
670 let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
671 let get_fragment_actors = |fragment_id: FragmentId| async move {
672 let result: MetaResult<FragmentActorInfo> = try {
673 let read_guard = shared_info.read_guard();
674
675 let fragment = read_guard.get_fragment(fragment_id as _).unwrap();
676
677 (
678 fragment.distribution_type,
679 Arc::new(
680 fragment
681 .actors
682 .iter()
683 .map(|(actor_id, actor_info)| {
684 (
685 *actor_id,
686 actor_info
687 .vnode_bitmap
688 .as_ref()
689 .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
690 )
691 })
692 .collect(),
693 ),
694 )
695 };
696 result
697 };
698
699 let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
700 for fragment_relation::Model {
701 source_fragment_id,
702 target_fragment_id,
703 dispatcher_type,
704 dist_key_indices,
705 output_indices,
706 output_type_mapping,
707 } in fragment_relations
708 {
709 let (source_fragment_distribution, source_fragment_actors) = {
710 let (distribution, actors) = {
711 match fragment_actor_cache.entry(source_fragment_id) {
712 Entry::Occupied(entry) => entry.into_mut(),
713 Entry::Vacant(entry) => {
714 entry.insert(get_fragment_actors(source_fragment_id).await?)
715 }
716 }
717 };
718 (*distribution, actors.clone())
719 };
720 let (target_fragment_distribution, target_fragment_actors) = {
721 let (distribution, actors) = {
722 match fragment_actor_cache.entry(target_fragment_id) {
723 Entry::Occupied(entry) => entry.into_mut(),
724 Entry::Vacant(entry) => {
725 entry.insert(get_fragment_actors(target_fragment_id).await?)
726 }
727 }
728 };
729 (*distribution, actors.clone())
730 };
731 let output_mapping = PbDispatchOutputMapping {
732 indices: output_indices.into_u32_array(),
733 types: output_type_mapping.unwrap_or_default().to_protobuf(),
734 };
735 let (dispatchers, _) = compose_dispatchers(
736 source_fragment_distribution,
737 &source_fragment_actors,
738 target_fragment_id as _,
739 target_fragment_distribution,
740 &target_fragment_actors,
741 dispatcher_type,
742 dist_key_indices.into_u32_array(),
743 output_mapping,
744 );
745 let actor_dispatchers_map = actor_dispatchers_map
746 .entry(source_fragment_id as _)
747 .or_default();
748 for (actor_id, dispatchers) in dispatchers {
749 actor_dispatchers_map
750 .entry(actor_id as _)
751 .or_default()
752 .push(dispatchers);
753 }
754 }
755 Ok(actor_dispatchers_map)
756 }
757
758 pub async fn get_fragment_downstream_relations(
759 &self,
760 fragment_ids: Vec<FragmentId>,
761 ) -> MetaResult<FragmentDownstreamRelation> {
762 let inner = self.inner.read().await;
763 self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
764 .await
765 }
766
767 pub async fn get_fragment_downstream_relations_in_txn<C>(
768 &self,
769 txn: &C,
770 fragment_ids: Vec<FragmentId>,
771 ) -> MetaResult<FragmentDownstreamRelation>
772 where
773 C: ConnectionTrait + StreamTrait + Send,
774 {
775 let mut stream = FragmentRelation::find()
776 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
777 .stream(txn)
778 .await?;
779 let mut relations = FragmentDownstreamRelation::new();
780 while let Some(relation) = stream.try_next().await? {
781 relations
782 .entry(relation.source_fragment_id as _)
783 .or_default()
784 .push(DownstreamFragmentRelation {
785 downstream_fragment_id: relation.target_fragment_id as _,
786 dispatcher_type: relation.dispatcher_type,
787 dist_key_indices: relation.dist_key_indices.into_u32_array(),
788 output_mapping: PbDispatchOutputMapping {
789 indices: relation.output_indices.into_u32_array(),
790 types: relation
791 .output_type_mapping
792 .unwrap_or_default()
793 .to_protobuf(),
794 },
795 });
796 }
797 Ok(relations)
798 }
799
800 pub async fn get_job_fragment_backfill_scan_type(
801 &self,
802 job_id: JobId,
803 ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
804 let inner = self.inner.read().await;
805 self.get_job_fragment_backfill_scan_type_in_txn(&inner.db, job_id)
806 .await
807 }
808
809 pub async fn get_job_fragment_backfill_scan_type_in_txn<C>(
810 &self,
811 txn: &C,
812 job_id: JobId,
813 ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>>
814 where
815 C: ConnectionTrait + Send,
816 {
817 let fragments: Vec<_> = FragmentModel::find()
818 .filter(fragment::Column::JobId.eq(job_id))
819 .all(txn)
820 .await?;
821
822 let mut result = HashMap::new();
823
824 for fragment::Model {
825 fragment_id,
826 stream_node,
827 ..
828 } in fragments
829 {
830 let stream_node = stream_node.to_protobuf();
831 visit_stream_node_body(&stream_node, |body| {
832 if let NodeBody::StreamScan(node) = body {
833 match node.stream_scan_type() {
834 StreamScanType::Unspecified => {}
835 scan_type => {
836 result.insert(fragment_id as crate::model::FragmentId, scan_type);
837 }
838 }
839 }
840 });
841 }
842
843 Ok(result)
844 }
845
846 pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
847 let inner = self.inner.read().await;
848 let count = StreamingJob::find().count(&inner.db).await?;
849 Ok(usize::try_from(count).context("streaming job count overflow")?)
850 }
851
852 pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
853 let inner = self.inner.read().await;
854 let job_states = StreamingJob::find()
855 .select_only()
856 .column(streaming_job::Column::JobId)
857 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
858 .join(JoinType::InnerJoin, object::Relation::Database2.def())
859 .column(object::Column::ObjType)
860 .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
861 .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
862 .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
863 .column_as(
864 Expr::if_null(
865 Expr::col((table::Entity, table::Column::Name)),
866 Expr::if_null(
867 Expr::col((source::Entity, source::Column::Name)),
868 Expr::if_null(
869 Expr::col((sink::Entity, sink::Column::Name)),
870 Expr::val("<unknown>"),
871 ),
872 ),
873 ),
874 "name",
875 )
876 .columns([
877 streaming_job::Column::JobStatus,
878 streaming_job::Column::Parallelism,
879 streaming_job::Column::AdaptiveParallelismStrategy,
880 streaming_job::Column::BackfillParallelism,
881 streaming_job::Column::BackfillAdaptiveParallelismStrategy,
882 streaming_job::Column::MaxParallelism,
883 ])
884 .column_as(
885 Expr::if_null(
886 Expr::col((
887 streaming_job::Entity,
888 streaming_job::Column::SpecificResourceGroup,
889 )),
890 Expr::col((database::Entity, database::Column::ResourceGroup)),
891 ),
892 "resource_group",
893 )
894 .column_as(
895 Expr::if_null(
896 Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
897 Expr::val(""),
898 ),
899 "config_override",
900 )
901 .column(object::Column::DatabaseId)
902 .column(object::Column::SchemaId)
903 .into_model()
904 .all(&inner.db)
905 .await?;
906 Ok(job_states)
907 }
908
909 pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
910 let inner = self.inner.read().await;
911 let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
912 .select_only()
913 .column(streaming_job::Column::MaxParallelism)
914 .into_tuple()
915 .one(&inner.db)
916 .await?
917 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
918 Ok(max_parallelism as usize)
919 }
920
921 pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
923 if let Ok(inner) = self.inner.try_read()
924 && let Ok(job_state_tables) = FragmentModel::find()
925 .select_only()
926 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
927 .into_tuple::<(JobId, I32Array)>()
928 .all(&inner.db)
929 .await
930 {
931 let mut job_internal_table_ids = HashMap::new();
932 for (job_id, state_table_ids) in job_state_tables {
933 job_internal_table_ids
934 .entry(job_id)
935 .or_insert_with(Vec::new)
936 .extend(
937 state_table_ids
938 .into_inner()
939 .into_iter()
940 .map(|table_id| TableId::new(table_id as _)),
941 );
942 }
943 return Some(job_internal_table_ids.into_iter().collect());
944 }
945 None
946 }
947
948 pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
949 let inner = self.inner.read().await;
950 let count = FragmentModel::find().count(&inner.db).await?;
951 Ok(count > 0)
952 }
953
954 pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
955 let read_guard = self.env.shared_actor_infos().read_guard();
956 let actor_cnt: HashMap<WorkerId, _> = read_guard
957 .iter_over_fragments()
958 .flat_map(|(_, fragment)| {
959 fragment
960 .actors
961 .iter()
962 .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
963 })
964 .into_group_map()
965 .into_iter()
966 .map(|(k, v)| (k, v.len()))
967 .collect();
968
969 Ok(actor_cnt)
970 }
971
972 fn collect_fragment_actor_map(
973 &self,
974 fragment_ids: &[FragmentId],
975 stream_context: StreamContext,
976 ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
977 let guard = self.env.shared_actor_infos().read_guard();
978 let pb_expr_context = stream_context.to_expr_context();
979 let expr_context: ExprContext = (&pb_expr_context).into();
980
981 let mut actor_map = HashMap::with_capacity(fragment_ids.len());
982 for fragment_id in fragment_ids {
983 let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
984 anyhow!("fragment {} not found in shared actor info", fragment_id)
985 })?;
986
987 let actors = fragment_info
988 .actors
989 .iter()
990 .map(|(actor_id, actor_info)| ActorInfo {
991 actor_id: *actor_id as _,
992 fragment_id: *fragment_id,
993 splits: ConnectorSplits::from(&PbConnectorSplits {
994 splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
995 }),
996 worker_id: actor_info.worker_id as _,
997 vnode_bitmap: actor_info
998 .vnode_bitmap
999 .as_ref()
1000 .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
1001 expr_context: expr_context.clone(),
1002 config_override: stream_context.config_override.clone(),
1003 })
1004 .collect();
1005
1006 actor_map.insert(*fragment_id, actors);
1007 }
1008
1009 Ok(actor_map)
1010 }
1011
1012 fn collect_fragment_actor_pairs(
1013 &self,
1014 fragments: Vec<fragment::Model>,
1015 stream_context: StreamContext,
1016 ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
1017 let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
1018 let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
1019 fragments
1020 .into_iter()
1021 .map(|fragment| {
1022 let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
1023 anyhow!(
1024 "fragment {} missing in shared actor info map",
1025 fragment.fragment_id
1026 )
1027 })?;
1028 Ok((fragment, actors))
1029 })
1030 .collect()
1031 }
1032
1033 pub async fn table_fragments(
1035 &self,
1036 ) -> MetaResult<
1037 BTreeMap<
1038 JobId,
1039 (
1040 StreamJobFragments,
1041 HashMap<FragmentId, Vec<StreamActor>>,
1042 HashMap<crate::model::ActorId, PbActorStatus>,
1043 ),
1044 >,
1045 > {
1046 let inner = self.inner.read().await;
1047 let jobs = StreamingJob::find().all(&inner.db).await?;
1048
1049 let mut job_definition = resolve_streaming_job_definition(
1050 &inner.db,
1051 &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
1052 )
1053 .await?;
1054
1055 let mut table_fragments = BTreeMap::new();
1056 for job in jobs {
1057 let fragments = FragmentModel::find()
1058 .filter(fragment::Column::JobId.eq(job.job_id))
1059 .all(&inner.db)
1060 .await?;
1061
1062 let fragment_actors =
1063 self.collect_fragment_actor_pairs(fragments, job.stream_context())?;
1064
1065 table_fragments.insert(
1066 job.job_id,
1067 Self::compose_table_fragments(
1068 job.job_id,
1069 job.job_status.into(),
1070 job.stream_context(),
1071 fragment_actors,
1072 job.max_parallelism as _,
1073 job_definition.remove(&job.job_id),
1074 )?,
1075 );
1076 }
1077
1078 Ok(table_fragments)
1079 }
1080
1081 pub async fn upstream_fragments(
1082 &self,
1083 fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1084 ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
1085 let inner = self.inner.read().await;
1086 self.upstream_fragments_in_txn(&inner.db, fragment_ids)
1087 .await
1088 }
1089
1090 pub async fn upstream_fragments_in_txn<C>(
1091 &self,
1092 txn: &C,
1093 fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1094 ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>>
1095 where
1096 C: ConnectionTrait + StreamTrait + Send,
1097 {
1098 let mut stream = FragmentRelation::find()
1099 .select_only()
1100 .columns([
1101 fragment_relation::Column::SourceFragmentId,
1102 fragment_relation::Column::TargetFragmentId,
1103 ])
1104 .filter(
1105 fragment_relation::Column::TargetFragmentId
1106 .is_in(fragment_ids.map(|id| id as FragmentId)),
1107 )
1108 .into_tuple::<(FragmentId, FragmentId)>()
1109 .stream(txn)
1110 .await?;
1111 let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
1112 while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
1113 upstream_fragments
1114 .entry(downstream_fragment_id as crate::model::FragmentId)
1115 .or_default()
1116 .insert(upstream_fragment_id as crate::model::FragmentId);
1117 }
1118 Ok(upstream_fragments)
1119 }
1120
1121 pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1122 let info = self.env.shared_actor_infos().read_guard();
1123
1124 let actor_locations = info
1125 .iter_over_fragments()
1126 .flat_map(|(fragment_id, fragment)| {
1127 fragment
1128 .actors
1129 .iter()
1130 .map(|(actor_id, actor)| PartialActorLocation {
1131 actor_id: *actor_id as _,
1132 fragment_id: *fragment_id as _,
1133 worker_id: actor.worker_id,
1134 })
1135 })
1136 .collect_vec();
1137
1138 Ok(actor_locations)
1139 }
1140
1141 pub async fn list_actor_info(
1142 &self,
1143 ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
1144 let inner = self.inner.read().await;
1145
1146 let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
1147 FragmentModel::find()
1148 .select_only()
1149 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1150 .column(fragment::Column::FragmentId)
1151 .column_as(object::Column::Oid, "job_id")
1152 .column_as(object::Column::SchemaId, "schema_id")
1153 .column_as(object::Column::ObjType, "type")
1154 .into_tuple()
1155 .all(&inner.db)
1156 .await?;
1157
1158 let actor_infos = {
1159 let info = self.env.shared_actor_infos().read_guard();
1160
1161 let mut result = Vec::new();
1162
1163 for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
1164 let Some(fragment) = info.get_fragment(fragment_id as _) else {
1165 return Err(MetaError::unavailable(format!(
1166 "shared actor info missing for fragment {fragment_id} while listing actors"
1167 )));
1168 };
1169
1170 for actor_id in fragment.actors.keys() {
1171 result.push((
1172 *actor_id as _,
1173 fragment.fragment_id as _,
1174 object_id,
1175 schema_id,
1176 object_type,
1177 ));
1178 }
1179 }
1180
1181 result
1182 };
1183
1184 Ok(actor_infos)
1185 }
1186
1187 pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1188 let guard = self.env.shared_actor_info.read_guard();
1189 guard
1190 .iter_over_fragments()
1191 .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1192 .collect_vec()
1193 }
1194
1195 pub async fn list_fragment_descs_with_node(
1196 &self,
1197 is_creating: bool,
1198 ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1199 let inner = self.inner.read().await;
1200 let txn = inner.db.begin().await?;
1201
1202 let fragments_query = Self::build_fragment_query(is_creating);
1203 let fragments: Vec<fragment::Model> = fragments_query.all(&txn).await?;
1204
1205 let rows = fragments
1206 .into_iter()
1207 .map(|fragment| FragmentDescRow {
1208 fragment_id: fragment.fragment_id,
1209 job_id: fragment.job_id,
1210 fragment_type_mask: fragment.fragment_type_mask,
1211 distribution_type: fragment.distribution_type,
1212 state_table_ids: fragment.state_table_ids,
1213 vnode_count: fragment.vnode_count,
1214 parallelism_override: fragment.parallelism,
1215 stream_node: Some(fragment.stream_node),
1216 })
1217 .collect_vec();
1218
1219 self.build_fragment_distributions(&txn, rows).await
1220 }
1221
1222 pub async fn list_fragment_descs_without_node(
1223 &self,
1224 is_creating: bool,
1225 ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1226 let inner = self.inner.read().await;
1227 let txn = inner.db.begin().await?;
1228
1229 let fragments_query = Self::build_fragment_query(is_creating);
1230 #[expect(clippy::type_complexity)]
1231 let fragments: Vec<(
1232 FragmentId,
1233 JobId,
1234 i32,
1235 DistributionType,
1236 TableIdArray,
1237 i32,
1238 Option<StreamingParallelism>,
1239 )> = fragments_query
1240 .select_only()
1241 .columns([
1242 fragment::Column::FragmentId,
1243 fragment::Column::JobId,
1244 fragment::Column::FragmentTypeMask,
1245 fragment::Column::DistributionType,
1246 fragment::Column::StateTableIds,
1247 fragment::Column::VnodeCount,
1248 fragment::Column::Parallelism,
1249 ])
1250 .into_tuple()
1251 .all(&txn)
1252 .await?;
1253
1254 let rows = fragments
1255 .into_iter()
1256 .map(
1257 |(
1258 fragment_id,
1259 job_id,
1260 fragment_type_mask,
1261 distribution_type,
1262 state_table_ids,
1263 vnode_count,
1264 parallelism_override,
1265 )| FragmentDescRow {
1266 fragment_id,
1267 job_id,
1268 fragment_type_mask,
1269 distribution_type,
1270 state_table_ids,
1271 vnode_count,
1272 parallelism_override,
1273 stream_node: None,
1274 },
1275 )
1276 .collect_vec();
1277
1278 self.build_fragment_distributions(&txn, rows).await
1279 }
1280
1281 fn build_fragment_query(is_creating: bool) -> sea_orm::Select<fragment::Entity> {
1282 if is_creating {
1283 FragmentModel::find()
1284 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1285 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1286 .filter(
1287 streaming_job::Column::JobStatus
1288 .eq(JobStatus::Initial)
1289 .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1290 )
1291 } else {
1292 FragmentModel::find()
1293 }
1294 }
1295
1296 async fn build_fragment_distributions(
1297 &self,
1298 txn: &DatabaseTransaction,
1299 rows: Vec<FragmentDescRow>,
1300 ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1301 let fragment_ids = rows.iter().map(|row| row.fragment_id).collect_vec();
1302 let job_ids = rows.iter().map(|row| row.job_id).unique().collect_vec();
1303
1304 let job_parallelisms: HashMap<JobId, (StreamingParallelism, Option<String>)> =
1305 if fragment_ids.is_empty() {
1306 HashMap::new()
1307 } else {
1308 StreamingJob::find()
1309 .select_only()
1310 .columns([
1311 streaming_job::Column::JobId,
1312 streaming_job::Column::Parallelism,
1313 streaming_job::Column::AdaptiveParallelismStrategy,
1314 ])
1315 .filter(streaming_job::Column::JobId.is_in(job_ids))
1316 .into_tuple::<(JobId, StreamingParallelism, Option<String>)>()
1317 .all(txn)
1318 .await?
1319 .into_iter()
1320 .map(|(job_id, parallelism, strategy)| (job_id, (parallelism, strategy)))
1321 .collect()
1322 };
1323
1324 let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
1325 if fragment_ids.is_empty() {
1326 Vec::new()
1327 } else {
1328 FragmentRelation::find()
1329 .select_only()
1330 .columns([
1331 fragment_relation::Column::TargetFragmentId,
1332 fragment_relation::Column::SourceFragmentId,
1333 fragment_relation::Column::DispatcherType,
1334 ])
1335 .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
1336 .into_tuple()
1337 .all(txn)
1338 .await?
1339 };
1340
1341 let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1342 for (target_id, source_id, _) in upstream_entries {
1343 all_upstreams.entry(target_id).or_default().push(source_id);
1344 }
1345
1346 let root_fragment_map = if fragment_ids.is_empty() {
1347 HashMap::new()
1348 } else {
1349 let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_ids).await?;
1350 Self::collect_root_fragment_mapping(ensembles)
1351 };
1352
1353 let guard = self.env.shared_actor_info.read_guard();
1354 let mut result = Vec::with_capacity(rows.len());
1355
1356 for row in rows {
1357 let parallelism = guard
1358 .get_fragment(row.fragment_id as _)
1359 .map(|fragment| fragment.actors.len())
1360 .unwrap_or_default();
1361
1362 let root_fragments = root_fragment_map
1363 .get(&row.fragment_id)
1364 .cloned()
1365 .unwrap_or_default();
1366
1367 let upstreams = all_upstreams.remove(&row.fragment_id).unwrap_or_default();
1368
1369 let parallelism_policy = Self::format_fragment_parallelism_policy(
1370 row.distribution_type,
1371 row.parallelism_override.as_ref(),
1372 job_parallelisms
1373 .get(&row.job_id)
1374 .map(|(parallelism, _)| parallelism),
1375 job_parallelisms
1376 .get(&row.job_id)
1377 .and_then(|(_, strategy)| strategy.as_deref()),
1378 &root_fragments,
1379 );
1380
1381 let fragment = FragmentDistribution {
1382 fragment_id: row.fragment_id,
1383 table_id: row.job_id,
1384 distribution_type: PbFragmentDistributionType::from(row.distribution_type) as _,
1385 state_table_ids: row.state_table_ids.0,
1386 upstream_fragment_ids: upstreams.clone(),
1387 fragment_type_mask: row.fragment_type_mask as _,
1388 parallelism: parallelism as _,
1389 vnode_count: row.vnode_count as _,
1390 node: row.stream_node.map(|node| node.to_protobuf()),
1391 parallelism_policy,
1392 };
1393
1394 result.push((fragment, upstreams));
1395 }
1396
1397 Ok(result)
1398 }
1399
1400 pub async fn list_sink_log_store_tables(&self) -> MetaResult<Vec<(SinkId, TableId)>> {
1401 let inner = self.inner.read().await;
1402 let txn = inner.db.begin().await?;
1403
1404 let fragments: Vec<(JobId, StreamNode)> = FragmentModel::find()
1405 .select_only()
1406 .columns([fragment::Column::JobId, fragment::Column::StreamNode])
1407 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Sink))
1408 .into_tuple()
1409 .all(&txn)
1410 .await?;
1411
1412 let mut mapping: HashMap<SinkId, TableId> = HashMap::new();
1413
1414 for (job_id, stream_node) in fragments {
1415 let sink_id = SinkId::new(job_id.as_raw_id());
1416 let stream_node = stream_node.to_protobuf();
1417 let mut internal_table_id: Option<TableId> = None;
1418
1419 visit_stream_node_body(&stream_node, |body| {
1420 if let NodeBody::Sink(node) = body
1421 && node.log_store_type == SinkLogStoreType::KvLogStore as i32
1422 && let Some(table) = &node.table
1423 {
1424 internal_table_id = Some(TableId::new(table.id.as_raw_id()));
1425 }
1426 });
1427
1428 if let Some(table_id) = internal_table_id
1429 && let Some(existing) = mapping.insert(sink_id, table_id)
1430 && existing != table_id
1431 {
1432 tracing::warn!(
1433 "sink {sink_id:?} has multiple log store tables: {existing:?} vs {table_id:?}"
1434 );
1435 }
1436 }
1437
1438 Ok(mapping.into_iter().collect())
1439 }
1440
1441 fn collect_root_fragment_mapping(
1443 ensembles: Vec<NoShuffleEnsemble>,
1444 ) -> HashMap<FragmentId, Vec<FragmentId>> {
1445 let mut mapping = HashMap::new();
1446
1447 for ensemble in ensembles {
1448 let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1449 roots.sort_unstable();
1450 roots.dedup();
1451
1452 if roots.is_empty() {
1453 continue;
1454 }
1455
1456 let root_set: HashSet<_> = roots.iter().copied().collect();
1457
1458 for fragment_id in ensemble.component_fragments() {
1459 if root_set.contains(&fragment_id) {
1460 mapping.insert(fragment_id, Vec::new());
1461 } else {
1462 mapping.insert(fragment_id, roots.clone());
1463 }
1464 }
1465 }
1466
1467 mapping
1468 }
1469
1470 fn format_fragment_parallelism_policy(
1471 distribution_type: DistributionType,
1472 fragment_parallelism: Option<&StreamingParallelism>,
1473 job_parallelism: Option<&StreamingParallelism>,
1474 job_adaptive_parallelism_strategy: Option<&str>,
1475 root_fragments: &[FragmentId],
1476 ) -> String {
1477 if distribution_type == DistributionType::Single {
1478 return "single".to_owned();
1479 }
1480
1481 if let Some(parallelism) = fragment_parallelism {
1482 return format!(
1483 "override({})",
1484 Self::format_streaming_parallelism(parallelism, job_adaptive_parallelism_strategy)
1485 );
1486 }
1487
1488 if !root_fragments.is_empty() {
1489 let mut upstreams = root_fragments.to_vec();
1490 upstreams.sort_unstable();
1491 upstreams.dedup();
1492
1493 return format!("upstream_fragment({upstreams:?})");
1494 }
1495
1496 let inherited = job_parallelism
1497 .map(|parallelism| {
1498 Self::format_streaming_parallelism(parallelism, job_adaptive_parallelism_strategy)
1499 })
1500 .unwrap_or_else(|| "unknown".to_owned());
1501 format!("inherit({inherited})")
1502 }
1503
1504 fn format_streaming_parallelism(
1505 parallelism: &StreamingParallelism,
1506 adaptive_parallelism_strategy: Option<&str>,
1507 ) -> String {
1508 match parallelism {
1509 StreamingParallelism::Adaptive => adaptive_parallelism_strategy
1510 .and_then(Self::format_adaptive_parallelism_strategy)
1511 .unwrap_or_else(|| "adaptive".to_owned()),
1512 StreamingParallelism::Fixed(n) => n.to_string(),
1513 StreamingParallelism::Custom => adaptive_parallelism_strategy
1514 .and_then(Self::format_adaptive_parallelism_strategy)
1515 .unwrap_or_else(|| "custom".to_owned()),
1516 }
1517 }
1518
1519 fn format_adaptive_parallelism_strategy(strategy: &str) -> Option<String> {
1520 parse_strategy(strategy)
1521 .ok()
1522 .map(|strategy| match strategy {
1523 AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full => {
1524 "adaptive".to_owned()
1525 }
1526 AdaptiveParallelismStrategy::Bounded(n) => format!("bounded({n})"),
1527 AdaptiveParallelismStrategy::Ratio(r) => format!("ratio({r})"),
1528 })
1529 }
1530
1531 pub async fn list_sink_actor_mapping(
1532 &self,
1533 ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1534 let inner = self.inner.read().await;
1535 let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1536 .select_only()
1537 .columns([sink::Column::SinkId, sink::Column::Name])
1538 .into_tuple()
1539 .all(&inner.db)
1540 .await?;
1541 let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1542
1543 let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1544
1545 let actor_with_type: Vec<(ActorId, SinkId)> = {
1546 let info = self.env.shared_actor_infos().read_guard();
1547
1548 info.iter_over_fragments()
1549 .filter(|(_, fragment)| {
1550 sink_ids.contains(&fragment.job_id.as_sink_id())
1551 && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1552 })
1553 .flat_map(|(_, fragment)| {
1554 fragment
1555 .actors
1556 .keys()
1557 .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1558 })
1559 .collect()
1560 };
1561
1562 let mut sink_actor_mapping = HashMap::new();
1563 for (actor_id, sink_id) in actor_with_type {
1564 sink_actor_mapping
1565 .entry(sink_id)
1566 .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1567 .1
1568 .push(actor_id);
1569 }
1570
1571 Ok(sink_actor_mapping)
1572 }
1573
1574 pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1575 let inner = self.inner.read().await;
1576 let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1577 .select_only()
1578 .columns([
1579 fragment::Column::FragmentId,
1580 fragment::Column::JobId,
1581 fragment::Column::StateTableIds,
1582 ])
1583 .into_partial_model()
1584 .all(&inner.db)
1585 .await?;
1586 Ok(fragment_state_tables)
1587 }
1588
1589 pub async fn load_all_actors_dynamic(
1592 &self,
1593 database_id: Option<DatabaseId>,
1594 worker_nodes: &ActiveStreamingWorkerNodes,
1595 ) -> MetaResult<FragmentRenderMap> {
1596 let loaded = self.load_fragment_context(database_id).await?;
1597
1598 if loaded.is_empty() {
1599 return Ok(HashMap::new());
1600 }
1601
1602 let RenderedGraph { fragments, .. } = render_actor_assignments(
1603 self.env.actor_id_generator(),
1604 worker_nodes.current(),
1605 &loaded,
1606 )?;
1607
1608 tracing::trace!(?fragments, "reload all actors");
1609
1610 Ok(fragments)
1611 }
1612
1613 pub async fn load_fragment_context(
1615 &self,
1616 database_id: Option<DatabaseId>,
1617 ) -> MetaResult<LoadedFragmentContext> {
1618 let inner = self.inner.read().await;
1619 let txn = inner.db.begin().await?;
1620
1621 self.load_fragment_context_in_txn(&txn, database_id).await
1622 }
1623
1624 pub async fn load_fragment_context_in_txn<C>(
1625 &self,
1626 txn: &C,
1627 database_id: Option<DatabaseId>,
1628 ) -> MetaResult<LoadedFragmentContext>
1629 where
1630 C: ConnectionTrait,
1631 {
1632 let mut query = StreamingJob::find()
1633 .select_only()
1634 .column(streaming_job::Column::JobId);
1635
1636 if let Some(database_id) = database_id {
1637 query = query
1638 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1639 .filter(object::Column::DatabaseId.eq(database_id));
1640 }
1641
1642 let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
1643
1644 let jobs: HashSet<JobId> = jobs.into_iter().collect();
1645
1646 if jobs.is_empty() {
1647 return Ok(LoadedFragmentContext::default());
1648 }
1649
1650 load_fragment_context_for_jobs(txn, jobs).await
1651 }
1652
1653 #[await_tree::instrument]
1654 pub async fn fill_snapshot_backfill_epoch(
1655 &self,
1656 fragment_ids: impl Iterator<Item = FragmentId>,
1657 snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1658 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1659 ) -> MetaResult<()> {
1660 let inner = self.inner.write().await;
1661 let txn = inner.db.begin().await?;
1662 for fragment_id in fragment_ids {
1663 let fragment = FragmentModel::find_by_id(fragment_id)
1664 .one(&txn)
1665 .await?
1666 .context(format!("fragment {} not found", fragment_id))?;
1667 let mut node = fragment.stream_node.to_protobuf();
1668 if crate::stream::fill_snapshot_backfill_epoch(
1669 &mut node,
1670 snapshot_backfill_info,
1671 cross_db_snapshot_backfill_info,
1672 )? {
1673 let node = StreamNode::from(&node);
1674 FragmentModel::update(fragment::ActiveModel {
1675 fragment_id: Set(fragment_id),
1676 stream_node: Set(node),
1677 ..Default::default()
1678 })
1679 .exec(&txn)
1680 .await?;
1681 }
1682 }
1683 txn.commit().await?;
1684 Ok(())
1685 }
1686
1687 pub fn get_running_actors_of_fragment(
1689 &self,
1690 fragment_id: FragmentId,
1691 ) -> MetaResult<HashSet<model::ActorId>> {
1692 let info = self.env.shared_actor_infos().read_guard();
1693
1694 let actors = info
1695 .get_fragment(fragment_id as _)
1696 .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1697 .unwrap_or_default();
1698
1699 Ok(actors)
1700 }
1701
1702 pub async fn get_running_actors_for_source_backfill(
1705 &self,
1706 source_backfill_fragment_id: FragmentId,
1707 source_fragment_id: FragmentId,
1708 ) -> MetaResult<Vec<(ActorId, ActorId)>> {
1709 let inner = self.inner.read().await;
1710 let txn = inner.db.begin().await?;
1711
1712 let fragment_relation: DispatcherType = FragmentRelation::find()
1713 .select_only()
1714 .column(fragment_relation::Column::DispatcherType)
1715 .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
1716 .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
1717 .into_tuple()
1718 .one(&txn)
1719 .await?
1720 .ok_or_else(|| {
1721 anyhow!(
1722 "no fragment connection from source fragment {} to source backfill fragment {}",
1723 source_fragment_id,
1724 source_backfill_fragment_id
1725 )
1726 })?;
1727
1728 if fragment_relation != DispatcherType::NoShuffle {
1729 return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
1730 }
1731
1732 let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
1733 let result: MetaResult<DistributionType> = try {
1734 FragmentModel::find_by_id(fragment_id)
1735 .select_only()
1736 .column(fragment::Column::DistributionType)
1737 .into_tuple()
1738 .one(txn)
1739 .await?
1740 .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
1741 };
1742 result
1743 };
1744
1745 let source_backfill_distribution_type =
1746 load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
1747 let source_distribution_type =
1748 load_fragment_distribution_type(&txn, source_fragment_id).await?;
1749
1750 let load_fragment_actor_distribution =
1751 |actor_info: &SharedActorInfos,
1752 fragment_id: FragmentId|
1753 -> HashMap<crate::model::ActorId, Option<Bitmap>> {
1754 let guard = actor_info.read_guard();
1755
1756 guard
1757 .get_fragment(fragment_id as _)
1758 .map(|fragment| {
1759 fragment
1760 .actors
1761 .iter()
1762 .map(|(actor_id, actor)| {
1763 (
1764 *actor_id as _,
1765 actor
1766 .vnode_bitmap
1767 .as_ref()
1768 .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
1769 )
1770 })
1771 .collect()
1772 })
1773 .unwrap_or_default()
1774 };
1775
1776 let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
1777 load_fragment_actor_distribution(
1778 self.env.shared_actor_infos(),
1779 source_backfill_fragment_id,
1780 );
1781
1782 let source_actors =
1783 load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);
1784
1785 Ok(resolve_no_shuffle_actor_mapping(
1786 source_distribution_type,
1787 source_actors.iter().map(|(&id, bitmap)| (id, bitmap)),
1788 source_backfill_distribution_type,
1789 source_backfill_actors
1790 .iter()
1791 .map(|(&id, bitmap)| (id, bitmap)),
1792 )
1793 .into_iter()
1794 .map(|(source_actor, source_backfill_actor)| {
1795 (source_backfill_actor as _, source_actor as _)
1796 })
1797 .collect())
1798 }
1799
1800 pub async fn get_root_fragments(
1813 &self,
1814 job_ids: Vec<JobId>,
1815 ) -> MetaResult<HashMap<JobId, Fragment>> {
1816 let inner = self.inner.read().await;
1817
1818 let all_fragments = FragmentModel::find()
1819 .filter(fragment::Column::JobId.is_in(job_ids))
1820 .all(&inner.db)
1821 .await?;
1822 let mut root_fragments = HashMap::<JobId, Fragment>::new();
1824 for fragment in all_fragments {
1825 let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1826 if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1827 _ = root_fragments.insert(fragment.job_id, fragment.into());
1828 } else if mask.contains(FragmentTypeFlag::Source) {
1829 _ = root_fragments.try_insert(fragment.job_id, fragment.into());
1832 }
1833 }
1834
1835 Ok(root_fragments)
1836 }
1837
1838 pub async fn get_root_fragment(&self, job_id: JobId) -> MetaResult<Fragment> {
1839 let mut root_fragments = self.get_root_fragments(vec![job_id]).await?;
1840 let root_fragment = root_fragments
1841 .remove(&job_id)
1842 .context(format!("root fragment for job {} not found", job_id))?;
1843
1844 Ok(root_fragment)
1845 }
1846
1847 pub async fn get_downstream_fragments(
1849 &self,
1850 job_id: JobId,
1851 ) -> MetaResult<Vec<(stream_plan::DispatcherType, Fragment)>> {
1852 let root_fragment = self.get_root_fragment(job_id).await?;
1853
1854 let inner = self.inner.read().await;
1855 let txn = inner.db.begin().await?;
1856 let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1857 .filter(
1858 fragment_relation::Column::SourceFragmentId
1859 .eq(root_fragment.fragment_id as FragmentId),
1860 )
1861 .all(&txn)
1862 .await?;
1863
1864 let downstream_fragment_ids = downstream_fragment_relations
1865 .iter()
1866 .map(|model| model.target_fragment_id as FragmentId)
1867 .collect::<HashSet<_>>();
1868
1869 let downstream_fragments: Vec<fragment::Model> = FragmentModel::find()
1870 .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1871 .all(&txn)
1872 .await?;
1873
1874 let mut downstream_fragments_map: HashMap<_, _> = downstream_fragments
1875 .into_iter()
1876 .map(|fragment| (fragment.fragment_id, fragment))
1877 .collect();
1878
1879 let mut downstream_fragments = vec![];
1880
1881 let fragment_map: HashMap<_, _> = downstream_fragment_relations
1882 .iter()
1883 .map(|model| (model.target_fragment_id, model.dispatcher_type))
1884 .collect();
1885
1886 for (fragment_id, dispatcher_type) in fragment_map {
1887 let dispatch_type = PbDispatcherType::from(dispatcher_type);
1888
1889 let fragment = downstream_fragments_map
1890 .remove(&fragment_id)
1891 .context(format!(
1892 "downstream fragment node for id {} not found",
1893 fragment_id
1894 ))?
1895 .into();
1896
1897 downstream_fragments.push((dispatch_type, fragment));
1898 }
1899 Ok(downstream_fragments)
1900 }
1901
1902 pub async fn load_source_fragment_ids(
1903 &self,
1904 ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1905 let inner = self.inner.read().await;
1906 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1907 .select_only()
1908 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1909 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1910 .into_tuple()
1911 .all(&inner.db)
1912 .await?;
1913
1914 let mut source_fragment_ids = HashMap::new();
1915 for (fragment_id, stream_node) in fragments {
1916 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1917 source_fragment_ids
1918 .entry(source_id)
1919 .or_insert_with(BTreeSet::new)
1920 .insert(fragment_id);
1921 }
1922 }
1923 Ok(source_fragment_ids)
1924 }
1925
1926 pub async fn load_backfill_fragment_ids(
1927 &self,
1928 ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1929 let inner = self.inner.read().await;
1930 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1931 .select_only()
1932 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1933 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1934 .into_tuple()
1935 .all(&inner.db)
1936 .await?;
1937
1938 let mut source_fragment_ids = HashMap::new();
1939 for (fragment_id, stream_node) in fragments {
1940 if let Some((source_id, upstream_source_fragment_id)) =
1941 stream_node.to_protobuf().find_source_backfill()
1942 {
1943 source_fragment_ids
1944 .entry(source_id)
1945 .or_insert_with(BTreeSet::new)
1946 .insert((fragment_id, upstream_source_fragment_id));
1947 }
1948 }
1949 Ok(source_fragment_ids)
1950 }
1951
1952 pub async fn get_all_upstream_sink_infos(
1953 &self,
1954 target_table: &PbTable,
1955 target_fragment_id: FragmentId,
1956 ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1957 let inner = self.inner.read().await;
1958 let txn = inner.db.begin().await?;
1959
1960 self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
1961 .await
1962 }
1963
1964 pub async fn get_all_upstream_sink_infos_in_txn<C>(
1965 &self,
1966 txn: &C,
1967 target_table: &PbTable,
1968 target_fragment_id: FragmentId,
1969 ) -> MetaResult<Vec<UpstreamSinkInfo>>
1970 where
1971 C: ConnectionTrait,
1972 {
1973 let incoming_sinks = Sink::find()
1974 .filter(sink::Column::TargetTable.eq(target_table.id))
1975 .all(txn)
1976 .await?;
1977
1978 let sink_ids = incoming_sinks.iter().map(|s| s.sink_id).collect_vec();
1979 let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1980
1981 let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1982 for sink in &incoming_sinks {
1983 let sink_fragment_id =
1984 sink_fragment_ids
1985 .get(&sink.sink_id)
1986 .cloned()
1987 .ok_or(anyhow::anyhow!(
1988 "sink fragment not found for sink id {}",
1989 sink.sink_id
1990 ))?;
1991 let upstream_info = build_upstream_sink_info(
1992 sink.sink_id,
1993 sink.original_target_columns
1994 .as_ref()
1995 .map(|cols| cols.to_protobuf())
1996 .unwrap_or_default(),
1997 sink_fragment_id,
1998 target_table,
1999 target_fragment_id,
2000 )?;
2001 upstream_sink_infos.push(upstream_info);
2002 }
2003
2004 Ok(upstream_sink_infos)
2005 }
2006
2007 pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
2008 let inner = self.inner.read().await;
2009 let txn = inner.db.begin().await?;
2010
2011 let mview_fragment: Vec<FragmentId> = FragmentModel::find()
2012 .select_only()
2013 .column(fragment::Column::FragmentId)
2014 .filter(
2015 fragment::Column::JobId
2016 .eq(job_id)
2017 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2018 )
2019 .into_tuple()
2020 .all(&txn)
2021 .await?;
2022
2023 if mview_fragment.len() != 1 {
2024 return Err(anyhow::anyhow!(
2025 "expected exactly one mview fragment for job {}, found {}",
2026 job_id,
2027 mview_fragment.len()
2028 )
2029 .into());
2030 }
2031
2032 Ok(mview_fragment.into_iter().next().unwrap())
2033 }
2034
2035 pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
2036 let inner = self.inner.read().await;
2037 let txn = inner.db.begin().await?;
2038 has_table_been_migrated(&txn, table_id).await
2039 }
2040
2041 pub async fn update_fragment_splits<C>(
2042 &self,
2043 txn: &C,
2044 fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
2045 ) -> MetaResult<()>
2046 where
2047 C: ConnectionTrait,
2048 {
2049 if fragment_splits.is_empty() {
2050 return Ok(());
2051 }
2052
2053 let existing_fragment_ids: HashSet<FragmentId> = FragmentModel::find()
2054 .select_only()
2055 .column(fragment::Column::FragmentId)
2056 .filter(fragment::Column::FragmentId.is_in(fragment_splits.keys().copied()))
2057 .into_tuple()
2058 .all(txn)
2059 .await?
2060 .into_iter()
2061 .collect();
2062
2063 let (models, skipped_fragment_ids): (Vec<_>, Vec<_>) = fragment_splits
2065 .iter()
2066 .partition_map(|(fragment_id, splits)| {
2067 if existing_fragment_ids.contains(fragment_id) {
2068 Either::Left(fragment_splits::ActiveModel {
2069 fragment_id: Set(*fragment_id as _),
2070 splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
2071 splits: splits.iter().map(Into::into).collect_vec(),
2072 }))),
2073 })
2074 } else {
2075 Either::Right(*fragment_id)
2076 }
2077 });
2078
2079 if !skipped_fragment_ids.is_empty() {
2080 tracing::warn!(
2081 skipped_fragment_ids = ?skipped_fragment_ids,
2082 total_fragment_ids = fragment_splits.len(),
2083 "skipping stale fragment split updates for missing fragments"
2084 );
2085 }
2086
2087 if models.is_empty() {
2088 return Ok(());
2089 }
2090
2091 FragmentSplits::insert_many(models)
2092 .on_conflict(
2093 OnConflict::column(fragment_splits::Column::FragmentId)
2094 .update_column(fragment_splits::Column::Splits)
2095 .to_owned(),
2096 )
2097 .exec(txn)
2098 .await?;
2099
2100 Ok(())
2101 }
2102}
2103
2104#[cfg(test)]
2105mod tests {
2106 use std::collections::{BTreeMap, HashMap, HashSet};
2107
2108 use itertools::Itertools;
2109 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
2110 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
2111 use risingwave_common::id::JobId;
2112 use risingwave_common::util::iter_util::ZipEqDebug;
2113 use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
2114 use risingwave_meta_model::fragment::DistributionType;
2115 use risingwave_meta_model::*;
2116 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
2117 use risingwave_pb::plan_common::PbExprContext;
2118 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
2119 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
2120 use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
2121
2122 use super::ActorInfo;
2123 use crate::MetaResult;
2124 use crate::controller::catalog::CatalogController;
2125 use crate::model::{Fragment, StreamActor};
2126
2127 type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
2128
2129 type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
2130
2131 const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);
2132
2133 const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);
2134
2135 const TEST_JOB_ID: JobId = JobId::new(1);
2136
2137 const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
2138
2139 fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
2140 let mut upstream_actor_ids = BTreeMap::new();
2141 upstream_actor_ids.insert(
2142 TEST_UPSTREAM_FRAGMENT_ID,
2143 HashSet::from_iter([(actor_id + 100)]),
2144 );
2145 upstream_actor_ids.insert(
2146 (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
2147 HashSet::from_iter([(actor_id + 200)]),
2148 );
2149 upstream_actor_ids
2150 }
2151
2152 fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
2153 let mut input = vec![];
2154 for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
2155 input.push(PbStreamNode {
2156 node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
2157 upstream_fragment_id,
2158 ..Default::default()
2159 }))),
2160 ..Default::default()
2161 });
2162 }
2163
2164 PbStreamNode {
2165 input,
2166 node_body: Some(PbNodeBody::Union(PbUnionNode {})),
2167 ..Default::default()
2168 }
2169 }
2170
2171 #[tokio::test]
2172 async fn test_extract_fragment() -> MetaResult<()> {
2173 let actor_count = 3u32;
2174 let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
2175 .map(|actor_id| {
2176 (
2177 actor_id.into(),
2178 generate_upstream_actor_ids_for_actor(actor_id.into()),
2179 )
2180 })
2181 .collect();
2182
2183 let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());
2184
2185 let pb_fragment = Fragment {
2186 fragment_id: TEST_FRAGMENT_ID as _,
2187 fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
2188 distribution_type: PbFragmentDistributionType::Hash as _,
2189 state_table_ids: vec![TEST_STATE_TABLE_ID as _],
2190 maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
2191 nodes: stream_node,
2192 };
2193
2194 let fragment =
2195 CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;
2196
2197 check_fragment(fragment, pb_fragment);
2198
2199 Ok(())
2200 }
2201
2202 #[tokio::test]
2203 async fn test_compose_fragment() -> MetaResult<()> {
2204 let actor_count = 3u32;
2205
2206 let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
2207 .map(|actor_id| {
2208 (
2209 actor_id.into(),
2210 generate_upstream_actor_ids_for_actor(actor_id.into()),
2211 )
2212 })
2213 .collect();
2214
2215 let mut actor_bitmaps = ActorMapping::new_uniform(
2216 (0..actor_count).map(|i| i.into()),
2217 VirtualNode::COUNT_FOR_TEST,
2218 )
2219 .to_bitmaps();
2220
2221 let actors = (0..actor_count)
2222 .map(|actor_id| {
2223 let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
2224 splits: vec![PbConnectorSplit {
2225 split_type: "dummy".to_owned(),
2226 ..Default::default()
2227 }],
2228 });
2229
2230 ActorInfo {
2231 actor_id: actor_id.into(),
2232 fragment_id: TEST_FRAGMENT_ID,
2233 splits: actor_splits,
2234 worker_id: 0.into(),
2235 vnode_bitmap: actor_bitmaps
2236 .remove(&actor_id)
2237 .map(|bitmap| bitmap.to_protobuf())
2238 .as_ref()
2239 .map(VnodeBitmap::from),
2240 expr_context: ExprContext::from(&PbExprContext {
2241 time_zone: String::from("America/New_York"),
2242 strict_mode: false,
2243 }),
2244 config_override: "a.b.c = true".into(),
2245 }
2246 })
2247 .collect_vec();
2248
2249 let stream_node = {
2250 let template_actor = actors.first().cloned().unwrap();
2251
2252 let template_upstream_actor_ids =
2253 upstream_actor_ids.get(&template_actor.actor_id).unwrap();
2254
2255 generate_merger_stream_node(template_upstream_actor_ids)
2256 };
2257
2258 #[expect(deprecated)]
2259 let fragment = fragment::Model {
2260 fragment_id: TEST_FRAGMENT_ID,
2261 job_id: TEST_JOB_ID,
2262 fragment_type_mask: 0,
2263 distribution_type: DistributionType::Hash,
2264 stream_node: StreamNode::from(&stream_node),
2265 state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
2266 upstream_fragment_id: Default::default(),
2267 vnode_count: VirtualNode::COUNT_FOR_TEST as _,
2268 parallelism: None,
2269 };
2270
2271 let (pb_fragment, pb_actors, pb_actor_status, pb_actor_splits) =
2272 CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();
2273
2274 assert_eq!(pb_actor_status.len(), actor_count as usize);
2275 assert!(
2276 pb_actor_status
2277 .values()
2278 .all(|actor_status| actor_status.location.is_some())
2279 );
2280 assert_eq!(pb_actor_splits.len(), actor_count as usize);
2281
2282 check_fragment(fragment, pb_fragment);
2283 check_actors(
2284 actors,
2285 &upstream_actor_ids,
2286 pb_actors,
2287 pb_actor_splits,
2288 &stream_node,
2289 );
2290
2291 Ok(())
2292 }
2293
2294 fn check_actors(
2295 actors: Vec<ActorInfo>,
2296 actor_upstreams: &FragmentActorUpstreams,
2297 pb_actors: Vec<StreamActor>,
2298 pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
2299 stream_node: &PbStreamNode,
2300 ) {
2301 for (
2302 ActorInfo {
2303 actor_id,
2304 fragment_id,
2305 splits,
2306 worker_id: _,
2307 vnode_bitmap,
2308 expr_context,
2309 ..
2310 },
2311 StreamActor {
2312 actor_id: pb_actor_id,
2313 fragment_id: pb_fragment_id,
2314 vnode_bitmap: pb_vnode_bitmap,
2315 mview_definition,
2316 expr_context: pb_expr_context,
2317 ..
2318 },
2319 ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
2320 {
2321 assert_eq!(actor_id, pb_actor_id as ActorId);
2322 assert_eq!(fragment_id, pb_fragment_id as FragmentId);
2323
2324 assert_eq!(
2325 vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
2326 pb_vnode_bitmap,
2327 );
2328
2329 assert_eq!(mview_definition, "");
2330
2331 visit_stream_node_body(stream_node, |body| {
2332 if let PbNodeBody::Merge(m) = body {
2333 assert!(
2334 actor_upstreams
2335 .get(&actor_id)
2336 .unwrap()
2337 .contains_key(&m.upstream_fragment_id)
2338 );
2339 }
2340 });
2341
2342 assert_eq!(
2343 splits,
2344 pb_actor_splits
2345 .get(&pb_actor_id)
2346 .map(ConnectorSplits::from)
2347 .unwrap_or_default()
2348 );
2349
2350 assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
2351 }
2352 }
2353
2354 fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2355 let Fragment {
2356 fragment_id,
2357 fragment_type_mask,
2358 distribution_type: pb_distribution_type,
2359 state_table_ids: pb_state_table_ids,
2360 maybe_vnode_count: _,
2361 nodes,
2362 } = pb_fragment;
2363
2364 assert_eq!(fragment_id, TEST_FRAGMENT_ID);
2365 assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2366 assert_eq!(
2367 pb_distribution_type,
2368 PbFragmentDistributionType::from(fragment.distribution_type)
2369 );
2370
2371 assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
2372 assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2373 }
2374
/// A fragment without its own parallelism, in a job with fixed parallelism 4
/// and no upstream roots, is expected to render as `inherit(4)`.
#[test]
fn test_parallelism_policy_with_root_fragments() {
    let job_parallelism = StreamingParallelism::Fixed(4);

    #[expect(deprecated)]
    let model = fragment::Model {
        fragment_id: 3.into(),
        job_id: TEST_JOB_ID,
        fragment_type_mask: 0,
        distribution_type: DistributionType::Hash,
        stream_node: StreamNode::from(&PbStreamNode::default()),
        state_table_ids: TableIdArray::default(),
        upstream_fragment_id: Default::default(),
        vnode_count: 0,
        parallelism: None,
    };
    let fragment::Model {
        distribution_type,
        parallelism,
        ..
    } = model;

    let rendered = CatalogController::format_fragment_parallelism_policy(
        distribution_type,
        parallelism.as_ref(),
        Some(&job_parallelism),
        None,
        &[],
    );

    assert_eq!(rendered, "inherit(4)");
}
2402
/// With adaptive job parallelism and the strategy string `RATIO(0.5)`, the
/// policy is expected to render as `inherit(ratio(0.5))`.
#[test]
fn test_parallelism_policy_with_adaptive_strategy() {
    let job_parallelism = StreamingParallelism::Adaptive;

    #[expect(deprecated)]
    let model = fragment::Model {
        fragment_id: 4.into(),
        job_id: TEST_JOB_ID,
        fragment_type_mask: 0,
        distribution_type: DistributionType::Hash,
        stream_node: StreamNode::from(&PbStreamNode::default()),
        state_table_ids: TableIdArray::default(),
        upstream_fragment_id: Default::default(),
        vnode_count: 0,
        parallelism: None,
    };
    let fragment::Model {
        distribution_type,
        parallelism,
        ..
    } = model;

    let rendered = CatalogController::format_fragment_parallelism_policy(
        distribution_type,
        parallelism.as_ref(),
        Some(&job_parallelism),
        Some("RATIO(0.5)"),
        &[],
    );

    assert_eq!(rendered, "inherit(ratio(0.5))");
}
2430
/// With custom job parallelism and the strategy string `BOUNDED(8)`, the
/// policy is expected to render as `inherit(bounded(8))`.
#[test]
fn test_parallelism_policy_with_custom_strategy() {
    let job_parallelism = StreamingParallelism::Custom;

    #[expect(deprecated)]
    let model = fragment::Model {
        fragment_id: 6.into(),
        job_id: TEST_JOB_ID,
        fragment_type_mask: 0,
        distribution_type: DistributionType::Hash,
        stream_node: StreamNode::from(&PbStreamNode::default()),
        state_table_ids: TableIdArray::default(),
        upstream_fragment_id: Default::default(),
        vnode_count: 0,
        parallelism: None,
    };
    let fragment::Model {
        distribution_type,
        parallelism,
        ..
    } = model;

    let rendered = CatalogController::format_fragment_parallelism_policy(
        distribution_type,
        parallelism.as_ref(),
        Some(&job_parallelism),
        Some("BOUNDED(8)"),
        &[],
    );

    assert_eq!(rendered, "inherit(bounded(8))");
}
2458
/// With adaptive job parallelism and the unrecognised strategy string
/// `NOT_A_STRATEGY`, the policy is expected to render as `inherit(adaptive)`.
#[test]
fn test_parallelism_policy_with_invalid_adaptive_strategy_falls_back() {
    let job_parallelism = StreamingParallelism::Adaptive;

    #[expect(deprecated)]
    let model = fragment::Model {
        fragment_id: 7.into(),
        job_id: TEST_JOB_ID,
        fragment_type_mask: 0,
        distribution_type: DistributionType::Hash,
        stream_node: StreamNode::from(&PbStreamNode::default()),
        state_table_ids: TableIdArray::default(),
        upstream_fragment_id: Default::default(),
        vnode_count: 0,
        parallelism: None,
    };
    let fragment::Model {
        distribution_type,
        parallelism,
        ..
    } = model;

    let rendered = CatalogController::format_fragment_parallelism_policy(
        distribution_type,
        parallelism.as_ref(),
        Some(&job_parallelism),
        Some("NOT_A_STRATEGY"),
        &[],
    );

    assert_eq!(rendered, "inherit(adaptive)");
}
2486
/// Without any job parallelism but with upstream root fragments given out of
/// order and with a duplicate (`3, 1, 2, 1`), the policy is expected to render
/// as `upstream_fragment([1, 2, 3])`.
#[test]
fn test_parallelism_policy_with_upstream_roots() {
    #[expect(deprecated)]
    let model = fragment::Model {
        fragment_id: 5.into(),
        job_id: TEST_JOB_ID,
        fragment_type_mask: 0,
        distribution_type: DistributionType::Hash,
        stream_node: StreamNode::from(&PbStreamNode::default()),
        state_table_ids: TableIdArray::default(),
        upstream_fragment_id: Default::default(),
        vnode_count: 0,
        parallelism: None,
    };
    let fragment::Model {
        distribution_type,
        parallelism,
        ..
    } = model;

    let rendered = CatalogController::format_fragment_parallelism_policy(
        distribution_type,
        parallelism.as_ref(),
        None,
        None,
        &[3.into(), 1.into(), 2.into(), 1.into()],
    );

    assert_eq!(rendered, "upstream_fragment([1, 2, 3])");
}
2512}