use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

use anyhow::{Context, anyhow};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{
    Actor, Fragment as FragmentModel, FragmentRelation, Sink, SourceSplits, StreamingJob,
};
use risingwave_meta_model::{
    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
    VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
    source_splits, streaming_job, table,
};
use risingwave_meta_model_migration::{Alias, ExprTrait, OnConflict, SelectStatement, SimpleExpr};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Operation as NotificationOperation,
};
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
    StreamScanType,
};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::Expr;
use sea_orm::{
    ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
    QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
};
use serde::{Deserialize, Serialize};
use tracing::debug;

use crate::barrier::SnapshotBackfillInfo;
use crate::controller::catalog::{CatalogController, CatalogControllerInner};
use crate::controller::scale::resolve_streaming_job_definition;
use crate::controller::utils::{
    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
    get_fragment_mappings, get_sink_fragment_by_ids, has_table_been_migrated,
    resolve_no_shuffle_actor_dispatcher,
};
use crate::manager::{LocalNotification, NotificationManager};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
};
use crate::rpc::ddl_controller::build_upstream_sink_info;
use crate::stream::{SplitAssignment, UpstreamSinkInfo};
use crate::{MetaError, MetaResult};

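/// Worker placement of a single in-flight (running) actor, plus its vnode bitmap, if any.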
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
}

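/// Runtime view of a fragment: its plan node, distribution type, the actors currently
/// running it, and the state tables it owns.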
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    pub distribution_type: DistributionType,
    pub nodes: PbStreamNode,
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}

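/// Parallelism summary of a fragment: how it is distributed, how many actors it has,
/// and how many vnodes it covers.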
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub actor_count: usize,
    pub vnode_count: usize,
}

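/// Helpers for building SQL predicates that test a fragment's type mask against
/// [`FragmentTypeFlag`]s, i.e. `fragment_type_mask & flags != 0`.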
pub(crate) trait FragmentTypeMaskExt {
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr;
    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr;
}

impl FragmentTypeMaskExt for FragmentTypeMask {
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }
}

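/// Summary of a streaming job as returned by `list_streaming_job_infos`.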
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    pub resource_group: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}

impl CatalogControllerInner {
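    /// Returns the fragment worker slot mappings of all fragments belonging to jobs
    /// that have reached the `Created` state.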
    pub async fn all_running_fragment_mappings(
        &self,
    ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
        let txn = self.db.begin().await?;

        let job_ids: Vec<ObjectId> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut result = vec![];
        for job_id in job_ids {
            let mappings = get_fragment_mappings(&txn, job_id).await?;

            result.extend(mappings.into_iter());
        }

        Ok(result.into_iter())
    }
}

impl NotificationManager {
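    /// Pushes the given fragment worker-slot mappings to all frontend nodes and
    /// forwards the corresponding upsert/delete event to local subscribers.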
    pub(crate) fn notify_fragment_mapping(
        &self,
        operation: NotificationOperation,
        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
    ) {
        let fragment_ids = fragment_mappings
            .iter()
            .map(|mapping| mapping.fragment_id)
            .collect_vec();
        if fragment_ids.is_empty() {
            return;
        }

        // notify each mapping change to frontend nodes
        for fragment_mapping in fragment_mappings {
            self.notify_frontend_without_version(
                operation,
                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
            );
        }

        // forward the change to local subscribers
        match operation {
            NotificationOperation::Add | NotificationOperation::Update => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
                    fragment_ids,
                ));
            }
            NotificationOperation::Delete => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
                    fragment_ids,
                ));
            }
            op => {
                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
            }
        }
    }
}

impl CatalogController {
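    /// Converts the given in-memory fragments of a (new) streaming job into database
    /// models, one `(fragment, actors)` pair per fragment.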
    pub fn extract_fragment_and_actors_from_fragments(
        job_id: ObjectId,
        fragments: impl Iterator<Item = &Fragment>,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
        fragments
            .map(|fragment| {
                Self::extract_fragment_and_actors_for_new_job(
                    job_id,
                    fragment,
                    actor_status,
                    actor_splits,
                )
            })
            .try_collect()
    }

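    /// Converts a single in-memory fragment and its actors into database models.
    /// The deprecated `upstream_actor_id` field of `Merge` nodes is cleared before
    /// the stream node is persisted.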
    pub fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }

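    /// Assembles a [`StreamJobFragments`] for one job from its persisted fragment and
    /// actor models, together with job-level state, context and parallelism settings.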
    pub fn compose_table_fragments(
        table_id: u32,
        state: PbState,
        ctx: Option<PbStreamContext>,
        fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
        parallelism: StreamingParallelism,
        max_parallelism: usize,
        job_definition: Option<String>,
    ) -> MetaResult<StreamJobFragments> {
        let mut pb_fragments = BTreeMap::new();
        let mut pb_actor_status = BTreeMap::new();

        for (fragment, actors) in fragments {
            let (fragment, fragment_actor_status, _) =
                Self::compose_fragment(fragment, actors, job_definition.clone())?;

            pb_fragments.insert(fragment.fragment_id, fragment);
            pb_actor_status.extend(fragment_actor_status.into_iter());
        }

        let table_fragments = StreamJobFragments {
            stream_job_id: table_id.into(),
            state: state as _,
            fragments: pb_fragments,
            actor_status: pb_actor_status,
            ctx: ctx
                .as_ref()
                .map(StreamContext::from_protobuf)
                .unwrap_or_default(),
            assigned_parallelism: match parallelism {
                StreamingParallelism::Custom => TableParallelism::Custom,
                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
            },
            max_parallelism,
        };

        Ok(table_fragments)
    }

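    /// Restores an in-memory [`Fragment`] from its database models, also returning
    /// each actor's status and connector split assignment.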
    #[allow(clippy::type_complexity)]
    pub(crate) fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<actor::Model>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let actor::Model {
                actor_id,
                fragment_id,
                status,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                    state: PbActorState::from(status) as _,
                },
            );

            if let Some(splits) = splits {
                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
            }

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }

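    /// Returns per-fragment parallelism info (distribution type, actor count and
    /// vnode count), optionally restricted to the fragments in `id_filter`.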
    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let inner = self.inner.read().await;

        let query_alias = Alias::new("fragment_actor_count");
        let count_alias = Alias::new("count");

        let mut query = SelectStatement::new()
            .column(actor::Column::FragmentId)
            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
            .from(Actor)
            .group_by_col(actor::Column::FragmentId)
            .to_owned();

        if let Some(id_filter) = id_filter {
            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
        }

        let outer = SelectStatement::new()
            .column((FragmentModel, fragment::Column::FragmentId))
            .column(count_alias)
            .column(fragment::Column::DistributionType)
            .column(fragment::Column::VnodeCount)
            .from_subquery(query.to_owned(), query_alias.clone())
            .inner_join(
                FragmentModel,
                Expr::col((query_alias, actor::Column::FragmentId))
                    .equals((FragmentModel, fragment::Column::FragmentId)),
            )
            .to_owned();

        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
                outer.to_owned(),
            )
            .all(&inner.db)
            .await?;

        Ok(fragment_parallelisms
            .into_iter()
            .map(|(fragment_id, count, distribution_type, vnode_count)| {
                (
                    fragment_id,
                    FragmentParallelismInfo {
                        distribution_type: distribution_type.into(),
                        actor_count: count as usize,
                        vnode_count: vnode_count as usize,
                    },
                )
            })
            .collect())
    }

    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
        let inner = self.inner.read().await;
        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(fragment_jobs.into_iter().collect())
    }

    pub async fn get_fragment_job_id(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        let object_ids: Vec<ObjectId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(object_ids)
    }

    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_opt = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .filter(fragment::Column::FragmentId.eq(fragment_id))
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .one(&inner.db)
            .await?;

        let Some(fragment) = fragment_opt else {
            return Ok(None);
        };

        let upstreams: Vec<FragmentId> = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(Some((fragment, upstreams)))
    }

    pub async fn list_fragment_database_ids(
        &self,
        select_fragment_ids: Option<Vec<FragmentId>>,
    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
        let inner = self.inner.read().await;
        let select = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .column(object::Column::DatabaseId)
            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
        let select = if let Some(select_fragment_ids) = select_fragment_ids {
            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
        } else {
            select
        };
        Ok(select.into_tuple().all(&inner.db).await?)
    }

    pub async fn get_job_fragments_by_id(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;
        let fragment_actors = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id as _,
            job_info.job_status.into(),
            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }

    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;
        get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
    }

    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }

    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }

    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }

    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
        let inner = self.inner.read().await;
        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
        Ok(max_parallelism as usize)
    }

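    /// Returns the actor ids of each given job, grouped by job id.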
    pub async fn get_job_actor_mapping(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
            .select_only()
            .column(fragment::Column::JobId)
            .column(actor::Column::ActorId)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(fragment::Column::JobId.is_in(job_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(job_actors.into_iter().into_group_map())
    }

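    /// Best-effort listing of `(job id, state table ids)` for all jobs; returns `None`
    /// if the inner lock cannot be acquired immediately or the query fails.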
    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
        if let Ok(inner) = self.inner.try_read()
            && let Ok(job_state_tables) = FragmentModel::find()
                .select_only()
                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
                .into_tuple::<(ObjectId, I32Array)>()
                .all(&inner.db)
                .await
        {
            let mut job_internal_table_ids = HashMap::new();
            for (job_id, state_table_ids) in job_state_tables {
                job_internal_table_ids
                    .entry(job_id)
                    .or_insert_with(Vec::new)
                    .extend(state_table_ids.into_inner());
            }
            return Some(job_internal_table_ids.into_iter().collect());
        }
        None
    }

    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = FragmentModel::find().count(&inner.db).await?;
        Ok(count > 0)
    }

    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let inner = self.inner.read().await;
        let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
            .select_only()
            .column(actor::Column::WorkerId)
            .column_as(actor::Column::ActorId.count(), "count")
            .group_by(actor::Column::WorkerId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(actor_cnt
            .into_iter()
            .map(|(worker_id, count)| (worker_id, count as usize))
            .collect())
    }

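    /// Loads the [`StreamJobFragments`] of every streaming job in the catalog, keyed
    /// by job id.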
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragment_actors = FragmentModel::find()
                .find_with_related(Actor)
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            table_fragments.insert(
                job.job_id as ObjectId,
                Self::compose_table_fragments(
                    job.job_id as _,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }

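    /// Returns, for each job, the ids of actors in fragments whose type mask equals
    /// `StreamCdcScan`, i.e. the CDC table backfill actors.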
    pub async fn cdc_table_backfill_actor_ids(&self) -> MetaResult<HashMap<u32, HashSet<u32>>> {
        let inner = self.inner.read().await;
        let mut job_id_actor_ids = HashMap::default();
        let stream_cdc_scan_flag = FragmentTypeFlag::StreamCdcScan as i32;
        let fragment_type_mask = stream_cdc_scan_flag;
        let fragment_actors: Vec<(fragment::Model, Vec<actor::Model>)> = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::FragmentTypeMask.eq(fragment_type_mask))
            .all(&inner.db)
            .await?;
        for (fragment, actors) in fragment_actors {
            let e: &mut HashSet<u32> = job_id_actor_ids.entry(fragment.job_id as _).or_default();
            e.extend(actors.iter().map(|a| a.actor_id as u32));
        }
        Ok(job_id_actor_ids)
    }

    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }

    pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
        let inner = self.inner.read().await;
        let actor_locations: Vec<PartialActorLocation> =
            Actor::find().into_partial_model().all(&inner.db).await?;
        Ok(actor_locations)
    }

    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;
        let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
            Actor::find()
                .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .select_only()
                .columns([actor::Column::ActorId, actor::Column::FragmentId])
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;
        Ok(actor_locations)
    }

    pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
        let inner = self.inner.read().await;

        let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
            .select_only()
            .filter(actor::Column::Splits.is_not_null())
            .columns([actor::Column::ActorId, actor::Column::FragmentId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(source_actors)
    }

    pub async fn list_creating_fragment_descs(
        &self,
    ) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let mut result = Vec::new();
        let fragments = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .join(JoinType::LeftJoin, fragment::Relation::Object.def())
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Initial)
                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
            )
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .all(&inner.db)
            .await?;
        for fragment in fragments {
            let upstreams: Vec<FragmentId> = FragmentRelation::find()
                .select_only()
                .column(fragment_relation::Column::SourceFragmentId)
                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
                .into_tuple()
                .all(&inner.db)
                .await?;
            result.push((fragment, upstreams));
        }
        Ok(result)
    }

    pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let mut result = Vec::new();
        let fragments = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .all(&inner.db)
            .await?;
        for fragment in fragments {
            let upstreams: Vec<FragmentId> = FragmentRelation::find()
                .select_only()
                .column(fragment_relation::Column::SourceFragmentId)
                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
                .into_tuple()
                .all(&inner.db)
                .await?;
            result.push((fragment, upstreams));
        }
        Ok(result)
    }

    pub async fn list_sink_actor_mapping(
        &self,
    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
        let inner = self.inner.read().await;
        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .into_tuple()
            .all(&inner.db)
            .await?;
        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();

        let actor_with_type: Vec<(ActorId, SinkId)> = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(fragment::Column::JobId)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(
                fragment::Column::JobId
                    .is_in(sink_ids)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
            )
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut sink_actor_mapping = HashMap::new();
        for (actor_id, sink_id) in actor_with_type {
            sink_actor_mapping
                .entry(sink_id)
                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
                .1
                .push(actor_id);
        }

        Ok(sink_actor_mapping)
    }

    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
        let inner = self.inner.read().await;
        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::StateTableIds,
            ])
            .into_partial_model()
            .all(&inner.db)
            .await?;
        Ok(fragment_state_tables)
    }

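    /// Loads the information of all running actors, grouped by database id, job id and
    /// fragment id; `database_id` optionally restricts the result to one database.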
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();
            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
            };
            match fragment_infos.entry(fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                Entry::Vacant(entry) => {
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }

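    /// Re-assigns actors to workers according to `plan`, which maps each worker slot
    /// on an expired or changed worker to its replacement slot. Only running actors
    /// are migrated; others are skipped with a warning.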
    pub async fn migrate_actors(
        &self,
        plan: HashMap<WorkerSlotId, WorkerSlotId>,
    ) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let actors: Vec<(
            FragmentId,
            DistributionType,
            ActorId,
            Option<VnodeBitmap>,
            WorkerId,
            ActorStatus,
        )> = Actor::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::DistributionType,
            ])
            .columns([
                actor::Column::ActorId,
                actor::Column::VnodeBitmap,
                actor::Column::WorkerId,
                actor::Column::Status,
            ])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .into_tuple()
            .all(&txn)
            .await?;

        let mut actor_locations = HashMap::new();

        for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
            if *status != ActorStatus::Running {
                tracing::warn!(
                    "skipping actor {} in fragment {} with status {:?}",
                    actor_id,
                    fragment_id,
                    status
                );
                continue;
            }

            actor_locations
                .entry(*worker_id)
                .or_insert(HashMap::new())
                .entry(*fragment_id)
                .or_insert(BTreeSet::new())
                .insert(*actor_id);
        }

        let expired_or_changed_workers: HashSet<_> =
            plan.keys().map(|k| k.worker_id() as WorkerId).collect();

        let mut actor_migration_plan = HashMap::new();
        for (worker, fragment) in actor_locations {
            if expired_or_changed_workers.contains(&worker) {
                for (fragment_id, actors) in fragment {
                    debug!(
                        "worker {} expired or changed, migrating fragment {}",
                        worker, fragment_id
                    );
                    let worker_slot_to_actor: HashMap<_, _> = actors
                        .iter()
                        .enumerate()
                        .map(|(idx, actor_id)| {
                            (WorkerSlotId::new(worker as _, idx as _), *actor_id)
                        })
                        .collect();

                    for (worker_slot, actor) in worker_slot_to_actor {
                        if let Some(target) = plan.get(&worker_slot) {
                            actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
                        }
                    }
                }
            }
        }

        for (actor, worker) in actor_migration_plan {
            Actor::update_many()
                .col_expr(
                    actor::Column::WorkerId,
                    Expr::value(Value::Int(Some(worker))),
                )
                .filter(actor::Column::ActorId.eq(actor))
                .exec(&txn)
                .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
        let inner = self.inner.read().await;

        let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([fragment::Column::FragmentId])
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut actor_locations = HashMap::new();

        for (fragment_id, _, worker_id) in actors {
            *actor_locations
                .entry(worker_id)
                .or_insert(HashMap::new())
                .entry(fragment_id)
                .or_insert(0_usize) += 1;
        }

        let mut result = HashSet::new();
        for (worker_id, mapping) in actor_locations {
            let max_fragment_len = mapping.values().max().unwrap();

            result
                .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
        }

        Ok(result)
    }

    pub async fn all_node_actors(
        &self,
        include_inactive: bool,
    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
        let inner = self.inner.read().await;
        let fragment_actors = if include_inactive {
            FragmentModel::find()
                .find_with_related(Actor)
                .all(&inner.db)
                .await?
        } else {
            FragmentModel::find()
                .find_with_related(Actor)
                .filter(actor::Column::Status.eq(ActorStatus::Running))
                .all(&inner.db)
                .await?
        };

        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
        )
        .await?;

        let mut node_actors = HashMap::new();
        for (fragment, actors) in fragment_actors {
            let job_id = fragment.job_id;
            let (table_fragments, actor_status, _) = Self::compose_fragment(
                fragment,
                actors,
                job_definitions.get(&(job_id as _)).cloned(),
            )?;
            for actor in table_fragments.actors {
                let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
                node_actors
                    .entry(node_id)
                    .or_insert_with(Vec::new)
                    .push(actor);
            }
        }

        Ok(node_actors)
    }

    pub async fn get_worker_actor_ids(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(fragment::Column::JobId.is_in(job_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut worker_actors = BTreeMap::new();
        for (actor_id, worker_id) in actor_workers {
            worker_actors
                .entry(worker_id)
                .or_insert_with(Vec::new)
                .push(actor_id);
        }

        Ok(worker_actors)
    }

    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        for assignments in split_assignment.values() {
            for (actor_id, splits) in assignments {
                let actor_splits = splits.iter().map(Into::into).collect_vec();
                Actor::update(actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_splits,
                    }))),
                    ..Default::default()
                })
                .exec(&txn)
                .await
                .map_err(|err| {
                    if err == DbErr::RecordNotUpdated {
                        MetaError::catalog_id_not_found("actor_id", actor_id)
                    } else {
                        err.into()
                    }
                })?;
            }
        }
        txn.commit().await?;

        Ok(())
    }

    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

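    /// Returns the ids of all running actors of the given fragment.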
    pub async fn get_running_actors_of_fragment(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Vec<ActorId>> {
        let inner = self.inner.read().await;
        let actors: Vec<ActorId> = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .filter(actor::Column::FragmentId.eq(fragment_id))
            .filter(actor::Column::Status.eq(ActorStatus::Running))
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(actors)
    }

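    /// Returns `(source backfill actor id, upstream source actor id)` pairs for a
    /// source backfill fragment and its upstream source fragment, which must be
    /// connected by a `NoShuffle` edge.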
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }

    pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
        let inner = self.inner.read().await;
        let actors: Vec<ActorId> = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(fragment::Column::JobId.is_in(job_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(actors)
    }

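    /// Finds the root fragment of each given job: the `Mview` or `Sink` fragment if
    /// one exists, falling back to the `Source` fragment otherwise. Also returns the
    /// `(actor id, worker id)` pairs of all actors in the catalog.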
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
        let inner = self.inner.read().await;

        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(job_ids.iter().copied()),
        )
        .await?;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        // `Mview` or `Sink` fragments are preferred as the root of a job; a `Source`
        // fragment is used as a fallback when neither exists.
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                // do not overwrite a root that has already been recorded
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();
        for (_, fragment) in root_fragments {
            let actors = fragment.find_related(Actor).all(&inner.db).await?;

            let job_id = fragment.job_id;
            root_fragments_pb.insert(
                fragment.job_id,
                Self::compose_fragment(
                    fragment,
                    actors,
                    job_definitions.get(&(job_id as _)).cloned(),
                )?
                .0,
            );
        }

        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok((root_fragments_pb, actors))
    }

    pub async fn get_root_fragment(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
        let root_fragment = root_fragments
            .remove(&job_id)
            .context(format!("root fragment for job {} not found", job_id))?;
        Ok((root_fragment, actors))
    }

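    /// Returns the fragments directly downstream of the job's root fragment, each
    /// paired with the dispatcher type of the connecting edge, plus the worker
    /// location of every actor.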
    pub async fn get_downstream_fragments(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&inner.db)
            .await?;
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        let mut downstream_fragments = vec![];
        for fragment_relation::Model {
            target_fragment_id: fragment_id,
            dispatcher_type,
            ..
        } in downstream_fragment_relations
        {
            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
                .find_with_related(Actor)
                .all(&inner.db)
                .await?;
            if fragment_actors.is_empty() {
                bail!("No fragment found for fragment id {}", fragment_id);
            }
            assert_eq!(fragment_actors.len(), 1);
            let (fragment, actors) = fragment_actors.pop().unwrap();
            let dispatch_type = PbDispatcherType::from(dispatcher_type);
            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
            downstream_fragments.push((dispatch_type, fragment));
        }

        Ok((downstream_fragments, actors))
    }

    pub async fn load_source_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
                source_fragment_ids
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment_id);
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn load_backfill_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                stream_node.to_protobuf().find_source_backfill()
            {
                source_fragment_ids
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
        let inner = self.inner.read().await;
        let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::Splits])
            .filter(actor::Column::Splits.is_not_null())
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(splits.into_iter().collect())
    }

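    /// Returns the actor count of the job's root (`Mview`/`Sink`) fragment, or `None`
    /// when there is not exactly one such fragment.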
    pub async fn get_actual_job_fragment_parallelism(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<Option<usize>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, i64)> = FragmentModel::find()
            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
            .select_only()
            .columns([fragment::Column::FragmentId])
            .column_as(actor::Column::ActorId.count(), "count")
            .filter(
                fragment::Column::JobId
                    .eq(job_id)
                    .and(FragmentTypeMask::intersects_any([
                        FragmentTypeFlag::Mview,
                        FragmentTypeFlag::Sink,
                    ])),
            )
            .group_by(fragment::Column::FragmentId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(fragments
            .into_iter()
            .at_most_one()
            .ok()
            .flatten()
            .map(|(_, count)| count as usize))
    }

    pub async fn get_all_upstream_sink_infos(
        &self,
        target_table: &PbTable,
        target_fragment_id: FragmentId,
    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
        let incoming_sinks = self.get_table_incoming_sinks(target_table.id as _).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let sink_ids = incoming_sinks.iter().map(|s| s.id as SinkId).collect_vec();
        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;

        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
        for pb_sink in &incoming_sinks {
            let sink_fragment_id =
                sink_fragment_ids
                    .get(&(pb_sink.id as _))
                    .cloned()
                    .ok_or(anyhow::anyhow!(
                        "sink fragment not found for sink id {}",
                        pb_sink.id
                    ))?;
            let upstream_info = build_upstream_sink_info(
                pb_sink,
                sink_fragment_id,
                target_table,
                target_fragment_id,
            )?;
            upstream_sink_infos.push(upstream_info);
        }

        Ok(upstream_sink_infos)
    }

    pub async fn get_mview_fragment_by_id(&self, table_id: TableId) -> MetaResult<FragmentId> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(
                fragment::Column::JobId
                    .eq(table_id)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        if mview_fragment.len() != 1 {
            return Err(anyhow::anyhow!(
                "expected exactly one mview fragment for table {}, found {}",
                table_id,
                mview_fragment.len()
            )
            .into());
        }

        Ok(mview_fragment.into_iter().next().unwrap())
    }

    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        has_table_been_migrated(&txn, table_id).await
    }

    pub async fn update_source_splits(
        &self,
        source_splits: &HashMap<SourceId, Vec<SplitImpl>>,
    ) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let models: Vec<source_splits::ActiveModel> = source_splits
            .iter()
            .map(|(source_id, splits)| source_splits::ActiveModel {
                source_id: Set(*source_id as _),
                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(Into::into).collect_vec(),
                }))),
            })
            .collect();

        SourceSplits::insert_many(models)
            .on_conflict(
                OnConflict::column(source_splits::Column::SourceId)
                    .update_column(source_splits::Column::Splits)
                    .to_owned(),
            )
            .exec(&txn)
            .await?;

        txn.commit().await?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
    use risingwave_meta_model::actor::ActorStatus;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::{
        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode,
        TableId, VnodeBitmap, actor, fragment,
    };
    use risingwave_pb::common::PbActorLocation;
    use risingwave_pb::meta::table_fragments::PbActorStatus;
    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};

    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    const TEST_FRAGMENT_ID: FragmentId = 1;

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;

    const TEST_JOB_ID: ObjectId = 1;

    const TEST_STATE_TABLE_ID: TableId = 1000;

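    /// Fabricates upstreams for a single actor: one upstream actor in each of two
    /// upstream fragments, with upstream actor ids offset by 100 and 200.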
    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
            HashSet::from_iter([actor_id + 100]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([actor_id + 200]),
        );
        upstream_actor_ids
    }

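    /// Builds a `Union` stream node with one `Merge` input per upstream fragment of
    /// the given actor.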
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id: *upstream_fragment_id as _,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

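    /// Exercises the "extract" direction: a `Fragment` plus per-actor status and split
    /// maps is lowered into catalog models for a new job, then checked field by field
    /// against the inputs.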
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }

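    /// Exercises the "compose" direction: catalog models are assembled back into a
    /// `Fragment` together with actor statuses and splits, then verified against the
    /// original models.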
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

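    /// Asserts that each actor model and its protobuf counterpart agree on actor id,
    /// fragment id, vnode bitmap, status, splits, and expression context, and that
    /// every `Merge` node in the fragment body points at a known upstream fragment.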
    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(status, ActorStatus::Running);

            assert_eq!(
                splits,
                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

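    /// Asserts that a fragment model and its protobuf counterpart agree on fragment
    /// id, type mask, distribution type, state table ids, and stream node body.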
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(
            pb_state_table_ids,
            fragment.state_table_ids.into_u32_array()
        );
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }
}