1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
26use risingwave_common::util::stream_graph_visitor::{
27 visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_connector::source::SplitImpl;
30use risingwave_meta_model::actor::ActorStatus;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34 Actor, Fragment as FragmentModel, FragmentRelation, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37 ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38 JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39 VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
40 streaming_job, table,
41};
42use risingwave_meta_model_migration::{Alias, ExprTrait, SelectStatement, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46 Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
49use risingwave_pb::meta::table_fragments::fragment::{
50 FragmentDistributionType, PbFragmentDistributionType,
51};
52use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
53use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
54use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57 PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
58 StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63 ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
64 QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
65};
66use serde::{Deserialize, Serialize};
67use tracing::debug;
68
69use crate::barrier::SnapshotBackfillInfo;
70use crate::controller::catalog::{CatalogController, CatalogControllerInner};
71use crate::controller::scale::resolve_streaming_job_definition;
72use crate::controller::utils::{
73 FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
74 get_fragment_mappings, get_sink_fragment_by_ids, has_table_been_migrated,
75 resolve_no_shuffle_actor_dispatcher,
76};
77use crate::manager::{LocalNotification, NotificationManager};
78use crate::model::{
79 DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
80 StreamActor, StreamContext, StreamJobFragments, TableParallelism,
81};
82use crate::rpc::ddl_controller::build_upstream_sink_info;
83use crate::stream::{SplitAssignment, UpstreamSinkInfo, build_actor_split_impls};
84use crate::{MetaError, MetaResult};
85
/// Placement info of a single in-flight (running) actor.
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    // Worker node the actor is scheduled on.
    pub worker_id: WorkerId,
    // Vnode bitmap of the actor; `None` when the actor owns no bitmap
    // (presumably singleton-distributed fragments — TODO confirm).
    pub vnode_bitmap: Option<Bitmap>,
}
92
/// In-memory snapshot of a fragment and its running actors.
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    pub distribution_type: DistributionType,
    // Root of the fragment's stream-node plan.
    pub nodes: PbStreamNode,
    // Per-actor placement info, keyed by actor id.
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    // State table ids belonging to this fragment.
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}
101
/// Parallelism summary of a fragment, as returned by
/// `running_fragment_parallelisms`.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    // Number of actors currently instantiated for the fragment.
    pub actor_count: usize,
    pub vnode_count: usize,
}
108
/// Helpers for building SQL predicates over the `fragment_type_mask` column.
pub(crate) trait FragmentTypeMaskExt {
    /// Predicate: `fragment_type_mask & flag != 0`.
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr;
    /// Predicate: the mask shares at least one bit with any of `flags`.
    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr;
}
113
114impl FragmentTypeMaskExt for FragmentTypeMask {
115 fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
116 Expr::col(fragment::Column::FragmentTypeMask)
117 .bit_and(Expr::value(flag as i32))
118 .ne(0)
119 }
120
121 fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
122 Expr::col(fragment::Column::FragmentTypeMask)
123 .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
124 .ne(0)
125 }
126}
127
/// Row type for `list_streaming_job_infos`: one streaming job joined with its
/// object metadata and the name of the backing table/source/sink.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    pub obj_type: ObjectType,
    // Name of the underlying table/source/sink; `"<unknown>"` if none matched.
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    // Job-specific resource group, falling back to the database's default.
    pub resource_group: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
