1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::{Either, Itertools};
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::CdcScanOptions;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34 Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37 ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38 JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39 TableIdArray, VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits,
40 object, sink, source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46 Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::fragment::{
49 FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan;
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57 PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, SinkLogStoreType,
58 StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63 ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, FromQueryResult, JoinType,
64 PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
65};
66use serde::{Deserialize, Serialize};
67
68use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
69use crate::controller::catalog::CatalogController;
70use crate::controller::scale::{
71 FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph,
72 find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
73 render_actor_assignments, resolve_streaming_job_definition,
74};
75use crate::controller::utils::{
76 FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
77 get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
78 resolve_no_shuffle_actor_dispatcher,
79};
80use crate::error::MetaError;
81use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
82use crate::model::{
83 DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
84 StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
85 TableParallelism,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
/// In-flight (runtime) information for a single actor, as shared with the
/// barrier/actor-info subsystem.
#[derive(Debug)]
pub struct InflightActorInfo {
    /// The worker node this actor is scheduled on.
    pub worker_id: WorkerId,
    /// Vnode ownership bitmap of the actor, if any.
    pub vnode_bitmap: Option<Bitmap>,
    /// Source splits currently assigned to this actor.
    pub splits: Vec<SplitImpl>,
}
98
/// Internal per-actor record used when composing fragments back into
/// `StreamJobFragments` (see `compose_fragment`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    /// Fragment this actor belongs to; validated against the fragment model
    /// during composition.
    pub fragment_id: FragmentId,
    /// Connector splits assigned to the actor, in model (serialized) form.
    pub splits: ConnectorSplits,
    pub worker_id: WorkerId,
    /// Vnode bitmap in model form; `None` when the actor has no bitmap.
    pub vnode_bitmap: Option<VnodeBitmap>,
    /// Expression evaluation context derived from the job's stream context.
    pub expr_context: ExprContext,
    /// Per-job config override string (cheaply clonable shared str).
    pub config_override: Arc<str>,
}
109
/// Raw query-result row describing a fragment, before being turned into a
/// `FragmentDesc`.
#[derive(Debug)]
struct FragmentDescRow {
    fragment_id: FragmentId,
    job_id: JobId,
    /// Bit mask of `FragmentTypeFlag`s, stored as a raw i32 column.
    fragment_type_mask: i32,
    distribution_type: DistributionType,
    state_table_ids: TableIdArray,
    vnode_count: i32,
    /// Fragment-level parallelism override, if one is pinned on the fragment.
    parallelism_override: Option<StreamingParallelism>,
    /// Stream node plan; optional because some queries skip selecting it.
    stream_node: Option<StreamNode>,
}
121
/// In-flight (runtime) information for a fragment and all of its actors.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    pub vnode_count: usize,
    /// The fragment's stream plan in protobuf form.
    pub nodes: PbStreamNode,
    /// All actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    /// Ids of the state tables owned by this fragment.
    pub state_table_ids: HashSet<TableId>,
}
132
/// Snapshot of a running fragment's parallelism-related properties.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    /// Number of actors currently running for this fragment.
    pub actor_count: usize,
    pub vnode_count: usize,
}
139
/// SQL-expression helpers for filtering on the `fragment_type_mask` column.
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    /// SQL predicate: `fragment_type_mask & flag != 0` (mask contains `flag`).
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    /// SQL predicate: the mask contains at least one of the given flags.
    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    /// SQL predicate: `fragment_type_mask & flag == 0` (mask does not contain `flag`).
    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}
160
/// Summary row for a streaming job, composed from the `streaming_job`, `object`,
/// `database` and name-providing (`table`/`source`/`sink`) tables.
/// Built by `list_streaming_job_infos`.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: JobId,
    /// Object type of the job (e.g. table/source/sink per `ObjectType`).
    pub obj_type: ObjectType,
    /// Job name; `<unknown>` when no owning table/source/sink row was found.
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Job-specific resource group, falling back to the database's.
    pub resource_group: String,
    /// Job config override; empty string when unset.
    pub config_override: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
175
176impl NotificationManager {
177 pub(crate) fn notify_fragment_mapping(
178 &self,
179 operation: NotificationOperation,
180 fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
181 ) {
182 let fragment_ids = fragment_mappings
183 .iter()
184 .map(|mapping| mapping.fragment_id)
185 .collect_vec();
186 if fragment_ids.is_empty() {
187 return;
188 }
189 for fragment_mapping in fragment_mappings {
191 self.notify_frontend_without_version(
192 operation,
193 NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
194 );
195 }
196
197 match operation {
199 NotificationOperation::Add | NotificationOperation::Update => {
200 self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
201 fragment_ids,
202 ));
203 }
204 NotificationOperation::Delete => {
205 self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
206 fragment_ids,
207 ));
208 }
209 op => {
210 tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
211 }
212 }
213 }
214}
215
216impl CatalogController {
217 pub fn prepare_fragment_models_from_fragments(
218 job_id: JobId,
219 fragments: impl Iterator<Item = &Fragment>,
220 ) -> MetaResult<Vec<fragment::Model>> {
221 fragments
222 .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
223 .try_collect()
224 }
225
    /// Converts an in-memory [`Fragment`] into a `fragment::Model` row for a
    /// newly created job.
    ///
    /// # Panics
    /// Panics if the fragment has no actors.
    pub fn prepare_fragment_model_for_new_job(
        job_id: JobId,
        fragment: &Fragment,
    ) -> MetaResult<fragment::Model> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        // A `StreamCdcScan` root configured for parallelized backfill pins the
        // fragment to a fixed parallelism taken from its scan options; all other
        // fragments carry no per-fragment parallelism.
        let fragment_parallelism = nodes
            .node_body
            .as_ref()
            .and_then(|body| match body {
                NodeBody::StreamCdcScan(node) => Some(node),
                _ => None,
            })
            .and_then(|node| node.options.as_ref())
            .map(CdcScanOptions::from_proto)
            .filter(|opts| opts.is_parallelized_backfill())
            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));

        let stream_node = StreamNode::from(nodes);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        // `upstream_fragment_id` is a deprecated column; it is filled with its
        // default value only to satisfy the model struct.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
            parallelism: fragment_parallelism,
        };

        Ok(fragment)
    }
