use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

use anyhow::{Context, anyhow};
use futures::TryStreamExt;
use futures::stream::BoxStream;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{
    Actor, Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
};
use risingwave_meta_model::{
    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
    VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, fragment_splits, object,
    sink, source, streaming_job, table,
};
use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Operation as NotificationOperation,
};
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
    StreamScanType,
};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::Expr;
use sea_orm::{
    ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, PaginatorTrait, QueryFilter,
    QuerySelect, RelationTrait, TransactionTrait, Value,
};
use serde::{Deserialize, Serialize};
use tracing::debug;

use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
use crate::controller::catalog::CatalogController;
use crate::controller::scale::resolve_streaming_job_definition;
use crate::controller::utils::{
    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
    resolve_no_shuffle_actor_dispatcher,
};
use crate::manager::{LocalNotification, NotificationManager};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
};
use crate::rpc::ddl_controller::build_upstream_sink_info;
use crate::stream::{SplitAssignment, UpstreamSinkInfo};
use crate::{MetaError, MetaResult, model};

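/// Runtime info of a single running actor: the worker hosting it, its vnode ownership,
/// and its currently assigned source splits.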
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
    pub splits: Vec<SplitImpl>,
}

#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    pub vnode_count: usize,
    pub nodes: PbStreamNode,
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}

#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub actor_count: usize,
    pub vnode_count: usize,
}

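/// SQL helpers for filtering fragments by type mask. Each method builds a `SimpleExpr`
/// that applies a bitwise AND to the `fragment_type_mask` column, so the test is pushed
/// down into the database query, e.g.
/// `FragmentModel::find().filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))`.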
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}

#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    pub resource_group: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}

impl NotificationManager {
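    /// Broadcasts the given fragment-to-worker-slot mappings to frontend nodes and
    /// notifies local subscribers so in-memory mapping caches stay consistent.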
    pub(crate) fn notify_fragment_mapping(
        &self,
        operation: NotificationOperation,
        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
    ) {
        let fragment_ids = fragment_mappings
            .iter()
            .map(|mapping| mapping.fragment_id)
            .collect_vec();
        if fragment_ids.is_empty() {
            return;
        }
        for fragment_mapping in fragment_mappings {
            self.notify_frontend_without_version(
                operation,
                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
            );
        }

        match operation {
            NotificationOperation::Add | NotificationOperation::Update => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
                    fragment_ids,
                ));
            }
            NotificationOperation::Delete => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
                    fragment_ids,
                ));
            }
            op => {
                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
            }
        }
    }
}

impl CatalogController {
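    /// Converts the in-memory [`Fragment`]s of a new streaming job into database models,
    /// producing one `(fragment, actors)` pair per fragment.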
    pub fn extract_fragment_and_actors_from_fragments(
        job_id: ObjectId,
        fragments: impl Iterator<Item = &Fragment>,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
        fragments
            .map(|fragment| {
                Self::extract_fragment_and_actors_for_new_job(
                    job_id,
                    fragment,
                    actor_status,
                    actor_splits,
                )
            })
            .try_collect()
    }

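    /// Converts a single [`Fragment`] and its actors into database models. Deprecated
    /// per-actor upstream lists embedded in `Merge` nodes are cleared before persisting.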
    pub fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }

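    /// Rebuilds the [`StreamJobFragments`] snapshot of a job from persisted fragment and
    /// actor models; the inverse of the `extract_fragment_and_actors_*` helpers above.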
    pub fn compose_table_fragments(
        table_id: u32,
        state: PbState,
        ctx: Option<PbStreamContext>,
        fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
        parallelism: StreamingParallelism,
        max_parallelism: usize,
        job_definition: Option<String>,
    ) -> MetaResult<StreamJobFragments> {
        let mut pb_fragments = BTreeMap::new();
        let mut pb_actor_status = BTreeMap::new();

        for (fragment, actors) in fragments {
            let (fragment, fragment_actor_status, _) =
                Self::compose_fragment(fragment, actors, job_definition.clone())?;

            pb_fragments.insert(fragment.fragment_id, fragment);
            pb_actor_status.extend(fragment_actor_status.into_iter());
        }

        let table_fragments = StreamJobFragments {
            stream_job_id: table_id.into(),
            state: state as _,
            fragments: pb_fragments,
            actor_status: pb_actor_status,
            ctx: ctx
                .as_ref()
                .map(StreamContext::from_protobuf)
                .unwrap_or_default(),
            assigned_parallelism: match parallelism {
                StreamingParallelism::Custom => TableParallelism::Custom,
                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
            },
            max_parallelism,
        };

        Ok(table_fragments)
    }

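    /// Rebuilds a single [`Fragment`] plus per-actor status and split assignments from
    /// database models, bailing out if an actor does not belong to the fragment.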
    #[allow(clippy::type_complexity)]
    pub(crate) fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<actor::Model>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let actor::Model {
                actor_id,
                fragment_id,
                status,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                    state: PbActorState::from(status) as _,
                },
            );

            if let Some(splits) = splits {
                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
            }

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }

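    /// Returns parallelism info of running fragments from the shared in-memory actor
    /// state (no database access), optionally restricted to `id_filter`.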
    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let info = self.env.shared_actor_infos().read_guard();

        let mut result = HashMap::new();
        for (fragment_id, fragment) in info.iter_over_fragments() {
            if let Some(id_filter) = &id_filter
                && !id_filter.contains(&(*fragment_id as _))
            {
                continue;
            }

            result.insert(
                *fragment_id as _,
                FragmentParallelismInfo {
                    distribution_type: fragment.distribution_type.into(),
                    actor_count: fragment.actors.len() as _,
                    vnode_count: fragment.vnode_count,
                },
            );
        }

        Ok(result)
    }

    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
        let inner = self.inner.read().await;
        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(fragment_jobs.into_iter().collect())
    }

    pub async fn get_fragment_job_id(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        let object_ids: Vec<ObjectId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(object_ids)
    }

    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model_opt = FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?;

        let fragment = fragment_model_opt.map(|fragment| {
            let info = self.env.shared_actor_infos().read_guard();

            let SharedFragmentInfo { actors, .. } = info
                .get_fragment(fragment.fragment_id as _)
                .unwrap_or_else(|| {
                    panic!(
                        "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                        fragment.fragment_id,
                        fragment.job_id
                    )
                });

            FragmentDesc {
                fragment_id: fragment.fragment_id,
                job_id: fragment.job_id,
                fragment_type_mask: fragment.fragment_type_mask,
                distribution_type: fragment.distribution_type,
                state_table_ids: fragment.state_table_ids.clone(),
                vnode_count: fragment.vnode_count,
                stream_node: fragment.stream_node.clone(),
                parallelism: actors.len() as _,
            }
        });

        let Some(fragment) = fragment else {
            return Ok(None);
        };

        let upstreams: Vec<FragmentId> = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(Some((fragment, upstreams)))
    }

    pub async fn list_fragment_database_ids(
        &self,
        select_fragment_ids: Option<Vec<FragmentId>>,
    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
        let inner = self.inner.read().await;
        let select = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .column(object::Column::DatabaseId)
            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
        let select = if let Some(select_fragment_ids) = select_fragment_ids {
            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
        } else {
            select
        };
        Ok(select.into_tuple().all(&inner.db).await?)
    }

    pub async fn get_job_fragments_by_id(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;
        let fragment_actors = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id as _,
            job_info.job_status.into(),
            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }

    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;
        get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
    }

    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }

    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }

    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }

    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
        let inner = self.inner.read().await;
        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
        Ok(max_parallelism as usize)
    }

    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
        if let Ok(inner) = self.inner.try_read()
            && let Ok(job_state_tables) = FragmentModel::find()
                .select_only()
                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
                .into_tuple::<(ObjectId, I32Array)>()
                .all(&inner.db)
                .await
        {
            let mut job_internal_table_ids = HashMap::new();
            for (job_id, state_table_ids) in job_state_tables {
                job_internal_table_ids
                    .entry(job_id)
                    .or_insert_with(Vec::new)
                    .extend(state_table_ids.into_inner());
            }
            return Some(job_internal_table_ids.into_iter().collect());
        }
        None
    }

    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = FragmentModel::find().count(&inner.db).await?;
        Ok(count > 0)
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let read_guard = self.env.shared_actor_infos().read_guard();
        let actor_cnt: HashMap<WorkerId, _> = read_guard
            .iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
            })
            .into_group_map()
            .into_iter()
            .map(|(k, v)| (k, v.len()))
            .collect();

        Ok(actor_cnt)
    }

    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragment_actors = FragmentModel::find()
                .find_with_related(Actor)
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            table_fragments.insert(
                job.job_id as ObjectId,
                Self::compose_table_fragments(
                    job.job_id as _,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }

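    /// Collects the backfill actor ids of CDC table jobs, keyed by job id. Note that the
    /// filter matches fragments whose type mask equals `StreamCdcScan` exactly.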
    pub async fn cdc_table_backfill_actor_ids(&self) -> MetaResult<HashMap<u32, HashSet<u32>>> {
        let inner = self.inner.read().await;
        let mut job_id_actor_ids = HashMap::default();
        let fragment_type_mask = FragmentTypeFlag::StreamCdcScan as i32;
        let fragment_actors: Vec<(fragment::Model, Vec<actor::Model>)> = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::FragmentTypeMask.eq(fragment_type_mask))
            .all(&inner.db)
            .await?;
        for (fragment, actors) in fragment_actors {
            let e: &mut HashSet<u32> = job_id_actor_ids.entry(fragment.job_id as _).or_default();
            e.extend(actors.iter().map(|a| a.actor_id as u32));
        }
        Ok(job_id_actor_ids)
    }

    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }

    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
        let info = self.env.shared_actor_infos().read_guard();

        let actor_locations = info
            .iter_over_fragments()
            .flat_map(|(fragment_id, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, actor)| PartialActorLocation {
                        actor_id: *actor_id as _,
                        fragment_id: *fragment_id as _,
                        worker_id: actor.worker_id,
                        status: ActorStatus::Running,
                    })
            })
            .collect_vec();

        Ok(actor_locations)
    }

    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;

        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
            FragmentModel::find()
                .select_only()
                // The fragment id must be selected explicitly, since `select_only()`
                // clears the default column list and the 4-tuple below expects it first.
                .column(fragment::Column::FragmentId)
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;

        let actor_infos = {
            let info = self.env.shared_actor_infos().read_guard();

            fragment_objects
                .into_iter()
                .flat_map(|(fragment_id, object_id, schema_id, object_type)| {
                    let SharedFragmentInfo {
                        fragment_id,
                        actors,
                        ..
                    } = info.get_fragment(fragment_id as _).unwrap();
                    actors.keys().map(move |actor_id| {
                        (
                            *actor_id as _,
                            *fragment_id as _,
                            object_id,
                            schema_id,
                            object_type,
                        )
                    })
                })
                .collect_vec()
        };

        Ok(actor_infos)
    }

    pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
        let inner = self.inner.read().await;

        let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
            .select_only()
            .filter(actor::Column::Splits.is_not_null())
            .columns([actor::Column::ActorId, actor::Column::FragmentId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(source_actors)
    }

    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
        let guard = self.env.shared_actor_info.read_guard();
        guard
            .iter_over_fragments()
            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
            .collect_vec()
    }

    pub async fn list_fragment_descs(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let txn = inner.db.begin().await?;

        let fragments: Vec<_> = if is_creating {
            FragmentModel::find()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
                .filter(
                    streaming_job::Column::JobStatus
                        .eq(JobStatus::Initial)
                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
                )
                .all(&txn)
                .await?
        } else {
            FragmentModel::find().all(&txn).await?
        };

        let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();

        let upstreams: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::TargetFragmentId,
                fragment_relation::Column::SourceFragmentId,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        let guard = self.env.shared_actor_info.read_guard();

        let mut result = Vec::new();

        let mut all_upstreams = upstreams.into_iter().into_group_map();

        for fragment_desc in fragments {
            let parallelism = guard
                .get_fragment(fragment_desc.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let upstreams = all_upstreams
                .remove(&fragment_desc.fragment_id)
                .unwrap_or_default();

            let fragment = FragmentDistribution {
                fragment_id: fragment_desc.fragment_id as _,
                table_id: fragment_desc.job_id as _,
                distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type)
                    as _,
                state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
                upstream_fragment_ids: upstreams.iter().map(|id| *id as _).collect(),
                fragment_type_mask: fragment_desc.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: fragment_desc.vnode_count as _,
                node: Some(fragment_desc.stream_node.to_protobuf()),
            };

            result.push((fragment, upstreams));
        }
        Ok(result)
    }

    pub async fn list_sink_actor_mapping(
        &self,
    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
        let inner = self.inner.read().await;
        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .into_tuple()
            .all(&inner.db)
            .await?;
        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();

        let actor_with_type: Vec<(ActorId, SinkId)> = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(fragment::Column::JobId)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(
                fragment::Column::JobId
                    .is_in(sink_ids)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
            )
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut sink_actor_mapping = HashMap::new();
        for (actor_id, sink_id) in actor_with_type {
            sink_actor_mapping
                .entry(sink_id)
                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
                .1
                .push(actor_id);
        }

        Ok(sink_actor_mapping)
    }

    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
        let inner = self.inner.read().await;
        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::StateTableIds,
            ])
            .into_partial_model()
            .all(&inner.db)
            .await?;
        Ok(fragment_state_tables)
    }

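    /// Loads all running actors from the database and groups them as
    /// database -> job -> fragment -> [`InflightFragmentInfo`]. When `database_id` is
    /// given, only actors of that database are loaded.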
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    Option<ConnectorSplits>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    i32,
                    i32,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(actor::Column::Splits)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(fragment::Column::VnodeCount)
            .column(fragment::Column::FragmentTypeMask)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            splits,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            vnode_count,
            fragment_type_mask,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();

            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                splits: splits
                    .map(|connector_splits| {
                        connector_splits
                            .to_protobuf()
                            .splits
                            .iter()
                            .map(|connector_split| SplitImpl::try_from(connector_split).unwrap())
                            .collect_vec()
                    })
                    .unwrap_or_default(),
            };
            match fragment_infos.entry(fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                Entry::Vacant(entry) => {
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        fragment_type_mask: FragmentTypeMask::from(fragment_type_mask),
                        vnode_count: vnode_count as _,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }

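    /// Migrates actors between workers according to the worker-slot `plan`, rewriting
    /// each affected actor's `worker_id` in a single transaction. Actors that are not
    /// running are skipped with a warning.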
    pub async fn migrate_actors(
        &self,
        plan: HashMap<WorkerSlotId, WorkerSlotId>,
    ) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let actors: Vec<(
            FragmentId,
            DistributionType,
            ActorId,
            Option<VnodeBitmap>,
            WorkerId,
            ActorStatus,
        )> = Actor::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::DistributionType,
            ])
            .columns([
                actor::Column::ActorId,
                actor::Column::VnodeBitmap,
                actor::Column::WorkerId,
                actor::Column::Status,
            ])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .into_tuple()
            .all(&txn)
            .await?;

        let mut actor_locations = HashMap::new();

        for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
            if *status != ActorStatus::Running {
                tracing::warn!(
                    "skipping actor {} in fragment {} with status {:?}",
                    actor_id,
                    fragment_id,
                    status
                );
                continue;
            }

            actor_locations
                .entry(*worker_id)
                .or_insert(HashMap::new())
                .entry(*fragment_id)
                .or_insert(BTreeSet::new())
                .insert(*actor_id);
        }

        let expired_or_changed_workers: HashSet<_> =
            plan.keys().map(|k| k.worker_id() as WorkerId).collect();

        let mut actor_migration_plan = HashMap::new();
        for (worker, fragment) in actor_locations {
            if expired_or_changed_workers.contains(&worker) {
                for (fragment_id, actors) in fragment {
                    debug!(
                        "worker {} expired or changed, migrating fragment {}",
                        worker, fragment_id
                    );
                    let worker_slot_to_actor: HashMap<_, _> = actors
                        .iter()
                        .enumerate()
                        .map(|(idx, actor_id)| {
                            (WorkerSlotId::new(worker as _, idx as _), *actor_id)
                        })
                        .collect();

                    for (worker_slot, actor) in worker_slot_to_actor {
                        if let Some(target) = plan.get(&worker_slot) {
                            actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
                        }
                    }
                }
            }
        }

        for (actor, worker) in actor_migration_plan {
            Actor::update_many()
                .col_expr(
                    actor::Column::WorkerId,
                    Expr::value(Value::Int(Some(worker))),
                )
                .filter(actor::Column::ActorId.eq(actor))
                .exec(&txn)
                .await?;
        }

        txn.commit().await?;

        Ok(())
    }

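    /// Returns the worker slots currently in use: for each worker, one slot per actor of
    /// the fragment that places the most actors on that worker.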
    pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
        let inner = self.inner.read().await;

        let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([fragment::Column::FragmentId])
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut actor_locations = HashMap::new();

        for (fragment_id, _, worker_id) in actors {
            *actor_locations
                .entry(worker_id)
                .or_insert(HashMap::new())
                .entry(fragment_id)
                .or_insert(0_usize) += 1;
        }

        let mut result = HashSet::new();
        for (worker_id, mapping) in actor_locations {
            let max_fragment_len = mapping.values().max().unwrap();

            result
                .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
        }

        Ok(result)
    }

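    /// Persists new source split assignments for the given actors in one transaction,
    /// failing with a catalog-id-not-found error if an actor no longer exists.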
    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        for assignments in split_assignment.values() {
            for (actor_id, splits) in assignments {
                let actor_splits = splits.iter().map(Into::into).collect_vec();
                Actor::update(actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_splits,
                    }))),
                    ..Default::default()
                })
                .exec(&txn)
                .await
                .map_err(|err| {
                    if err == DbErr::RecordNotUpdated {
                        MetaError::catalog_id_not_found("actor_id", actor_id)
                    } else {
                        err.into()
                    }
                })?;
            }
        }
        txn.commit().await?;

        Ok(())
    }

    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

    pub fn get_running_actors_of_fragment(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<HashSet<model::ActorId>> {
        let info = self.env.shared_actor_infos().read_guard();

        let actors = info
            .get_fragment(fragment_id as _)
            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
            .unwrap_or_default();

        Ok(actors)
    }

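    /// Pairs each running source-backfill actor with its upstream source actor. The two
    /// fragments must be connected by a `NoShuffle` dispatcher; the pairing is then
    /// resolved from the actors' vnode distributions.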
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }

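    /// Finds the root fragment of each given job: the `Mview` or `Sink` fragment when
    /// present, otherwise the `Source` fragment. Also returns the current
    /// actor-to-worker locations from the shared actor info.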
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(
        HashMap<ObjectId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let inner = self.inner.read().await;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();

        let info = self.env.shared_actor_infos().read_guard();

        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
            .iter()
            .map(|(job_id, fragment)| (fragment.fragment_id as u32, *job_id))
            .collect();

        for fragment in root_fragment_to_jobs.keys() {
            let fragment_info = info.get_fragment(*fragment).context(format!(
                "fragment {} not found in shared actor info",
                fragment
            ))?;

            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
            let fragment = root_fragments
                .get(&job_id)
                .context(format!("root fragment for job {} not found", job_id))?;

            root_fragments_pb.insert(
                job_id,
                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
            );
        }

        let mut all_actor_locations = HashMap::new();

        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
            for (actor_id, actor_info) in actors {
                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
            }
        }

        Ok((root_fragments_pb, all_actor_locations))
    }

    pub async fn get_root_fragment(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
        let (root_fragment, _) = root_fragments
            .remove(&job_id)
            .context(format!("root fragment for job {} not found", job_id))?;

        Ok((root_fragment, actors))
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(
        Vec<(
            stream_plan::DispatcherType,
            SharedFragmentInfo,
            PbStreamNode,
        )>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&txn)
            .await?;

        let downstream_fragment_ids = downstream_fragment_relations
            .iter()
            .map(|model| model.target_fragment_id as FragmentId)
            .collect::<HashSet<_>>();

        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        let downstream_fragment_nodes: HashMap<_, _> = downstream_fragment_nodes
            .into_iter()
            .map(|(id, node)| (id as u32, node))
            .collect();

        let mut downstream_fragments = vec![];

        let info = self.env.shared_actor_infos().read_guard();

        let fragment_map: HashMap<_, _> = downstream_fragment_relations
            .iter()
            .map(|model| (model.target_fragment_id as u32, model.dispatcher_type))
            .collect();

        for fragment_id in fragment_map.keys() {
            let fragment_info @ SharedFragmentInfo { actors, .. } =
                info.get_fragment(*fragment_id).unwrap();

            let dispatcher_type = fragment_map[fragment_id];

            if actors.is_empty() {
                bail!("No fragment found for fragment id {}", fragment_id);
            }

            let dispatch_type = PbDispatcherType::from(dispatcher_type);

            let nodes = downstream_fragment_nodes
                .get(fragment_id)
                .context(format!(
                    "downstream fragment node for id {} not found",
                    fragment_id
                ))?
                .to_protobuf();

            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
        }
        Ok((downstream_fragments, actor_locations))
    }

    pub async fn load_source_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
                source_fragment_ids
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment_id);
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn load_backfill_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, stream_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                stream_node.to_protobuf().find_source_backfill()
            {
                source_fragment_ids
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn get_actual_job_fragment_parallelism(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<Option<usize>> {
        let inner = self.inner.read().await;
        let fragments: Vec<(FragmentId, i64)> = FragmentModel::find()
            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
            .select_only()
            .columns([fragment::Column::FragmentId])
            .column_as(actor::Column::ActorId.count(), "count")
            .filter(
                fragment::Column::JobId
                    .eq(job_id)
                    .and(FragmentTypeMask::intersects_any([
                        FragmentTypeFlag::Mview,
                        FragmentTypeFlag::Sink,
                    ])),
            )
            .group_by(fragment::Column::FragmentId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(fragments
            .into_iter()
            .at_most_one()
            .ok()
            .flatten()
            .map(|(_, count)| count as usize))
    }

    pub async fn get_all_upstream_sink_infos(
        &self,
        target_table: &PbTable,
        target_fragment_id: FragmentId,
    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
        let incoming_sinks = self.get_table_incoming_sinks(target_table.id as _).await?;

        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let sink_ids = incoming_sinks.iter().map(|s| s.id as SinkId).collect_vec();
        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;

        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
        for pb_sink in &incoming_sinks {
            let sink_fragment_id =
                sink_fragment_ids
                    .get(&(pb_sink.id as _))
                    .cloned()
                    .ok_or(anyhow::anyhow!(
                        "sink fragment not found for sink id {}",
                        pb_sink.id
                    ))?;
            let upstream_info = build_upstream_sink_info(
                pb_sink,
                sink_fragment_id,
                target_table,
                target_fragment_id,
            )?;
            upstream_sink_infos.push(upstream_info);
        }

        Ok(upstream_sink_infos)
    }

    pub async fn get_mview_fragment_by_id(&self, table_id: TableId) -> MetaResult<FragmentId> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(
                fragment::Column::JobId
                    .eq(table_id)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        if mview_fragment.len() != 1 {
            return Err(anyhow::anyhow!(
                "expected exactly one mview fragment for table {}, found {}",
                table_id,
                mview_fragment.len()
            )
            .into());
        }

        Ok(mview_fragment.into_iter().next().unwrap())
    }

    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        has_table_been_migrated(&txn, table_id).await
    }

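    /// Upserts split assignments for the given fragments into the `fragment_splits`
    /// table, overwriting the existing splits on conflict.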
    pub async fn update_fragment_splits(
        &self,
        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
    ) -> MetaResult<()> {
        if fragment_splits.is_empty() {
            return Ok(());
        }

        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let models: Vec<fragment_splits::ActiveModel> = fragment_splits
            .iter()
            .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
                fragment_id: Set(*fragment_id as _),
                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(Into::into).collect_vec(),
                }))),
            })
            .collect();

        FragmentSplits::insert_many(models)
            .on_conflict(
                OnConflict::column(fragment_splits::Column::FragmentId)
                    .update_column(fragment_splits::Column::Splits)
                    .to_owned(),
            )
            .exec(&txn)
            .await?;

        txn.commit().await?;

        Ok(())
    }
}

1845#[cfg(test)]
1846mod tests {
1847 use std::collections::{BTreeMap, HashMap, HashSet};
1848
1849 use itertools::Itertools;
1850 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1851 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1852 use risingwave_common::util::iter_util::ZipEqDebug;
1853 use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1854 use risingwave_meta_model::actor::ActorStatus;
1855 use risingwave_meta_model::fragment::DistributionType;
1856 use risingwave_meta_model::{
1857 ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
1858 VnodeBitmap, actor, fragment,
1859 };
1860 use risingwave_pb::common::PbActorLocation;
1861 use risingwave_pb::meta::table_fragments::PbActorStatus;
1862 use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
1863 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1864 use risingwave_pb::plan_common::PbExprContext;
1865 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1866 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1867 use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1868
1869 use crate::MetaResult;
1870 use crate::controller::catalog::CatalogController;
1871 use crate::model::{Fragment, StreamActor};
1872
1873 type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
1874
1875 type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
1876
1877 const TEST_FRAGMENT_ID: FragmentId = 1;
1878
1879 const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;
1880
1881 const TEST_JOB_ID: ObjectId = 1;
1882
1883 const TEST_STATE_TABLE_ID: TableId = 1000;
1884
1885 fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1886 let mut upstream_actor_ids = BTreeMap::new();
1887 upstream_actor_ids.insert(
1888 TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1889 HashSet::from_iter([(actor_id + 100)]),
1890 );
1891 upstream_actor_ids.insert(
1892 (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1893 HashSet::from_iter([(actor_id + 200)]),
1894 );
1895 upstream_actor_ids
1896 }
1897
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id: *upstream_fragment_id as _,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

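    /// Converts an in-memory `Fragment` plus actor statuses into model rows via
    /// `extract_fragment_and_actors_for_new_job`, then checks the result
    /// against the inputs.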
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }

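    /// Exercises the opposite direction: composes `fragment::Model` and
    /// `actor::Model` rows back into protobuf-facing structs via
    /// `compose_fragment`, then checks the round trip.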
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

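    /// Asserts that each `actor::Model` row matches its `StreamActor`
    /// counterpart field by field, and that every `Merge` node in the shared
    /// stream node points at a known upstream fragment of that actor.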
    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(status, ActorStatus::Running);

            assert_eq!(
                splits,
                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

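    /// Asserts that a `fragment::Model` row and its `Fragment` counterpart
    /// agree on id, type mask, distribution type, state table ids, and stream
    /// node.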
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(
            pb_state_table_ids,
            fragment.state_table_ids.into_u32_array()
        );
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }
}