141
142impl CatalogControllerInner {
143 pub async fn all_running_fragment_mappings(
145 &self,
146 ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
147 let txn = self.db.begin().await?;
148
149 let job_ids: Vec<ObjectId> = StreamingJob::find()
150 .select_only()
151 .column(streaming_job::Column::JobId)
152 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
153 .into_tuple()
154 .all(&txn)
155 .await?;
156
157 let mut result = vec![];
158 for job_id in job_ids {
159 let mappings = get_fragment_mappings(&txn, job_id).await?;
160
161 result.extend(mappings.into_iter());
162 }
163
164 Ok(result.into_iter())
165 }
166}
167
168impl NotificationManager {
169 pub(crate) fn notify_fragment_mapping(
170 &self,
171 operation: NotificationOperation,
172 fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
173 ) {
174 let fragment_ids = fragment_mappings
175 .iter()
176 .map(|mapping| mapping.fragment_id)
177 .collect_vec();
178 if fragment_ids.is_empty() {
179 return;
180 }
181 for fragment_mapping in fragment_mappings {
183 self.notify_frontend_without_version(
184 operation,
185 NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
186 );
187 }
188
189 match operation {
191 NotificationOperation::Add | NotificationOperation::Update => {
192 self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
193 fragment_ids,
194 ));
195 }
196 NotificationOperation::Delete => {
197 self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
198 fragment_ids,
199 ));
200 }
201 op => {
202 tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
203 }
204 }
205 }
206}
207
208impl CatalogController {
209 pub fn extract_fragment_and_actors_from_fragments(
210 job_id: ObjectId,
211 fragments: impl Iterator<Item = &Fragment>,
212 actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
213 actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
214 ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
215 fragments
216 .map(|fragment| {
217 Self::extract_fragment_and_actors_for_new_job(
218 job_id,
219 fragment,
220 actor_status,
221 actor_splits,
222 )
223 })
224 .try_collect()
225 }
226
    /// Converts one in-memory [`Fragment`] of a new streaming job into its
    /// database representation: a `fragment` row plus one `actor` row per
    /// actor.
    ///
    /// # Errors
    /// Returns an error if any actor has no entry in `actor_status`.
    ///
    /// # Panics
    /// Panics if the fragment has no actors or an actor lacks an expression
    /// context.
    pub fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        // Clear the deprecated per-actor upstream list on Merge nodes before
        // persisting the plan; it is not stored in the database.
        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            // Serialize the actor's source-split assignment, if it has one.
            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            // Every actor of a new job must have been assigned a status.
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            // `upstream_actor_ids` is deprecated and intentionally left empty.
            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        // `upstream_fragment_id` is deprecated and intentionally left empty.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }
326
327 pub fn compose_table_fragments(
328 table_id: u32,
329 state: PbState,
330 ctx: Option<PbStreamContext>,
331 fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
332 parallelism: StreamingParallelism,
333 max_parallelism: usize,
334 job_definition: Option<String>,
335 ) -> MetaResult<StreamJobFragments> {
336 let mut pb_fragments = BTreeMap::new();
337 let mut pb_actor_splits = HashMap::new();
338 let mut pb_actor_status = BTreeMap::new();
339
340 for (fragment, actors) in fragments {
341 let (fragment, fragment_actor_status, fragment_actor_splits) =
342 Self::compose_fragment(fragment, actors, job_definition.clone())?;
343
344 pb_fragments.insert(fragment.fragment_id, fragment);
345
346 pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
347 pb_actor_status.extend(fragment_actor_status.into_iter());
348 }
349
350 let table_fragments = StreamJobFragments {
351 stream_job_id: table_id.into(),
352 state: state as _,
353 fragments: pb_fragments,
354 actor_status: pb_actor_status,
355 actor_splits: pb_actor_splits,
356 ctx: ctx
357 .as_ref()
358 .map(StreamContext::from_protobuf)
359 .unwrap_or_default(),
360 assigned_parallelism: match parallelism {
361 StreamingParallelism::Custom => TableParallelism::Custom,
362 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
363 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
364 },
365 max_parallelism,
366 };
367
368 Ok(table_fragments)
369 }
370
    /// Inverse of [`Self::extract_fragment_and_actors_for_new_job`]: rebuilds
    /// an in-memory [`Fragment`] — plus per-actor status and connector-split
    /// maps — from database models.
    ///
    /// # Errors
    /// Fails if any actor row references a fragment id other than
    /// `fragment.fragment_id`.
    #[allow(clippy::type_complexity)]
    pub(crate) fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<actor::Model>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity check: each Merge node in the plan must consume a distinct
        // upstream fragment.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            // All actor rows must belong to the fragment being composed.
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let actor::Model {
                actor_id,
                fragment_id,
                status,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                    state: PbActorState::from(status) as _,
                },
            );

            // Only actors with a persisted split assignment appear in the map.
            if let Some(splits) = splits {
                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
            }

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }
467
    /// Returns, for each fragment (optionally restricted to `id_filter`), its
    /// distribution type, actor count and vnode count.
    ///
    /// NOTE(review): fragments with zero actors are absent from the result,
    /// since the inner query groups over the `actor` table.
    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let inner = self.inner.read().await;

        let query_alias = Alias::new("fragment_actor_count");
        let count_alias = Alias::new("count");

        // Inner query: actor count per fragment id.
        let mut query = SelectStatement::new()
            .column(actor::Column::FragmentId)
            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
            .from(Actor)
            .group_by_col(actor::Column::FragmentId)
            .to_owned();

        if let Some(id_filter) = id_filter {
            // The fragment-id restriction is applied via HAVING on the
            // grouped column.
            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
        }

        // Outer query: join the counts back to `fragment` to pick up the
        // distribution type and vnode count.
        let outer = SelectStatement::new()
            .column((FragmentModel, fragment::Column::FragmentId))
            .column(count_alias)
            .column(fragment::Column::DistributionType)
            .column(fragment::Column::VnodeCount)
            .from_subquery(query.to_owned(), query_alias.clone())
            .inner_join(
                FragmentModel,
                Expr::col((query_alias, actor::Column::FragmentId))
                    .equals((FragmentModel, fragment::Column::FragmentId)),
            )
            .to_owned();

        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
                outer.to_owned(),
            )
            .all(&inner.db)
            .await?;

        Ok(fragment_parallelisms
            .into_iter()
            .map(|(fragment_id, count, distribution_type, vnode_count)| {
                (
                    fragment_id,
                    FragmentParallelismInfo {
                        distribution_type: distribution_type.into(),
                        actor_count: count as usize,
                        vnode_count: vnode_count as usize,
                    },
                )
            })
            .collect())
    }