278
279 fn compose_table_fragments(
280 job_id: JobId,
281 state: PbState,
282 ctx: StreamContext,
283 fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
284 parallelism: StreamingParallelism,
285 max_parallelism: usize,
286 job_definition: Option<String>,
287 ) -> MetaResult<StreamJobFragments> {
288 let mut pb_fragments = BTreeMap::new();
289 let mut pb_actor_status = BTreeMap::new();
290
291 for (fragment, actors) in fragments {
292 let (fragment, fragment_actor_status, _) =
293 Self::compose_fragment(fragment, actors, job_definition.clone())?;
294
295 pb_fragments.insert(fragment.fragment_id, fragment);
296 pb_actor_status.extend(fragment_actor_status.into_iter());
297 }
298
299 let table_fragments = StreamJobFragments {
300 stream_job_id: job_id,
301 state: state as _,
302 fragments: pb_fragments,
303 actor_status: pb_actor_status,
304 ctx,
305 assigned_parallelism: match parallelism {
306 StreamingParallelism::Custom => TableParallelism::Custom,
307 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
308 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
309 },
310 max_parallelism,
311 };
312
313 Ok(table_fragments)
314 }
315
    /// Rebuilds an in-memory [`Fragment`] — plus per-actor status and split maps —
    /// from a fragment model row and its actor records.
    ///
    /// # Errors
    /// Fails if any actor's `fragment_id` does not match the fragment model's id.
    #[allow(clippy::type_complexity)]
    fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<ActorInfo>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Collect upstream fragment ids from Merge nodes; asserts each upstream
        // appears at most once in the plan.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            // Sanity check: every actor must belong to this fragment.
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let ActorInfo {
                actor_id,
                fragment_id,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                config_override,
                ..
            } = actor;

            // Convert model-level bitmap/context into their runtime forms.
            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id),
                },
            );

            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                // Fall back to an empty definition when none was resolved for the job.
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                vnode_bitmap,
                expr_context: pb_expr_context,
                config_override,
            })
        }

        let pb_state_table_ids = state_table_ids.0;
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }
410
411 pub fn running_fragment_parallelisms(
412 &self,
413 id_filter: Option<HashSet<FragmentId>>,
414 ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
415 let info = self.env.shared_actor_infos().read_guard();
416
417 let mut result = HashMap::new();
418 for (fragment_id, fragment) in info.iter_over_fragments() {
419 if let Some(id_filter) = &id_filter
420 && !id_filter.contains(fragment_id)
421 {
422 continue; }
424
425 result.insert(
426 *fragment_id as _,
427 FragmentParallelismInfo {
428 distribution_type: fragment.distribution_type.into(),
429 actor_count: fragment.actors.len() as _,
430 vnode_count: fragment.vnode_count,
431 },
432 );
433 }
434
435 Ok(result)
436 }
437
438 pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
439 let inner = self.inner.read().await;
440 let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
441 .select_only()
442 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
443 .into_tuple()
444 .all(&inner.db)
445 .await?;
446 Ok(fragment_jobs.into_iter().collect())
447 }
448
    /// Resolves the job (object) ids for the given fragments.
    ///
    /// Convenience wrapper around [`Self::get_fragment_job_id_in_txn`] using the
    /// controller's own database connection.
    pub async fn get_fragment_job_id(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;
        self.get_fragment_job_id_in_txn(&inner.db, fragment_ids)
            .await
    }
457
    /// Resolves the job (object) ids for the given fragments within `txn`.
    ///
    /// Returns one `job_id` per matched fragment row; fragments that do not
    /// exist contribute no entry, so the result may be shorter than the input.
    pub async fn get_fragment_job_id_in_txn<C>(
        &self,
        txn: &C,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<Vec<ObjectId>>
    where
        C: ConnectionTrait + Send,
    {
        let object_ids: Vec<ObjectId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
            .into_tuple()
            .all(txn)
            .await?;

        Ok(object_ids)
    }
476
    /// Builds a full [`FragmentDesc`] for one fragment, together with the ids of
    /// its direct upstream fragments.
    ///
    /// Returns `Ok(None)` if the fragment does not exist in the database.
    ///
    /// # Panics
    /// Panics if the fragment exists in the database but is missing from the
    /// shared in-memory actor info.
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model = match FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?
        {
            Some(fragment) => fragment,
            None => return Ok(None),
        };

        // The owning job's configured parallelism (if the job row still exists).
        let job_parallelism: Option<StreamingParallelism> =
            StreamingJob::find_by_id(fragment_model.job_id)
                .select_only()
                .column(streaming_job::Column::Parallelism)
                .into_tuple()
                .one(&inner.db)
                .await?;

        // Direct upstream fragments feeding into this one.
        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::DispatcherType,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let upstreams: Vec<_> = upstream_entries
            .into_iter()
            .map(|(source_id, _)| source_id)
            .collect();

        // Roots of the no-shuffle DAG(s) this fragment belongs to.
        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
            .await
            .map(Self::collect_root_fragment_mapping)?;
        let root_fragments = root_fragment_map
            .get(&fragment_id)
            .cloned()
            .unwrap_or_default();

        // Actor count comes from the shared in-memory info, not the database.
        let info = self.env.shared_actor_infos().read_guard();
        let SharedFragmentInfo { actors, .. } = info
            .get_fragment(fragment_model.fragment_id as _)
            .unwrap_or_else(|| {
                panic!(
                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                    fragment_model.fragment_id,
                    fragment_model.job_id
                )
            });

        let parallelism_policy = Self::format_fragment_parallelism_policy(
            fragment_model.distribution_type,
            fragment_model.parallelism.as_ref(),
            job_parallelism.as_ref(),
            &root_fragments,
        );

        let fragment = FragmentDesc {
            fragment_id: fragment_model.fragment_id,
            job_id: fragment_model.job_id,
            fragment_type_mask: fragment_model.fragment_type_mask,
            distribution_type: fragment_model.distribution_type,
            state_table_ids: fragment_model.state_table_ids.clone(),
            parallelism: actors.len() as _,
            vnode_count: fragment_model.vnode_count,
            stream_node: fragment_model.stream_node.clone(),
            parallelism_policy,
        };

        Ok(Some((fragment, upstreams)))
    }
555
556 pub async fn list_fragment_database_ids(
557 &self,
558 select_fragment_ids: Option<Vec<FragmentId>>,
559 ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
560 let inner = self.inner.read().await;
561 let select = FragmentModel::find()
562 .select_only()
563 .column(fragment::Column::FragmentId)
564 .column(object::Column::DatabaseId)
565 .join(JoinType::InnerJoin, fragment::Relation::Object.def());
566 let select = if let Some(select_fragment_ids) = select_fragment_ids {
567 select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
568 } else {
569 select
570 };
571 Ok(select.into_tuple().all(&inner.db).await?)
572 }
573
    /// Loads all fragments of a job from the database, joins them with the
    /// shared in-memory actor info, and composes a [`StreamJobFragments`].
    ///
    /// # Errors
    /// Fails if the job does not exist, or if any fragment is missing from the
    /// shared actor info.
    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;

        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        let fragment_actors =
            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;

        // SQL definition of the job (e.g. for mview_definition), if resolvable.
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id,
            job_info.job_status.into(),
            job_info.stream_context(),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }
605
    /// Computes per-actor dispatchers for the given source fragments.
    ///
    /// Convenience wrapper around [`Self::get_fragment_actor_dispatchers_txn`]
    /// using the controller's own database connection.
    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;

        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
            .await
    }
615
    /// Computes per-actor dispatchers for the given source fragments.
    ///
    /// Fragment relations are read from `c`; the actor distribution of each
    /// involved fragment comes from the shared in-memory actor info and is
    /// memoized per fragment for the duration of the call.
    ///
    /// # Panics
    /// Panics if a referenced fragment is missing from the shared actor info.
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        // Distribution type plus per-actor vnode bitmaps of one fragment.
        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        // Memoizes fragment lookups so each fragment is resolved only once.
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        // source fragment id -> actor id -> dispatchers
        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            // Resolve (and cache) the source fragment's distribution and actors.
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            // Resolve (and cache) the target fragment's distribution and actors.
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let dispatchers = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }
719
    /// Loads the downstream relations of the given fragments.
    ///
    /// Convenience wrapper around
    /// [`Self::get_fragment_downstream_relations_in_txn`] using the controller's
    /// own database connection.
    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
            .await
    }
728
    /// Streams fragment relations whose source is one of `fragment_ids` from
    /// `txn` and groups them by source fragment.
    pub async fn get_fragment_downstream_relations_in_txn<C>(
        &self,
        txn: &C,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation>
    where
        C: ConnectionTrait + StreamTrait + Send,
    {
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(txn)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }
761
    /// Reports the backfill scan type of each fragment of `job_id`.
    ///
    /// Convenience wrapper around
    /// [`Self::get_job_fragment_backfill_scan_type_in_txn`] using the
    /// controller's own database connection.
    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        self.get_job_fragment_backfill_scan_type_in_txn(&inner.db, job_id)
            .await
    }
770
771 pub async fn get_job_fragment_backfill_scan_type_in_txn<C>(
772 &self,
773 txn: &C,
774 job_id: JobId,
775 ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>>
776 where
777 C: ConnectionTrait + Send,
778 {
779 let fragments: Vec<_> = FragmentModel::find()
780 .filter(fragment::Column::JobId.eq(job_id))
781 .all(txn)
782 .await?;
783
784 let mut result = HashMap::new();
785
786 for fragment::Model {
787 fragment_id,
788 stream_node,
789 ..
790 } in fragments
791 {
792 let stream_node = stream_node.to_protobuf();
793 visit_stream_node_body(&stream_node, |body| {
794 if let NodeBody::StreamScan(node) = body {
795 match node.stream_scan_type() {
796 StreamScanType::Unspecified => {}
797 scan_type => {
798 result.insert(fragment_id as crate::model::FragmentId, scan_type);
799 }
800 }
801 }
802 });
803 }
804
805 Ok(result)
806 }
807
    /// Counts all streaming jobs recorded in the database.
    ///
    /// # Errors
    /// Fails on database errors, or if the u64 count does not fit in `usize`.
    pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
        let inner = self.inner.read().await;
        let count = StreamingJob::find().count(&inner.db).await?;
        Ok(usize::try_from(count).context("streaming job count overflow")?)
    }
813
    /// Lists a [`StreamingJobInfo`] summary for every streaming job.
    ///
    /// Joins `streaming_job` with `object` and `database`, plus left joins
    /// against `table`/`source`/`sink` to resolve the job's name depending on
    /// its object type.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // name := COALESCE(table.name, source.name, sink.name, '<unknown>')
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // resource_group := COALESCE(job-specific group, database's group)
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            // config_override := COALESCE(job config override, '')
            .column_as(
                Expr::if_null(
                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
                    Expr::val(""),
                ),
                "config_override",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
867
868 pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
869 let inner = self.inner.read().await;
870 let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
871 .select_only()
872 .column(streaming_job::Column::MaxParallelism)
873 .into_tuple()
874 .one(&inner.db)
875 .await?
876 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
877 Ok(max_parallelism as usize)
878 }
879
880 pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
882 if let Ok(inner) = self.inner.try_read()
883 && let Ok(job_state_tables) = FragmentModel::find()
884 .select_only()
885 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
886 .into_tuple::<(JobId, I32Array)>()
887 .all(&inner.db)
888 .await
889 {
890 let mut job_internal_table_ids = HashMap::new();
891 for (job_id, state_table_ids) in job_state_tables {
892 job_internal_table_ids
893 .entry(job_id)
894 .or_insert_with(Vec::new)
895 .extend(
896 state_table_ids
897 .into_inner()
898 .into_iter()
899 .map(|table_id| TableId::new(table_id as _)),
900 );
901 }
902 return Some(job_internal_table_ids.into_iter().collect());
903 }
904 None
905 }
906
907 pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
908 let inner = self.inner.read().await;
909 let count = FragmentModel::find().count(&inner.db).await?;
910 Ok(count > 0)
911 }
912
913 pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
914 let read_guard = self.env.shared_actor_infos().read_guard();
915 let actor_cnt: HashMap<WorkerId, _> = read_guard
916 .iter_over_fragments()
917 .flat_map(|(_, fragment)| {
918 fragment
919 .actors
920 .iter()
921 .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
922 })
923 .into_group_map()
924 .into_iter()
925 .map(|(k, v)| (k, v.len()))
926 .collect();
927
928 Ok(actor_cnt)
929 }
930
    /// Builds [`ActorInfo`] records for every actor of the given fragments from
    /// the shared in-memory actor info, applying the job's stream context
    /// (expression context and config override) to each actor.
    ///
    /// # Errors
    /// Fails if any fragment id is missing from the shared actor info.
    fn collect_fragment_actor_map(
        &self,
        fragment_ids: &[FragmentId],
        stream_context: StreamContext,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
        let guard = self.env.shared_actor_infos().read_guard();
        // All actors of a job share the same expression context.
        let pb_expr_context = stream_context.to_expr_context();
        let expr_context: ExprContext = (&pb_expr_context).into();

        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
        for fragment_id in fragment_ids {
            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
                anyhow!("fragment {} not found in shared actor info", fragment_id)
            })?;

            let actors = fragment_info
                .actors
                .iter()
                .map(|(actor_id, actor_info)| ActorInfo {
                    actor_id: *actor_id as _,
                    fragment_id: *fragment_id,
                    // Convert runtime split/bitmap values back into model form.
                    splits: ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
                    }),
                    worker_id: actor_info.worker_id as _,
                    vnode_bitmap: actor_info
                        .vnode_bitmap
                        .as_ref()
                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                    expr_context: expr_context.clone(),
                    config_override: stream_context.config_override.clone(),
                })
                .collect();

            actor_map.insert(*fragment_id, actors);
        }

        Ok(actor_map)
    }