522
523 pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
524 let inner = self.inner.read().await;
525 let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
526 .select_only()
527 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
528 .into_tuple()
529 .all(&inner.db)
530 .await?;
531 Ok(fragment_jobs.into_iter().collect())
532 }
533
534 pub async fn get_fragment_job_id(
535 &self,
536 fragment_ids: Vec<FragmentId>,
537 ) -> MetaResult<Vec<ObjectId>> {
538 let inner = self.inner.read().await;
539
540 let object_ids: Vec<ObjectId> = FragmentModel::find()
541 .select_only()
542 .column(fragment::Column::JobId)
543 .filter(fragment::Column::FragmentId.is_in(fragment_ids))
544 .into_tuple()
545 .all(&inner.db)
546 .await?;
547
548 Ok(object_ids)
549 }
550
551 pub async fn get_fragment_desc_by_id(
552 &self,
553 fragment_id: FragmentId,
554 ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
555 let inner = self.inner.read().await;
556
557 let fragment_opt = FragmentModel::find()
559 .select_only()
560 .columns([
561 fragment::Column::FragmentId,
562 fragment::Column::JobId,
563 fragment::Column::FragmentTypeMask,
564 fragment::Column::DistributionType,
565 fragment::Column::StateTableIds,
566 fragment::Column::VnodeCount,
567 fragment::Column::StreamNode,
568 ])
569 .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
570 .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
571 .filter(fragment::Column::FragmentId.eq(fragment_id))
572 .group_by(fragment::Column::FragmentId)
573 .into_model::<FragmentDesc>()
574 .one(&inner.db)
575 .await?;
576
577 let Some(fragment) = fragment_opt else {
578 return Ok(None); };
580
581 let upstreams: Vec<FragmentId> = FragmentRelation::find()
583 .select_only()
584 .column(fragment_relation::Column::SourceFragmentId)
585 .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
586 .into_tuple()
587 .all(&inner.db)
588 .await?;
589
590 Ok(Some((fragment, upstreams)))
591 }
592
593 pub async fn list_fragment_database_ids(
594 &self,
595 select_fragment_ids: Option<Vec<FragmentId>>,
596 ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
597 let inner = self.inner.read().await;
598 let select = FragmentModel::find()
599 .select_only()
600 .column(fragment::Column::FragmentId)
601 .column(object::Column::DatabaseId)
602 .join(JoinType::InnerJoin, fragment::Relation::Object.def());
603 let select = if let Some(select_fragment_ids) = select_fragment_ids {
604 select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
605 } else {
606 select
607 };
608 Ok(select.into_tuple().all(&inner.db).await?)
609 }
610
611 pub async fn get_job_fragments_by_id(
612 &self,
613 job_id: ObjectId,
614 ) -> MetaResult<StreamJobFragments> {
615 let inner = self.inner.read().await;
616 let fragment_actors = FragmentModel::find()
617 .find_with_related(Actor)
618 .filter(fragment::Column::JobId.eq(job_id))
619 .all(&inner.db)
620 .await?;
621
622 let job_info = StreamingJob::find_by_id(job_id)
623 .one(&inner.db)
624 .await?
625 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
626
627 let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
628 .await?
629 .remove(&job_id);
630
631 Self::compose_table_fragments(
632 job_id as _,
633 job_info.job_status.into(),
634 job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
635 fragment_actors,
636 job_info.parallelism.clone(),
637 job_info.max_parallelism as _,
638 job_definition,
639 )
640 }
641
642 pub async fn get_fragment_actor_dispatchers(
643 &self,
644 fragment_ids: Vec<FragmentId>,
645 ) -> MetaResult<FragmentActorDispatchers> {
646 let inner = self.inner.read().await;
647 get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
648 }
649
650 pub async fn get_fragment_downstream_relations(
651 &self,
652 fragment_ids: Vec<FragmentId>,
653 ) -> MetaResult<FragmentDownstreamRelation> {
654 let inner = self.inner.read().await;
655 let mut stream = FragmentRelation::find()
656 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
657 .stream(&inner.db)
658 .await?;
659 let mut relations = FragmentDownstreamRelation::new();
660 while let Some(relation) = stream.try_next().await? {
661 relations
662 .entry(relation.source_fragment_id as _)
663 .or_default()
664 .push(DownstreamFragmentRelation {
665 downstream_fragment_id: relation.target_fragment_id as _,
666 dispatcher_type: relation.dispatcher_type,
667 dist_key_indices: relation.dist_key_indices.into_u32_array(),
668 output_mapping: PbDispatchOutputMapping {
669 indices: relation.output_indices.into_u32_array(),
670 types: relation
671 .output_type_mapping
672 .unwrap_or_default()
673 .to_protobuf(),
674 },
675 });
676 }
677 Ok(relations)
678 }
679
    /// Scans every fragment of `job_id` for `StreamScan` nodes and reports the
    /// backfill scan type per fragment.
    ///
    /// Fragments whose scan type is `Unspecified` (or that contain no
    /// `StreamScan` node) are omitted from the result.
    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        // Unspecified scan types are not reported.
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }
713
    /// Lists a [`StreamingJobInfo`] summary for every streaming job.
    ///
    /// Joins `object` (database/schema ids) and left-joins `table`, `source`
    /// and `sink` to resolve the job name, falling back to `"<unknown>"` when
    /// none matches. The resource group falls back from the job-specific
    /// setting to the database default.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // COALESCE(table.name, source.name, sink.name, '<unknown>') AS name
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // COALESCE(job.specific_resource_group, database.resource_group)
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
760
761 pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
762 let inner = self.inner.read().await;
763 let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
764 .select_only()
765 .column(streaming_job::Column::MaxParallelism)
766 .into_tuple()
767 .one(&inner.db)
768 .await?
769 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
770 Ok(max_parallelism as usize)
771 }
772
773 pub async fn get_job_actor_mapping(
775 &self,
776 job_ids: Vec<ObjectId>,
777 ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
778 let inner = self.inner.read().await;
779 let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
780 .select_only()
781 .column(fragment::Column::JobId)
782 .column(actor::Column::ActorId)
783 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
784 .filter(fragment::Column::JobId.is_in(job_ids))
785 .into_tuple()
786 .all(&inner.db)
787 .await?;
788 Ok(job_actors.into_iter().into_group_map())
789 }
790
791 pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
793 if let Ok(inner) = self.inner.try_read()
794 && let Ok(job_state_tables) = FragmentModel::find()
795 .select_only()
796 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
797 .into_tuple::<(ObjectId, I32Array)>()
798 .all(&inner.db)
799 .await
800 {
801 let mut job_internal_table_ids = HashMap::new();
802 for (job_id, state_table_ids) in job_state_tables {
803 job_internal_table_ids
804 .entry(job_id)
805 .or_insert_with(Vec::new)
806 .extend(state_table_ids.into_inner());
807 }
808 return Some(job_internal_table_ids.into_iter().collect());
809 }
810 None
811 }
812
813 pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
814 let inner = self.inner.read().await;
815 let count = FragmentModel::find().count(&inner.db).await?;
816 Ok(count > 0)
817 }
818
819 pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
820 let inner = self.inner.read().await;
821 let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
822 .select_only()
823 .column(actor::Column::WorkerId)
824 .column_as(actor::Column::ActorId.count(), "count")
825 .group_by(actor::Column::WorkerId)
826 .into_tuple()
827 .all(&inner.db)
828 .await?;
829
830 Ok(actor_cnt
831 .into_iter()
832 .map(|(worker_id, count)| (worker_id, count as usize))
833 .collect())
834 }
835
    /// Loads every streaming job together with all of its fragments and
    /// actors, composed into [`StreamJobFragments`] and keyed by job id.
    ///
    /// NOTE(review): issues one fragment/actor query per job; fine for admin
    /// paths, but not intended for hot paths.
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve all job definitions up front in a single pass.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragment_actors = FragmentModel::find()
                .find_with_related(Actor)
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            table_fragments.insert(
                job.job_id as ObjectId,
                Self::compose_table_fragments(
                    job.job_id as _,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
871
872 pub async fn cdc_table_backfill_actor_ids(&self) -> MetaResult<HashMap<u32, HashSet<u32>>> {
874 let inner = self.inner.read().await;
875 let mut job_id_actor_ids = HashMap::default();
876 let stream_cdc_scan_flag = FragmentTypeFlag::StreamCdcScan as i32;
877 let fragment_type_mask = stream_cdc_scan_flag;
878 let fragment_actors: Vec<(fragment::Model, Vec<actor::Model>)> = FragmentModel::find()
879 .find_with_related(Actor)
880 .filter(fragment::Column::FragmentTypeMask.eq(fragment_type_mask))
881 .all(&inner.db)
882 .await?;
883 for (fragment, actors) in fragment_actors {
884 let e: &mut HashSet<u32> = job_id_actor_ids.entry(fragment.job_id as _).or_default();
885 e.extend(actors.iter().map(|a| a.actor_id as u32));
886 }
887 Ok(job_id_actor_ids)
888 }
889
890 pub async fn upstream_fragments(
891 &self,
892 fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
893 ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
894 let inner = self.inner.read().await;
895 let mut stream = FragmentRelation::find()
896 .select_only()
897 .columns([
898 fragment_relation::Column::SourceFragmentId,
899 fragment_relation::Column::TargetFragmentId,
900 ])
901 .filter(
902 fragment_relation::Column::TargetFragmentId
903 .is_in(fragment_ids.map(|id| id as FragmentId)),
904 )
905 .into_tuple::<(FragmentId, FragmentId)>()
906 .stream(&inner.db)
907 .await?;
908 let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
909 while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
910 upstream_fragments
911 .entry(downstream_fragment_id as crate::model::FragmentId)
912 .or_default()
913 .insert(upstream_fragment_id as crate::model::FragmentId);
914 }
915 Ok(upstream_fragments)
916 }
917
918 pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
919 let inner = self.inner.read().await;
920 let actor_locations: Vec<PartialActorLocation> =
921 Actor::find().into_partial_model().all(&inner.db).await?;
922 Ok(actor_locations)
923 }
924
925 pub async fn list_actor_info(
926 &self,
927 ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
928 let inner = self.inner.read().await;
929 let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
930 Actor::find()
931 .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
932 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
933 .select_only()
934 .columns([actor::Column::ActorId, actor::Column::FragmentId])
935 .column_as(object::Column::Oid, "job_id")
936 .column_as(object::Column::SchemaId, "schema_id")
937 .column_as(object::Column::ObjType, "type")
938 .into_tuple()
939 .all(&inner.db)
940 .await?;
941 Ok(actor_locations)
942 }
943
944 pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
945 let inner = self.inner.read().await;
946
947 let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
948 .select_only()
949 .filter(actor::Column::Splits.is_not_null())
950 .columns([actor::Column::ActorId, actor::Column::FragmentId])
951 .into_tuple()
952 .all(&inner.db)
953 .await?;
954
955 Ok(source_actors)
956 }
957
958 pub async fn list_creating_fragment_descs(
959 &self,
960 ) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
961 let inner = self.inner.read().await;
962 let mut result = Vec::new();
963 let fragments = FragmentModel::find()
964 .select_only()
965 .columns([
966 fragment::Column::FragmentId,
967 fragment::Column::JobId,
968 fragment::Column::FragmentTypeMask,
969 fragment::Column::DistributionType,
970 fragment::Column::StateTableIds,
971 fragment::Column::VnodeCount,
972 fragment::Column::StreamNode,
973 ])
974 .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
975 .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
976 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
977 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
978 .filter(
979 streaming_job::Column::JobStatus
980 .eq(JobStatus::Initial)
981 .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
982 )
983 .group_by(fragment::Column::FragmentId)
984 .into_model::<FragmentDesc>()
985 .all(&inner.db)
986 .await?;
987 for fragment in fragments {
988 let upstreams: Vec<FragmentId> = FragmentRelation::find()
989 .select_only()
990 .column(fragment_relation::Column::SourceFragmentId)
991 .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
992 .into_tuple()
993 .all(&inner.db)
994 .await?;
995 result.push((fragment, upstreams));
996 }
997 Ok(result)
998 }
999
1000 pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
1001 let inner = self.inner.read().await;
1002 let mut result = Vec::new();
1003 let fragments = FragmentModel::find()
1004 .select_only()
1005 .columns([
1006 fragment::Column::FragmentId,
1007 fragment::Column::JobId,
1008 fragment::Column::FragmentTypeMask,
1009 fragment::Column::DistributionType,
1010 fragment::Column::StateTableIds,
1011 fragment::Column::VnodeCount,
1012 fragment::Column::StreamNode,
1013 ])
1014 .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
1015 .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
1016 .group_by(fragment::Column::FragmentId)
1017 .into_model::<FragmentDesc>()
1018 .all(&inner.db)
1019 .await?;
1020 for fragment in fragments {
1021 let upstreams: Vec<FragmentId> = FragmentRelation::find()
1022 .select_only()
1023 .column(fragment_relation::Column::SourceFragmentId)
1024 .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
1025 .into_tuple()
1026 .all(&inner.db)
1027 .await?;
1028 result.push((fragment, upstreams));
1029 }
1030 Ok(result)
1031 }
1032
1033 pub async fn list_sink_actor_mapping(
1034 &self,
1035 ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1036 let inner = self.inner.read().await;
1037 let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1038 .select_only()
1039 .columns([sink::Column::SinkId, sink::Column::Name])
1040 .into_tuple()
1041 .all(&inner.db)
1042 .await?;
1043 let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1044 let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1045
1046 let actor_with_type: Vec<(ActorId, SinkId)> = Actor::find()
1047 .select_only()
1048 .column(actor::Column::ActorId)
1049 .column(fragment::Column::JobId)
1050 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1051 .filter(
1052 fragment::Column::JobId
1053 .is_in(sink_ids)
1054 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
1055 )
1056 .into_tuple()
1057 .all(&inner.db)
1058 .await?;
1059
1060 let mut sink_actor_mapping = HashMap::new();
1061 for (actor_id, sink_id) in actor_with_type {
1062 sink_actor_mapping
1063 .entry(sink_id)
1064 .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1065 .1
1066 .push(actor_id);
1067 }
1068
1069 Ok(sink_actor_mapping)
1070 }
1071
1072 pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1073 let inner = self.inner.read().await;
1074 let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1075 .select_only()
1076 .columns([
1077 fragment::Column::FragmentId,
1078 fragment::Column::JobId,
1079 fragment::Column::StateTableIds,
1080 ])
1081 .into_partial_model()
1082 .all(&inner.db)
1083 .await?;
1084 Ok(fragment_state_tables)
1085 }
1086
    /// Load every `Running` actor in the cluster, grouped as
    /// `database_id -> job_id -> fragment_id -> InflightFragmentInfo`.
    ///
    /// When `database_id` is `Some`, only actors of that database are loaded.
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        // Base predicate: only running actors; optionally narrowed to one database.
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            // Stream the rows instead of materializing them all at once; the
            // result set covers every running actor in scope.
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();
            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
            };
            match fragment_infos.entry(fragment_id) {
                Entry::Occupied(mut entry) => {
                    // Fragment already seen via another of its actors: the
                    // fragment-level data must agree across rows, and each
                    // actor id must appear at most once.
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                Entry::Vacant(entry) => {
                    // First actor of this fragment: record the fragment-level
                    // info alongside the actor.
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }
1187
1188 pub async fn migrate_actors(
1189 &self,
1190 plan: HashMap<WorkerSlotId, WorkerSlotId>,
1191 ) -> MetaResult<()> {
1192 let inner = self.inner.read().await;
1193 let txn = inner.db.begin().await?;
1194
1195 let actors: Vec<(
1196 FragmentId,
1197 DistributionType,
1198 ActorId,
1199 Option<VnodeBitmap>,
1200 WorkerId,
1201 ActorStatus,
1202 )> = Actor::find()
1203 .select_only()
1204 .columns([
1205 fragment::Column::FragmentId,
1206 fragment::Column::DistributionType,
1207 ])
1208 .columns([
1209 actor::Column::ActorId,
1210 actor::Column::VnodeBitmap,
1211 actor::Column::WorkerId,
1212 actor::Column::Status,
1213 ])
1214 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1215 .into_tuple()
1216 .all(&txn)
1217 .await?;
1218
1219 let mut actor_locations = HashMap::new();
1220
1221 for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
1222 if *status != ActorStatus::Running {
1223 tracing::warn!(
1224 "skipping actor {} in fragment {} with status {:?}",
1225 actor_id,
1226 fragment_id,
1227 status
1228 );
1229 continue;
1230 }
1231
1232 actor_locations
1233 .entry(*worker_id)
1234 .or_insert(HashMap::new())
1235 .entry(*fragment_id)
1236 .or_insert(BTreeSet::new())
1237 .insert(*actor_id);
1238 }
1239
1240 let expired_or_changed_workers: HashSet<_> =
1241 plan.keys().map(|k| k.worker_id() as WorkerId).collect();
1242
1243 let mut actor_migration_plan = HashMap::new();
1244 for (worker, fragment) in actor_locations {
1245 if expired_or_changed_workers.contains(&worker) {
1246 for (fragment_id, actors) in fragment {
1247 debug!(
1248 "worker {} expired or changed, migrating fragment {}",
1249 worker, fragment_id
1250 );
1251 let worker_slot_to_actor: HashMap<_, _> = actors
1252 .iter()
1253 .enumerate()
1254 .map(|(idx, actor_id)| {
1255 (WorkerSlotId::new(worker as _, idx as _), *actor_id)
1256 })
1257 .collect();
1258
1259 for (worker_slot, actor) in worker_slot_to_actor {
1260 if let Some(target) = plan.get(&worker_slot) {
1261 actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
1262 }
1263 }
1264 }
1265 }
1266 }
1267
1268 for (actor, worker) in actor_migration_plan {
1269 Actor::update_many()
1270 .col_expr(
1271 actor::Column::WorkerId,
1272 Expr::value(Value::Int(Some(worker))),
1273 )
1274 .filter(actor::Column::ActorId.eq(actor))
1275 .exec(&txn)
1276 .await?;
1277 }
1278
1279 txn.commit().await?;
1280
1281 Ok(())
1282 }
1283
1284 pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1285 let inner = self.inner.read().await;
1286
1287 let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1288 .select_only()
1289 .columns([fragment::Column::FragmentId])
1290 .columns([actor::Column::ActorId, actor::Column::WorkerId])
1291 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1292 .into_tuple()
1293 .all(&inner.db)
1294 .await?;
1295
1296 let mut actor_locations = HashMap::new();
1297
1298 for (fragment_id, _, worker_id) in actors {
1299 *actor_locations
1300 .entry(worker_id)
1301 .or_insert(HashMap::new())
1302 .entry(fragment_id)
1303 .or_insert(0_usize) += 1;
1304 }
1305
1306 let mut result = HashSet::new();
1307 for (worker_id, mapping) in actor_locations {
1308 let max_fragment_len = mapping.values().max().unwrap();
1309
1310 result
1311 .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1312 }
1313
1314 Ok(result)
1315 }
1316
1317 pub async fn all_node_actors(
1318 &self,
1319 include_inactive: bool,
1320 ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
1321 let inner = self.inner.read().await;
1322 let fragment_actors = if include_inactive {
1323 FragmentModel::find()
1324 .find_with_related(Actor)
1325 .all(&inner.db)
1326 .await?
1327 } else {
1328 FragmentModel::find()
1329 .find_with_related(Actor)
1330 .filter(actor::Column::Status.eq(ActorStatus::Running))
1331 .all(&inner.db)
1332 .await?
1333 };
1334
1335 let job_definitions = resolve_streaming_job_definition(
1336 &inner.db,
1337 &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
1338 )
1339 .await?;
1340
1341 let mut node_actors = HashMap::new();
1342 for (fragment, actors) in fragment_actors {
1343 let job_id = fragment.job_id;
1344 let (table_fragments, actor_status, _) = Self::compose_fragment(
1345 fragment,
1346 actors,
1347 job_definitions.get(&(job_id as _)).cloned(),
1348 )?;
1349 for actor in table_fragments.actors {
1350 let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
1351 node_actors
1352 .entry(node_id)
1353 .or_insert_with(Vec::new)
1354 .push(actor);
1355 }
1356 }
1357
1358 Ok(node_actors)
1359 }
1360
1361 pub async fn get_worker_actor_ids(
1362 &self,
1363 job_ids: Vec<ObjectId>,
1364 ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
1365 let inner = self.inner.read().await;
1366 let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
1367 .select_only()
1368 .columns([actor::Column::ActorId, actor::Column::WorkerId])
1369 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1370 .filter(fragment::Column::JobId.is_in(job_ids))
1371 .into_tuple()
1372 .all(&inner.db)
1373 .await?;
1374
1375 let mut worker_actors = BTreeMap::new();
1376 for (actor_id, worker_id) in actor_workers {
1377 worker_actors
1378 .entry(worker_id)
1379 .or_insert_with(Vec::new)
1380 .push(actor_id);
1381 }
1382
1383 Ok(worker_actors)
1384 }
1385
    /// Persist a split assignment: for every actor in the assignment, overwrite
    /// its `splits` column with the assigned connector splits.
    ///
    /// Runs in a single transaction; returns `catalog_id_not_found` if an
    /// actor referenced by the assignment no longer exists.
    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        for assignments in split_assignment.values() {
            for (actor_id, splits) in assignments {
                let actor_splits = splits.iter().map(Into::into).collect_vec();
                Actor::update(actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_splits,
                    }))),
                    ..Default::default()
                })
                .exec(&txn)
                .await
                .map_err(|err| {
                    // `RecordNotUpdated` means the actor row is gone; surface
                    // that as a catalog-not-found error instead of a DB error.
                    if err == DbErr::RecordNotUpdated {
                        MetaError::catalog_id_not_found("actor_id", actor_id)
                    } else {
                        err.into()
                    }
                })?;
            }
        }
        txn.commit().await?;

        Ok(())
    }