970
971 fn collect_fragment_actor_pairs(
972 &self,
973 fragments: Vec<fragment::Model>,
974 stream_context: StreamContext,
975 ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
976 let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
977 let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
978 fragments
979 .into_iter()
980 .map(|fragment| {
981 let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
982 anyhow!(
983 "fragment {} missing in shared actor info map",
984 fragment.fragment_id
985 )
986 })?;
987 Ok((fragment, actors))
988 })
989 .collect()
990 }
991
    /// Composes a [`StreamJobFragments`] snapshot for every streaming job in the
    /// database, keyed by job id.
    ///
    /// # Errors
    /// Fails on database errors or if any fragment is missing from the shared
    /// actor info.
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve all job definitions up front in a single pass.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragments = FragmentModel::find()
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            let fragment_actors =
                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;

            table_fragments.insert(
                job.job_id,
                Self::compose_table_fragments(
                    job.job_id,
                    job.job_status.into(),
                    job.stream_context(),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
1029
    /// Resolves the direct upstream fragments of each given fragment.
    ///
    /// Convenience wrapper around [`Self::upstream_fragments_in_txn`] using the
    /// controller's own database connection.
    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        self.upstream_fragments_in_txn(&inner.db, fragment_ids)
            .await
    }
1038
    /// Streams `(source, target)` fragment relations whose target is one of
    /// `fragment_ids` and groups the source (upstream) ids by downstream id.
    ///
    /// Fragments with no upstreams are absent from the result.
    pub async fn upstream_fragments_in_txn<C>(
        &self,
        txn: &C,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>>
    where
        C: ConnectionTrait + StreamTrait + Send,
    {
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(txn)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }
1069
1070 pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1071 let info = self.env.shared_actor_infos().read_guard();
1072
1073 let actor_locations = info
1074 .iter_over_fragments()
1075 .flat_map(|(fragment_id, fragment)| {
1076 fragment
1077 .actors
1078 .iter()
1079 .map(|(actor_id, actor)| PartialActorLocation {
1080 actor_id: *actor_id as _,
1081 fragment_id: *fragment_id as _,
1082 worker_id: actor.worker_id,
1083 })
1084 })
1085 .collect_vec();
1086
1087 Ok(actor_locations)
1088 }
1089
1090 pub async fn list_actor_info(
1091 &self,
1092 ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
1093 let inner = self.inner.read().await;
1094
1095 let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
1096 FragmentModel::find()
1097 .select_only()
1098 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1099 .column(fragment::Column::FragmentId)
1100 .column_as(object::Column::Oid, "job_id")
1101 .column_as(object::Column::SchemaId, "schema_id")
1102 .column_as(object::Column::ObjType, "type")
1103 .into_tuple()
1104 .all(&inner.db)
1105 .await?;
1106
1107 let actor_infos = {
1108 let info = self.env.shared_actor_infos().read_guard();
1109
1110 let mut result = Vec::new();
1111
1112 for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
1113 let Some(fragment) = info.get_fragment(fragment_id as _) else {
1114 return Err(MetaError::unavailable(format!(
1115 "shared actor info missing for fragment {fragment_id} while listing actors"
1116 )));
1117 };
1118
1119 for actor_id in fragment.actors.keys() {
1120 result.push((
1121 *actor_id as _,
1122 fragment.fragment_id as _,
1123 object_id,
1124 schema_id,
1125 object_type,
1126 ));
1127 }
1128 }
1129
1130 result
1131 };
1132
1133 Ok(actor_infos)
1134 }
1135
1136 pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1137 let guard = self.env.shared_actor_info.read_guard();
1138 guard
1139 .iter_over_fragments()
1140 .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1141 .collect_vec()
1142 }
1143
1144 pub async fn list_fragment_descs_with_node(
1145 &self,
1146 is_creating: bool,
1147 ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1148 let inner = self.inner.read().await;
1149 let txn = inner.db.begin().await?;
1150
1151 let fragments_query = Self::build_fragment_query(is_creating);
1152 let fragments: Vec<fragment::Model> = fragments_query.all(&txn).await?;
1153
1154 let rows = fragments
1155 .into_iter()
1156 .map(|fragment| FragmentDescRow {
1157 fragment_id: fragment.fragment_id,
1158 job_id: fragment.job_id,
1159 fragment_type_mask: fragment.fragment_type_mask,
1160 distribution_type: fragment.distribution_type,
1161 state_table_ids: fragment.state_table_ids,
1162 vnode_count: fragment.vnode_count,
1163 parallelism_override: fragment.parallelism,
1164 stream_node: Some(fragment.stream_node),
1165 })
1166 .collect_vec();
1167
1168 self.build_fragment_distributions(&txn, rows).await
1169 }
1170
1171 pub async fn list_fragment_descs_without_node(
1172 &self,
1173 is_creating: bool,
1174 ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1175 let inner = self.inner.read().await;
1176 let txn = inner.db.begin().await?;
1177
1178 let fragments_query = Self::build_fragment_query(is_creating);
1179 #[allow(clippy::type_complexity)]
1180 let fragments: Vec<(
1181 FragmentId,
1182 JobId,
1183 i32,
1184 DistributionType,
1185 TableIdArray,
1186 i32,
1187 Option<StreamingParallelism>,
1188 )> = fragments_query
1189 .select_only()
1190 .columns([
1191 fragment::Column::FragmentId,
1192 fragment::Column::JobId,
1193 fragment::Column::FragmentTypeMask,
1194 fragment::Column::DistributionType,
1195 fragment::Column::StateTableIds,
1196 fragment::Column::VnodeCount,
1197 fragment::Column::Parallelism,
1198 ])
1199 .into_tuple()
1200 .all(&txn)
1201 .await?;
1202
1203 let rows = fragments
1204 .into_iter()
1205 .map(
1206 |(
1207 fragment_id,
1208 job_id,
1209 fragment_type_mask,
1210 distribution_type,
1211 state_table_ids,
1212 vnode_count,
1213 parallelism_override,
1214 )| FragmentDescRow {
1215 fragment_id,
1216 job_id,
1217 fragment_type_mask,
1218 distribution_type,
1219 state_table_ids,
1220 vnode_count,
1221 parallelism_override,
1222 stream_node: None,
1223 },
1224 )
1225 .collect_vec();
1226
1227 self.build_fragment_distributions(&txn, rows).await
1228 }
1229
1230 fn build_fragment_query(is_creating: bool) -> sea_orm::Select<fragment::Entity> {
1231 if is_creating {
1232 FragmentModel::find()
1233 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1234 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1235 .filter(
1236 streaming_job::Column::JobStatus
1237 .eq(JobStatus::Initial)
1238 .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1239 )
1240 } else {
1241 FragmentModel::find()
1242 }
1243 }
1244
    /// Assemble [`FragmentDistribution`] descriptors (plus each fragment's
    /// upstream list) for the given pre-fetched rows.
    ///
    /// Combines, per fragment: the owning job's parallelism, the upstream
    /// fragments from `fragment_relation`, the no-shuffle root fragments, and
    /// the live actor count from the shared actor info.
    async fn build_fragment_distributions(
        &self,
        txn: &DatabaseTransaction,
        rows: Vec<FragmentDescRow>,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let fragment_ids = rows.iter().map(|row| row.fragment_id).collect_vec();
        let job_ids = rows.iter().map(|row| row.job_id).unique().collect_vec();

        // Job-level parallelism, used as the fallback policy when a fragment
        // has no override and no no-shuffle root.
        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(txn)
                .await?
                .into_iter()
                .collect()
        };

        // (target, source, dispatcher) edges pointing *into* the listed fragments.
        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(txn)
                    .await?
            };

        // Group upstream edges by downstream (target) fragment.
        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        // Roots of the no-shuffle ensembles each fragment belongs to; these
        // drive the "upstream_fragment(..)" parallelism policy.
        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();
        let mut result = Vec::with_capacity(rows.len());

        for row in rows {
            // Live parallelism = current actor count; 0 when the fragment is
            // not (yet) present in the shared actor info.
            let parallelism = guard
                .get_fragment(row.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&row.fragment_id)
                .cloned()
                .unwrap_or_default();

            // `remove` is fine: each fragment id appears in `rows` once.
            let upstreams = all_upstreams.remove(&row.fragment_id).unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                row.distribution_type,
                row.parallelism_override.as_ref(),
                job_parallelisms.get(&row.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: row.fragment_id,
                table_id: row.job_id,
                distribution_type: PbFragmentDistributionType::from(row.distribution_type) as _,
                state_table_ids: row.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: row.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: row.vnode_count as _,
                // Only populated by the `_with_node` listing variant.
                node: row.stream_node.map(|node| node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }

        Ok(result)
    }
1340
1341 pub async fn list_sink_log_store_tables(&self) -> MetaResult<Vec<(SinkId, TableId)>> {
1342 let inner = self.inner.read().await;
1343 let txn = inner.db.begin().await?;
1344
1345 let fragments: Vec<(JobId, StreamNode)> = FragmentModel::find()
1346 .select_only()
1347 .columns([fragment::Column::JobId, fragment::Column::StreamNode])
1348 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Sink))
1349 .into_tuple()
1350 .all(&txn)
1351 .await?;
1352
1353 let mut mapping: HashMap<SinkId, TableId> = HashMap::new();
1354
1355 for (job_id, stream_node) in fragments {
1356 let sink_id = SinkId::new(job_id.as_raw_id());
1357 let stream_node = stream_node.to_protobuf();
1358 let mut internal_table_id: Option<TableId> = None;
1359
1360 visit_stream_node_body(&stream_node, |body| {
1361 if let NodeBody::Sink(node) = body
1362 && node.log_store_type == SinkLogStoreType::KvLogStore as i32
1363 && let Some(table) = &node.table
1364 {
1365 internal_table_id = Some(TableId::new(table.id.as_raw_id()));
1366 }
1367 });
1368
1369 if let Some(table_id) = internal_table_id
1370 && let Some(existing) = mapping.insert(sink_id, table_id)
1371 && existing != table_id
1372 {
1373 tracing::warn!(
1374 "sink {sink_id:?} has multiple log store tables: {existing:?} vs {table_id:?}"
1375 );
1376 }
1377 }
1378
1379 Ok(mapping.into_iter().collect())
1380 }
1381
1382 fn collect_root_fragment_mapping(
1384 ensembles: Vec<NoShuffleEnsemble>,
1385 ) -> HashMap<FragmentId, Vec<FragmentId>> {
1386 let mut mapping = HashMap::new();
1387
1388 for ensemble in ensembles {
1389 let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1390 roots.sort_unstable();
1391 roots.dedup();
1392
1393 if roots.is_empty() {
1394 continue;
1395 }
1396
1397 let root_set: HashSet<_> = roots.iter().copied().collect();
1398
1399 for fragment_id in ensemble.component_fragments() {
1400 if root_set.contains(&fragment_id) {
1401 mapping.insert(fragment_id, Vec::new());
1402 } else {
1403 mapping.insert(fragment_id, roots.clone());
1404 }
1405 }
1406 }
1407
1408 mapping
1409 }
1410
1411 fn format_fragment_parallelism_policy(
1412 distribution_type: DistributionType,
1413 fragment_parallelism: Option<&StreamingParallelism>,
1414 job_parallelism: Option<&StreamingParallelism>,
1415 root_fragments: &[FragmentId],
1416 ) -> String {
1417 if distribution_type == DistributionType::Single {
1418 return "single".to_owned();
1419 }
1420
1421 if let Some(parallelism) = fragment_parallelism {
1422 return format!(
1423 "override({})",
1424 Self::format_streaming_parallelism(parallelism)
1425 );
1426 }
1427
1428 if !root_fragments.is_empty() {
1429 let mut upstreams = root_fragments.to_vec();
1430 upstreams.sort_unstable();
1431 upstreams.dedup();
1432
1433 return format!("upstream_fragment({upstreams:?})");
1434 }
1435
1436 let inherited = job_parallelism
1437 .map(Self::format_streaming_parallelism)
1438 .unwrap_or_else(|| "unknown".to_owned());
1439 format!("inherit({inherited})")
1440 }
1441
1442 fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1443 match parallelism {
1444 StreamingParallelism::Adaptive => "adaptive".to_owned(),
1445 StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1446 StreamingParallelism::Custom => "custom".to_owned(),
1447 }
1448 }
1449
1450 pub async fn list_sink_actor_mapping(
1451 &self,
1452 ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1453 let inner = self.inner.read().await;
1454 let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1455 .select_only()
1456 .columns([sink::Column::SinkId, sink::Column::Name])
1457 .into_tuple()
1458 .all(&inner.db)
1459 .await?;
1460 let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1461
1462 let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1463
1464 let actor_with_type: Vec<(ActorId, SinkId)> = {
1465 let info = self.env.shared_actor_infos().read_guard();
1466
1467 info.iter_over_fragments()
1468 .filter(|(_, fragment)| {
1469 sink_ids.contains(&fragment.job_id.as_sink_id())
1470 && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1471 })
1472 .flat_map(|(_, fragment)| {
1473 fragment
1474 .actors
1475 .keys()
1476 .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1477 })
1478 .collect()
1479 };
1480
1481 let mut sink_actor_mapping = HashMap::new();
1482 for (actor_id, sink_id) in actor_with_type {
1483 sink_actor_mapping
1484 .entry(sink_id)
1485 .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1486 .1
1487 .push(actor_id);
1488 }
1489
1490 Ok(sink_actor_mapping)
1491 }
1492
1493 pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1494 let inner = self.inner.read().await;
1495 let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1496 .select_only()
1497 .columns([
1498 fragment::Column::FragmentId,
1499 fragment::Column::JobId,
1500 fragment::Column::StateTableIds,
1501 ])
1502 .into_partial_model()
1503 .all(&inner.db)
1504 .await?;
1505 Ok(fragment_state_tables)
1506 }
1507
1508 pub async fn load_all_actors_dynamic(
1511 &self,
1512 database_id: Option<DatabaseId>,
1513 worker_nodes: &ActiveStreamingWorkerNodes,
1514 ) -> MetaResult<FragmentRenderMap> {
1515 let loaded = self.load_fragment_context(database_id).await?;
1516
1517 if loaded.is_empty() {
1518 return Ok(HashMap::new());
1519 }
1520
1521 let adaptive_parallelism_strategy = {
1522 let system_params_reader = self.env.system_params_reader().await;
1523 system_params_reader.adaptive_parallelism_strategy()
1524 };
1525
1526 let RenderedGraph { fragments, .. } = render_actor_assignments(
1527 self.env.actor_id_generator(),
1528 worker_nodes.current(),
1529 adaptive_parallelism_strategy,
1530 &loaded,
1531 )?;
1532
1533 tracing::trace!(?fragments, "reload all actors");
1534
1535 Ok(fragments)
1536 }
1537
1538 pub async fn load_fragment_context(
1540 &self,
1541 database_id: Option<DatabaseId>,
1542 ) -> MetaResult<LoadedFragmentContext> {
1543 let inner = self.inner.read().await;
1544 let txn = inner.db.begin().await?;
1545
1546 self.load_fragment_context_in_txn(&txn, database_id).await
1547 }
1548
1549 pub async fn load_fragment_context_in_txn<C>(
1550 &self,
1551 txn: &C,
1552 database_id: Option<DatabaseId>,
1553 ) -> MetaResult<LoadedFragmentContext>
1554 where
1555 C: ConnectionTrait,
1556 {
1557 let mut query = StreamingJob::find()
1558 .select_only()
1559 .column(streaming_job::Column::JobId);
1560
1561 if let Some(database_id) = database_id {
1562 query = query
1563 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1564 .filter(object::Column::DatabaseId.eq(database_id));
1565 }
1566
1567 let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
1568
1569 let jobs: HashSet<JobId> = jobs.into_iter().collect();
1570
1571 if jobs.is_empty() {
1572 return Ok(LoadedFragmentContext::default());
1573 }
1574
1575 load_fragment_context_for_jobs(txn, jobs).await
1576 }
1577
    /// For each given fragment, rewrite its stream node plan with the snapshot
    /// backfill epochs and persist the updated plan, all within a single
    /// transaction committed at the end.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // Returns true iff the plan was actually modified; only then do we
            // write it back.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1611
1612 pub fn get_running_actors_of_fragment(
1614 &self,
1615 fragment_id: FragmentId,
1616 ) -> MetaResult<HashSet<model::ActorId>> {
1617 let info = self.env.shared_actor_infos().read_guard();
1618
1619 let actors = info
1620 .get_fragment(fragment_id as _)
1621 .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1622 .unwrap_or_default();
1623
1624 Ok(actors)
1625 }
1626
    /// Pair up each source-backfill actor with the source actor it reads from,
    /// returning `(backfill_actor, source_actor)` pairs.
    ///
    /// Requires a `NoShuffle` edge from `source_fragment_id` to
    /// `source_backfill_fragment_id`; errors if the edge is missing or has a
    /// different dispatcher type.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Dispatcher type of the (source -> backfill) edge; must exist.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Fetch a fragment's distribution type from the catalog.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Snapshot actor -> vnode-bitmap for a fragment from the shared actor
        // info; empty map when the fragment is not tracked.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // The resolver yields (source, backfill); flip the order for callers.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1722
1723 pub async fn get_root_fragments(
1736 &self,
1737 job_ids: Vec<JobId>,
1738 ) -> MetaResult<(
1739 HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
1740 HashMap<ActorId, WorkerId>,
1741 )> {
1742 let inner = self.inner.read().await;
1743
1744 let all_fragments = FragmentModel::find()
1745 .filter(fragment::Column::JobId.is_in(job_ids))
1746 .all(&inner.db)
1747 .await?;
1748 let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
1750 for fragment in all_fragments {
1751 let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1752 if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1753 _ = root_fragments.insert(fragment.job_id, fragment);
1754 } else if mask.contains(FragmentTypeFlag::Source) {
1755 _ = root_fragments.try_insert(fragment.job_id, fragment);
1758 }
1759 }
1760
1761 let mut root_fragments_pb = HashMap::new();
1762
1763 let info = self.env.shared_actor_infos().read_guard();
1764
1765 let root_fragment_to_jobs: HashMap<_, _> = root_fragments
1766 .iter()
1767 .map(|(job_id, fragment)| (fragment.fragment_id, *job_id))
1768 .collect();
1769
1770 for fragment in root_fragment_to_jobs.keys() {
1771 let fragment_info = info.get_fragment(*fragment).context(format!(
1772 "root fragment {} not found in shared actor info",
1773 fragment
1774 ))?;
1775
1776 let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
1777 let fragment = root_fragments
1778 .get(&job_id)
1779 .context(format!("root fragment for job {} not found", job_id))?;
1780
1781 root_fragments_pb.insert(
1782 job_id,
1783 (fragment_info.clone(), fragment.stream_node.to_protobuf()),
1784 );
1785 }
1786
1787 let mut all_actor_locations = HashMap::new();
1788
1789 for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
1790 for (actor_id, actor_info) in actors {
1791 all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
1792 }
1793 }
1794
1795 Ok((root_fragments_pb, all_actor_locations))
1796 }
1797
1798 pub async fn get_root_fragment(
1799 &self,
1800 job_id: JobId,
1801 ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
1802 let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1803 let (root_fragment, _) = root_fragments
1804 .remove(&job_id)
1805 .context(format!("root fragment for job {} not found", job_id))?;
1806
1807 Ok((root_fragment, actors))
1808 }
1809
1810 pub async fn get_downstream_fragments(
1812 &self,
1813 job_id: JobId,
1814 ) -> MetaResult<(
1815 Vec<(
1816 stream_plan::DispatcherType,
1817 SharedFragmentInfo,
1818 PbStreamNode,
1819 )>,
1820 HashMap<ActorId, WorkerId>,
1821 )> {
1822 let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;
1823
1824 let inner = self.inner.read().await;
1825 let txn = inner.db.begin().await?;
1826 let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1827 .filter(
1828 fragment_relation::Column::SourceFragmentId
1829 .eq(root_fragment.fragment_id as FragmentId),
1830 )
1831 .all(&txn)
1832 .await?;
1833
1834 let downstream_fragment_ids = downstream_fragment_relations
1835 .iter()
1836 .map(|model| model.target_fragment_id as FragmentId)
1837 .collect::<HashSet<_>>();
1838
1839 let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1840 .select_only()
1841 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1842 .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1843 .into_tuple()
1844 .all(&txn)
1845 .await?;
1846
1847 let downstream_fragment_nodes: HashMap<_, _> =
1848 downstream_fragment_nodes.into_iter().collect();
1849
1850 let mut downstream_fragments = vec![];
1851
1852 let info = self.env.shared_actor_infos().read_guard();
1853
1854 let fragment_map: HashMap<_, _> = downstream_fragment_relations
1855 .iter()
1856 .map(|model| (model.target_fragment_id, model.dispatcher_type))
1857 .collect();
1858
1859 for fragment_id in fragment_map.keys() {
1860 let fragment_info @ SharedFragmentInfo { actors, .. } =
1861 info.get_fragment(*fragment_id).unwrap();
1862
1863 let dispatcher_type = fragment_map[fragment_id];
1864
1865 if actors.is_empty() {
1866 bail!("No fragment found for fragment id {}", fragment_id);
1867 }
1868
1869 let dispatch_type = PbDispatcherType::from(dispatcher_type);
1870
1871 let nodes = downstream_fragment_nodes
1872 .get(fragment_id)
1873 .context(format!(
1874 "downstream fragment node for id {} not found",
1875 fragment_id
1876 ))?
1877 .to_protobuf();
1878
1879 downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
1880 }
1881 Ok((downstream_fragments, actor_locations))
1882 }
1883
1884 pub async fn load_source_fragment_ids(
1885 &self,
1886 ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1887 let inner = self.inner.read().await;
1888 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1889 .select_only()
1890 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1891 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1892 .into_tuple()
1893 .all(&inner.db)
1894 .await?;
1895
1896 let mut source_fragment_ids = HashMap::new();
1897 for (fragment_id, stream_node) in fragments {
1898 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1899 source_fragment_ids
1900 .entry(source_id)
1901 .or_insert_with(BTreeSet::new)
1902 .insert(fragment_id);
1903 }
1904 }
1905 Ok(source_fragment_ids)
1906 }
1907
1908 pub async fn load_backfill_fragment_ids(
1909 &self,
1910 ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1911 let inner = self.inner.read().await;
1912 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1913 .select_only()
1914 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1915 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1916 .into_tuple()
1917 .all(&inner.db)
1918 .await?;
1919
1920 let mut source_fragment_ids = HashMap::new();
1921 for (fragment_id, stream_node) in fragments {
1922 if let Some((source_id, upstream_source_fragment_id)) =
1923 stream_node.to_protobuf().find_source_backfill()
1924 {
1925 source_fragment_ids
1926 .entry(source_id)
1927 .or_insert_with(BTreeSet::new)
1928 .insert((fragment_id, upstream_source_fragment_id));
1929 }
1930 }
1931 Ok(source_fragment_ids)
1932 }
1933
1934 pub async fn get_all_upstream_sink_infos(
1935 &self,
1936 target_table: &PbTable,
1937 target_fragment_id: FragmentId,
1938 ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1939 let inner = self.inner.read().await;
1940 let txn = inner.db.begin().await?;
1941
1942 self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
1943 .await
1944 }
1945
1946 pub async fn get_all_upstream_sink_infos_in_txn<C>(
1947 &self,
1948 txn: &C,
1949 target_table: &PbTable,
1950 target_fragment_id: FragmentId,
1951 ) -> MetaResult<Vec<UpstreamSinkInfo>>
1952 where
1953 C: ConnectionTrait,
1954 {
1955 let incoming_sinks = Sink::find()
1956 .filter(sink::Column::TargetTable.eq(target_table.id))
1957 .all(txn)
1958 .await?;
1959
1960 let sink_ids = incoming_sinks.iter().map(|s| s.sink_id).collect_vec();
1961 let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1962
1963 let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1964 for sink in &incoming_sinks {
1965 let sink_fragment_id =
1966 sink_fragment_ids
1967 .get(&sink.sink_id)
1968 .cloned()
1969 .ok_or(anyhow::anyhow!(
1970 "sink fragment not found for sink id {}",
1971 sink.sink_id
1972 ))?;
1973 let upstream_info = build_upstream_sink_info(
1974 sink.sink_id,
1975 sink.original_target_columns
1976 .as_ref()
1977 .map(|cols| cols.to_protobuf())
1978 .unwrap_or_default(),
1979 sink_fragment_id,
1980 target_table,
1981 target_fragment_id,
1982 )?;
1983 upstream_sink_infos.push(upstream_info);
1984 }
1985
1986 Ok(upstream_sink_infos)
1987 }
1988
1989 pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1990 let inner = self.inner.read().await;
1991 let txn = inner.db.begin().await?;
1992
1993 let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1994 .select_only()
1995 .column(fragment::Column::FragmentId)
1996 .filter(
1997 fragment::Column::JobId
1998 .eq(job_id)
1999 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2000 )
2001 .into_tuple()
2002 .all(&txn)
2003 .await?;
2004
2005 if mview_fragment.len() != 1 {
2006 return Err(anyhow::anyhow!(
2007 "expected exactly one mview fragment for job {}, found {}",
2008 job_id,
2009 mview_fragment.len()
2010 )
2011 .into());
2012 }
2013
2014 Ok(mview_fragment.into_iter().next().unwrap())
2015 }
2016
2017 pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
2018 let inner = self.inner.read().await;
2019 let txn = inner.db.begin().await?;
2020 has_table_been_migrated(&txn, table_id).await
2021 }
2022
2023 pub async fn update_fragment_splits<C>(
2024 &self,
2025 txn: &C,
2026 fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
2027 ) -> MetaResult<()>
2028 where
2029 C: ConnectionTrait,
2030 {
2031 if fragment_splits.is_empty() {
2032 return Ok(());
2033 }
2034
2035 let existing_fragment_ids: HashSet<FragmentId> = FragmentModel::find()
2036 .select_only()
2037 .column(fragment::Column::FragmentId)
2038 .filter(fragment::Column::FragmentId.is_in(fragment_splits.keys().copied()))
2039 .into_tuple()
2040 .all(txn)
2041 .await?
2042 .into_iter()
2043 .collect();
2044
2045 let (models, skipped_fragment_ids): (Vec<_>, Vec<_>) = fragment_splits
2047 .iter()
2048 .partition_map(|(fragment_id, splits)| {
2049 if existing_fragment_ids.contains(fragment_id) {
2050 Either::Left(fragment_splits::ActiveModel {
2051 fragment_id: Set(*fragment_id as _),
2052 splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
2053 splits: splits.iter().map(Into::into).collect_vec(),
2054 }))),
2055 })
2056 } else {
2057 Either::Right(*fragment_id)
2058 }
2059 });
2060
2061 if !skipped_fragment_ids.is_empty() {
2062 tracing::warn!(
2063 skipped_fragment_ids = ?skipped_fragment_ids,
2064 total_fragment_ids = fragment_splits.len(),
2065 "skipping stale fragment split updates for missing fragments"
2066 );
2067 }
2068
2069 if models.is_empty() {
2070 return Ok(());
2071 }
2072
2073 FragmentSplits::insert_many(models)
2074 .on_conflict(
2075 OnConflict::column(fragment_splits::Column::FragmentId)
2076 .update_column(fragment_splits::Column::Splits)
2077 .to_owned(),
2078 )
2079 .exec(txn)
2080 .await?;
2081
2082 Ok(())
2083 }
2084}
2085
2086#[cfg(test)]
mod tests {
    //! Unit tests for fragment model <-> protobuf conversion in `CatalogController`
    //! (`prepare_fragment_model_for_new_job` / `compose_fragment`) and for
    //! `format_fragment_parallelism_policy`.

    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::id::JobId;
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::*;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};

    use super::ActorInfo;
    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    /// Upstream topology for a single actor: upstream fragment id -> ids of the
    /// upstream actors in that fragment.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    /// Upstream topology for every actor of a fragment, keyed by actor id.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    const TEST_JOB_ID: JobId = JobId::new(1);

    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);

    /// Builds a two-entry upstream map for `actor_id`: one upstream actor in
    /// `TEST_UPSTREAM_FRAGMENT_ID` (id offset by +100) and one in the next
    /// fragment id (id offset by +200). The offsets just keep the generated
    /// upstream ids disjoint from the downstream actor ids.
    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }

    /// Builds a `Union` stream node whose inputs are one `Merge` node per
    /// upstream fragment in `actor_upstream_actor_ids` — the minimal shape the
    /// conversion code needs to see merge edges.
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    /// Builds an in-memory [`Fragment`] with 3 hash-distributed actors and a
    /// merger stream node, converts it to the model representation via
    /// `prepare_fragment_model_for_new_job`, and checks the result with
    /// [`check_fragment`].
    // NOTE(review): no `.await` in the body — `#[test]` would likely suffice; confirm.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // Uniform vnode distribution across the 3 actors.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // All actors share the same node template, so any actor's upstream map works here.
        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id.into(),
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
                config_override: "".into(),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }

    /// Inverse direction of [`test_extract_fragment`]: builds model-side
    /// `fragment::Model` + `ActorInfo`s, runs `compose_fragment` to get the
    /// protobuf representation back, and checks statuses, splits, fragment
    /// fields ([`check_fragment`]) and actor fields ([`check_actors`]).
    // NOTE(review): no `.await` in the body — `#[test]` would likely suffice; confirm.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // Mutable: bitmaps are moved out (`remove`) into each ActorInfo below.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // One dummy split per actor so split propagation is exercised.
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                    config_override: "a.b.c = true".into(),
                }
            })
            .collect_vec();

        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids =
                upstream_actor_ids.get(&template_actor.actor_id).unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        // The struct literal names a deprecated field (`upstream_fragment_id`),
        // hence the lint expectation.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        // Every actor gets a status with a location, and a split assignment.
        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    /// Asserts that each composed [`StreamActor`] matches its source
    /// [`ActorInfo`] field-by-field (ids, vnode bitmap, expr context, splits),
    /// and that every `Merge` node in `stream_node` references a fragment
    /// present in the actor's upstream map.
    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        // zip_eq_debug also asserts both sides have the same length.
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // Every merge edge in the node tree must point at a known upstream fragment.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&actor_id)
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            // Round-trip the composed protobuf splits back to the model type for comparison.
            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    /// Asserts that a model-side `fragment::Model` and an in-memory
    /// [`Fragment`] agree on id, type mask, distribution type, state table ids
    /// and stream node (actors and vnode count are checked elsewhere).
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }

    /// A fragment with no own parallelism inherits the job's parallelism:
    /// the formatted policy wraps the job setting in `inherit(..)`.
    #[test]
    fn test_parallelism_policy_with_root_fragments() {
        // The struct literal names a deprecated field (`upstream_fragment_id`),
        // hence the lint expectation.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 3.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let job_parallelism = StreamingParallelism::Fixed(4);

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            fragment.distribution_type,
            fragment.parallelism.as_ref(),
            Some(&job_parallelism),
            &[],
        );

        assert_eq!(policy, "inherit(fixed(4))");
    }

    /// Without own or job parallelism, the policy is derived from upstream root
    /// fragments; the formatted list is deduplicated and sorted.
    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        // The struct literal names a deprecated field (`upstream_fragment_id`),
        // hence the lint expectation.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        // Unsorted input with a duplicate (1) — expected output below is sorted and unique.
        let policy = super::CatalogController::format_fragment_parallelism_policy(
            fragment.distribution_type,
            fragment.parallelism.as_ref(),
            None,
            &[3.into(), 1.into(), 2.into(), 1.into()],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
}