1414
    /// For each given fragment, rewrite its persisted stream node to fill in
    /// the snapshot-backfill epochs, writing the node back only when
    /// `crate::stream::fill_snapshot_backfill_epoch` actually modified it.
    ///
    /// All updates happen in one transaction; fails if a fragment id is unknown.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // Returns true iff the node was mutated; skip the write otherwise.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1448
1449 pub async fn get_running_actors_of_fragment(
1451 &self,
1452 fragment_id: FragmentId,
1453 ) -> MetaResult<Vec<ActorId>> {
1454 let inner = self.inner.read().await;
1455 let actors: Vec<ActorId> = Actor::find()
1456 .select_only()
1457 .column(actor::Column::ActorId)
1458 .filter(actor::Column::FragmentId.eq(fragment_id))
1459 .filter(actor::Column::Status.eq(ActorStatus::Running))
1460 .into_tuple()
1461 .all(&inner.db)
1462 .await?;
1463 Ok(actors)
1464 }
1465
    /// Pair each source-backfill actor with the source actor it reads from.
    ///
    /// Requires a `NoShuffle` relation from `source_fragment_id` to
    /// `source_backfill_fragment_id`; returns `(backfill_actor, source_actor)`
    /// pairs resolved via the no-shuffle dispatch rules.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // The dispatcher type of the edge source -> backfill; absence of the
        // edge is an error.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // Actor pairing below is only well-defined for NoShuffle edges.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: load a fragment's distribution type, erroring if it's gone.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: load actor_id -> optional vnode bitmap for a fragment.
        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        // Resolve the 1:1 no-shuffle pairing and flip each pair so callers get
        // (backfill_actor, source_actor).
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1552
1553 pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
1554 let inner = self.inner.read().await;
1555 let actors: Vec<ActorId> = Actor::find()
1556 .select_only()
1557 .column(actor::Column::ActorId)
1558 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1559 .filter(fragment::Column::JobId.is_in(job_ids))
1560 .into_tuple()
1561 .all(&inner.db)
1562 .await?;
1563 Ok(actors)
1564 }
1565
    /// Find the "root" fragment of each given job, composed into a [`Fragment`].
    ///
    /// The root is an `Mview` or `Sink` fragment when one exists; otherwise the
    /// first `Source` fragment encountered. Also returns `(actor_id, worker_id)`
    /// locations.
    ///
    /// NOTE(review): the final actor-location query is not filtered by
    /// `job_ids` — it returns every actor in the cluster. Presumably
    /// intentional (callers may need global locations), but worth confirming.
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
        let inner = self.inner.read().await;

        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(job_ids.iter().copied()),
        )
        .await?;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        // Mview/Sink takes priority (`insert` overwrites); a Source fragment
        // only fills the slot if nothing claimed it yet (`try_insert`).
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();
        for (_, fragment) in root_fragments {
            let actors = fragment.find_related(Actor).all(&inner.db).await?;

            let job_id = fragment.job_id;
            root_fragments_pb.insert(
                fragment.job_id,
                Self::compose_fragment(
                    fragment,
                    actors,
                    job_definitions.get(&(job_id as _)).cloned(),
                )?
                .0,
            );
        }

        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok((root_fragments_pb, actors))
    }
1632
1633 pub async fn get_root_fragment(
1634 &self,
1635 job_id: ObjectId,
1636 ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
1637 let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1638 let root_fragment = root_fragments
1639 .remove(&job_id)
1640 .context(format!("root fragment for job {} not found", job_id))?;
1641 Ok((root_fragment, actors))
1642 }
1643
    /// Resolve the fragments directly downstream of a job's root fragment,
    /// each paired with the dispatcher type of the connecting edge. Also
    /// returns the actor locations from [`Self::get_root_fragment`].
    pub async fn get_downstream_fragments(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&inner.db)
            .await?;
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        let mut downstream_fragments = vec![];
        for fragment_relation::Model {
            target_fragment_id: fragment_id,
            dispatcher_type,
            ..
        } in downstream_fragment_relations
        {
            // find_by_id + find_with_related yields at most one fragment
            // (with its actors); missing fragments indicate catalog corruption.
            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
                .find_with_related(Actor)
                .all(&inner.db)
                .await?;
            if fragment_actors.is_empty() {
                bail!("No fragment found for fragment id {}", fragment_id);
            }
            assert_eq!(fragment_actors.len(), 1);
            let (fragment, actors) = fragment_actors.pop().unwrap();
            let dispatch_type = PbDispatcherType::from(dispatcher_type);
            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
            downstream_fragments.push((dispatch_type, fragment));
        }

        Ok((downstream_fragments, actors))
    }
1686
1687 pub async fn load_source_fragment_ids(
1688 &self,
1689 ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1690 let inner = self.inner.read().await;
1691 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1692 .select_only()
1693 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1694 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1695 .into_tuple()
1696 .all(&inner.db)
1697 .await?;
1698
1699 let mut source_fragment_ids = HashMap::new();
1700 for (fragment_id, stream_node) in fragments {
1701 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1702 source_fragment_ids
1703 .entry(source_id as SourceId)
1704 .or_insert_with(BTreeSet::new)
1705 .insert(fragment_id);
1706 }
1707 }
1708 Ok(source_fragment_ids)
1709 }
1710
1711 pub async fn load_backfill_fragment_ids(
1712 &self,
1713 ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1714 let inner = self.inner.read().await;
1715 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1716 .select_only()
1717 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1718 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1719 .into_tuple()
1720 .all(&inner.db)
1721 .await?;
1722
1723 let mut source_fragment_ids = HashMap::new();
1724 for (fragment_id, stream_node) in fragments {
1725 if let Some((source_id, upstream_source_fragment_id)) =
1726 stream_node.to_protobuf().find_source_backfill()
1727 {
1728 source_fragment_ids
1729 .entry(source_id as SourceId)
1730 .or_insert_with(BTreeSet::new)
1731 .insert((fragment_id, upstream_source_fragment_id));
1732 }
1733 }
1734 Ok(source_fragment_ids)
1735 }
1736
1737 pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
1738 let inner = self.inner.read().await;
1739 let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
1740 .select_only()
1741 .columns([actor::Column::ActorId, actor::Column::Splits])
1742 .filter(actor::Column::Splits.is_not_null())
1743 .into_tuple()
1744 .all(&inner.db)
1745 .await?;
1746 Ok(splits.into_iter().collect())
1747 }
1748
    /// Return the actor count (actual parallelism) of the job's single root
    /// (`Mview` or `Sink`) fragment, or `None` if the job has zero or more
    /// than one such fragment.
    pub async fn get_actual_job_fragment_parallelism(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<Option<usize>> {
        let inner = self.inner.read().await;
        // One row per matching fragment: (fragment_id, COUNT(actor_id)).
        let fragments: Vec<(FragmentId, i64)> = FragmentModel::find()
            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
            .select_only()
            .columns([fragment::Column::FragmentId])
            .column_as(actor::Column::ActorId.count(), "count")
            .filter(
                fragment::Column::JobId
                    .eq(job_id)
                    .and(FragmentTypeMask::intersects_any([
                        FragmentTypeFlag::Mview,
                        FragmentTypeFlag::Sink,
                    ])),
            )
            .group_by(fragment::Column::FragmentId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        // `at_most_one().ok().flatten()` collapses both the "no fragment" and
        // the ambiguous "multiple fragments" cases into `None`.
        Ok(fragments
            .into_iter()
            .at_most_one()
            .ok()
            .flatten()
            .map(|(_, count)| count as usize))
    }
1780
1781 pub async fn get_all_upstream_sink_infos(
1782 &self,
1783 target_table: &PbTable,
1784 target_fragment_id: FragmentId,
1785 ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1786 let incoming_sinks = self.get_table_incoming_sinks(target_table.id as _).await?;
1787
1788 let inner = self.inner.read().await;
1789 let txn = inner.db.begin().await?;
1790
1791 let sink_ids = incoming_sinks.iter().map(|s| s.id as SinkId).collect_vec();
1792 let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;
1793
1794 let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1795 for pb_sink in &incoming_sinks {
1796 let sink_fragment_id =
1797 sink_fragment_ids
1798 .get(&(pb_sink.id as _))
1799 .cloned()
1800 .ok_or(anyhow::anyhow!(
1801 "sink fragment not found for sink id {}",
1802 pb_sink.id
1803 ))?;
1804 let upstream_info = build_upstream_sink_info(
1805 pb_sink,
1806 sink_fragment_id,
1807 target_table,
1808 target_fragment_id,
1809 )?;
1810 upstream_sink_infos.push(upstream_info);
1811 }
1812
1813 Ok(upstream_sink_infos)
1814 }
1815
    /// Find the id of the unique `Mview` fragment of the given table's job.
    ///
    /// Errors unless exactly one such fragment exists.
    pub async fn get_mview_fragment_by_id(&self, table_id: TableId) -> MetaResult<FragmentId> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Fetch all matches (not `.one()`) so we can detect the invalid
        // "multiple mview fragments" case as well as "none".
        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(
                fragment::Column::JobId
                    .eq(table_id)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        if mview_fragment.len() != 1 {
            return Err(anyhow::anyhow!(
                "expected exactly one mview fragment for table {}, found {}",
                table_id,
                mview_fragment.len()
            )
            .into());
        }

        // Safe: the length was just checked to be exactly 1.
        Ok(mview_fragment.into_iter().next().unwrap())
    }
1843
    /// Whether the given table has already been migrated; delegates to the
    /// free-standing `has_table_been_migrated` helper within a transaction.
    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        has_table_been_migrated(&txn, table_id).await
    }
1849}
1850
1851#[cfg(test)]
1852mod tests {
1853 use std::collections::{BTreeMap, HashMap, HashSet};
1854
1855 use itertools::Itertools;
1856 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1857 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1858 use risingwave_common::util::iter_util::ZipEqDebug;
1859 use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1860 use risingwave_meta_model::actor::ActorStatus;
1861 use risingwave_meta_model::fragment::DistributionType;
1862 use risingwave_meta_model::{
1863 ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
1864 VnodeBitmap, actor, fragment,
1865 };
1866 use risingwave_pb::common::PbActorLocation;
1867 use risingwave_pb::meta::table_fragments::PbActorStatus;
1868 use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
1869 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1870 use risingwave_pb::plan_common::PbExprContext;
1871 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1872 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1873 use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1874
1875 use crate::MetaResult;
1876 use crate::controller::catalog::CatalogController;
1877 use crate::model::{Fragment, StreamActor};
1878
1879 type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
1880
1881 type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
1882
1883 const TEST_FRAGMENT_ID: FragmentId = 1;
1884
1885 const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;
1886
1887 const TEST_JOB_ID: ObjectId = 1;
1888
1889 const TEST_STATE_TABLE_ID: TableId = 1000;
1890
1891 fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1892 let mut upstream_actor_ids = BTreeMap::new();
1893 upstream_actor_ids.insert(
1894 TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1895 HashSet::from_iter([(actor_id + 100)]),
1896 );
1897 upstream_actor_ids.insert(
1898 (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1899 HashSet::from_iter([(actor_id + 200)]),
1900 );
1901 upstream_actor_ids
1902 }
1903
1904 fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1905 let mut input = vec![];
1906 for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1907 input.push(PbStreamNode {
1908 node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1909 upstream_fragment_id: *upstream_fragment_id as _,
1910 ..Default::default()
1911 }))),
1912 ..Default::default()
1913 });
1914 }
1915
1916 PbStreamNode {
1917 input,
1918 node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1919 ..Default::default()
1920 }
1921 }
1922
    /// Round-trip check: build a pb `Fragment` + actor statuses, run
    /// `extract_fragment_and_actors_for_new_job`, and verify the extracted
    /// models against the inputs via `check_fragment` / `check_actors`.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // Uniform vnode distribution across the test actors.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // All actors share the same merger topology, so any entry will do.
        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        // Every actor is reported as running on worker 0.
        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }
1998
    /// Inverse round-trip of `test_extract_fragment`: build DB-side
    /// `actor::Model` / `fragment::Model` values, run `compose_fragment`, and
    /// verify the produced pb structures against the inputs.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Each actor carries one dummy connector split.
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        // The merger topology is derived from the first actor's upstreams; all
        // actors share the same shape.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        // Every actor must get a status entry and a splits entry.
        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }
2089
    /// Assert that each model-side actor matches its pb counterpart pairwise:
    /// ids, vnode bitmaps, mview definition, upstream merge fragments, running
    /// status, splits, and expr context.
    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // Every Merge node in the shared stream node must reference an
            // upstream fragment known for this actor.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(status, ActorStatus::Running);

            assert_eq!(
                splits,
                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }
2149
    /// Assert that a model-side fragment matches its pb counterpart: id, type
    /// mask, distribution type, state table ids, and stream node.
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(
            pb_state_table_ids,
            fragment.state_table_ids.into_u32_array()
        );
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }
2174}