use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

use anyhow::{Context, anyhow};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{
    Actor, Fragment as FragmentModel, FragmentRelation, Sink, StreamingJob,
};
use risingwave_meta_model::{
    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
    VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
    streaming_job, table,
};
use risingwave_meta_model_migration::{Alias, SelectStatement};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Operation as NotificationOperation,
};
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
    StreamScanType,
};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::Expr;
use sea_orm::{
    ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
    QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
};
use serde::{Deserialize, Serialize};
use tracing::debug;

use crate::barrier::SnapshotBackfillInfo;
use crate::controller::catalog::{CatalogController, CatalogControllerInner};
use crate::controller::scale::resolve_streaming_job_definition;
use crate::controller::utils::{
    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
    get_fragment_mappings, resolve_no_shuffle_actor_dispatcher,
};
use crate::manager::{LocalNotification, NotificationManager};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
};
use crate::stream::{SplitAssignment, build_actor_split_impls};
use crate::{MetaError, MetaResult};

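/// Runtime placement of a single actor: the worker it is scheduled on and, for
/// hash-distributed fragments, the vnode bitmap it owns.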
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
}

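/// In-memory view of a fragment that is currently in flight: its stream node
/// body, the actors it consists of, and the state tables it writes to.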
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    pub distribution_type: DistributionType,
    pub nodes: PbStreamNode,
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}

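/// Parallelism-related facts about a fragment: how it is distributed, how many
/// actors it currently has, and its vnode count.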
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub actor_count: usize,
    pub vnode_count: usize,
}

#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    pub resource_group: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}

impl CatalogControllerInner {
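    /// Returns the worker-slot mappings of all fragments belonging to jobs in
    /// the `Created` state.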
    pub async fn all_running_fragment_mappings(
        &self,
    ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
        let txn = self.db.begin().await?;

        let job_ids: Vec<ObjectId> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut result = vec![];
        for job_id in job_ids {
            let mappings = get_fragment_mappings(&txn, job_id).await?;

            result.extend(mappings.into_iter());
        }

        Ok(result.into_iter())
    }
}

impl NotificationManager {
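    /// Broadcasts the given fragment worker-slot mappings to frontend nodes and
    /// mirrors the change to local subscribers.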
    pub(crate) fn notify_fragment_mapping(
        &self,
        operation: NotificationOperation,
        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
    ) {
        let fragment_ids = fragment_mappings
            .iter()
            .map(|mapping| mapping.fragment_id)
            .collect_vec();
        if fragment_ids.is_empty() {
            return;
        }
        // Notify each mapping to the frontend nodes.
        for fragment_mapping in fragment_mappings {
            self.notify_frontend_without_version(
                operation,
                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
            );
        }

        // Notify local subscribers about the mapping change.
        match operation {
            NotificationOperation::Add | NotificationOperation::Update => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
                    fragment_ids,
                ));
            }
            NotificationOperation::Delete => {
                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
                    fragment_ids,
                ));
            }
            op => {
                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
            }
        }
    }
}

impl CatalogController {
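    /// Converts the fragments of a new streaming job into fragment and actor
    /// models ready for insertion into the meta store.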
    pub fn extract_fragment_and_actors_from_fragments(
        job_id: ObjectId,
        fragments: impl Iterator<Item = &Fragment>,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
        fragments
            .map(|fragment| {
                Self::extract_fragment_and_actors_for_new_job(
                    job_id,
                    fragment,
                    actor_status,
                    actor_splits,
                )
            })
            .try_collect()
    }

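    /// Converts a single fragment of a new streaming job into its fragment model
    /// plus the actor models derived from `actor_status` and `actor_splits`.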
    pub fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }

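    /// Assembles fragment and actor models loaded from the meta store back into
    /// a `StreamJobFragments` for the given job.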
    pub fn compose_table_fragments(
        table_id: u32,
        state: PbState,
        ctx: Option<PbStreamContext>,
        fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
        parallelism: StreamingParallelism,
        max_parallelism: usize,
        job_definition: Option<String>,
    ) -> MetaResult<StreamJobFragments> {
        let mut pb_fragments = BTreeMap::new();
        let mut pb_actor_splits = HashMap::new();
        let mut pb_actor_status = BTreeMap::new();

        for (fragment, actors) in fragments {
            let (fragment, fragment_actor_status, fragment_actor_splits) =
                Self::compose_fragment(fragment, actors, job_definition.clone())?;

            pb_fragments.insert(fragment.fragment_id, fragment);

            pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
            pb_actor_status.extend(fragment_actor_status.into_iter());
        }

        let table_fragments = StreamJobFragments {
            stream_job_id: table_id.into(),
            state: state as _,
            fragments: pb_fragments,
            actor_status: pb_actor_status,
            actor_splits: pb_actor_splits,
            ctx: ctx
                .as_ref()
                .map(StreamContext::from_protobuf)
                .unwrap_or_default(),
            assigned_parallelism: match parallelism {
                StreamingParallelism::Custom => TableParallelism::Custom,
                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
            },
            max_parallelism,
        };

        Ok(table_fragments)
    }

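    /// Converts a fragment model and its actor models back into the in-memory
    /// `Fragment` representation, together with per-actor status and splits.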
    #[allow(clippy::type_complexity)]
    pub(crate) fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<actor::Model>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let actor::Model {
                actor_id,
                fragment_id,
                status,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                    state: PbActorState::from(status) as _,
                },
            );

            if let Some(splits) = splits {
                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
            }

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or_default(),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }

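    /// Returns the distribution type, actor count, and vnode count of running
    /// fragments, optionally restricted to `id_filter`.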
    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let inner = self.inner.read().await;

        let query_alias = Alias::new("fragment_actor_count");
        let count_alias = Alias::new("count");

        let mut query = SelectStatement::new()
            .column(actor::Column::FragmentId)
            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
            .from(Actor)
            .group_by_col(actor::Column::FragmentId)
            .to_owned();

        if let Some(id_filter) = id_filter {
            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
        }

        let outer = SelectStatement::new()
            .column((FragmentModel, fragment::Column::FragmentId))
            .column(count_alias)
            .column(fragment::Column::DistributionType)
            .column(fragment::Column::VnodeCount)
            .from_subquery(query.to_owned(), query_alias.clone())
            .inner_join(
                FragmentModel,
                Expr::col((query_alias, actor::Column::FragmentId))
                    .equals((FragmentModel, fragment::Column::FragmentId)),
            )
            .to_owned();

        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
                outer.to_owned(),
            )
            .all(&inner.db)
            .await?;

        Ok(fragment_parallelisms
            .into_iter()
            .map(|(fragment_id, count, distribution_type, vnode_count)| {
                (
                    fragment_id,
                    FragmentParallelismInfo {
                        distribution_type: distribution_type.into(),
                        actor_count: count as usize,
                        vnode_count: vnode_count as usize,
                    },
                )
            })
            .collect())
    }

    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
        let inner = self.inner.read().await;
        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(fragment_jobs.into_iter().collect())
    }

    pub async fn get_fragment_job_id(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        let object_ids: Vec<ObjectId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(object_ids)
    }

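    /// Returns the descriptor of the given fragment together with the ids of its
    /// upstream fragments, or `None` if the fragment does not exist.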
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_opt = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .filter(fragment::Column::FragmentId.eq(fragment_id))
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .one(&inner.db)
            .await?;

        let Some(fragment) = fragment_opt else {
            return Ok(None);
        };

        let upstreams: Vec<FragmentId> = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(Some((fragment, upstreams)))
    }

    pub async fn list_fragment_database_ids(
        &self,
        select_fragment_ids: Option<Vec<FragmentId>>,
    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
        let inner = self.inner.read().await;
        let select = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .column(object::Column::DatabaseId)
            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
        let select = if let Some(select_fragment_ids) = select_fragment_ids {
            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
        } else {
            select
        };
        Ok(select.into_tuple().all(&inner.db).await?)
    }

    pub async fn get_job_fragments_by_id(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;
        let fragment_actors = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id as _,
            job_info.job_status.into(),
            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }

    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;
        get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
    }

    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }

    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }

    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }

    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
        let inner = self.inner.read().await;
        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
        Ok(max_parallelism as usize)
    }

    pub async fn get_job_actor_mapping(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
            .select_only()
            .column(fragment::Column::JobId)
            .column(actor::Column::ActorId)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(fragment::Column::JobId.is_in(job_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(job_actors.into_iter().into_group_map())
    }

    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
        if let Ok(inner) = self.inner.try_read()
            && let Ok(job_state_tables) = FragmentModel::find()
                .select_only()
                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
                .into_tuple::<(ObjectId, I32Array)>()
                .all(&inner.db)
                .await
        {
            let mut job_internal_table_ids = HashMap::new();
            for (job_id, state_table_ids) in job_state_tables {
                job_internal_table_ids
                    .entry(job_id)
                    .or_insert_with(Vec::new)
                    .extend(state_table_ids.into_inner());
            }
            return Some(job_internal_table_ids.into_iter().collect());
        }
        None
    }

    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = FragmentModel::find().count(&inner.db).await?;
        Ok(count > 0)
    }

    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let inner = self.inner.read().await;
        let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
            .select_only()
            .column(actor::Column::WorkerId)
            .column_as(actor::Column::ActorId.count(), "count")
            .group_by(actor::Column::WorkerId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(actor_cnt
            .into_iter()
            .map(|(worker_id, count)| (worker_id, count as usize))
            .collect())
    }

    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragment_actors = FragmentModel::find()
                .find_with_related(Actor)
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            table_fragments.insert(
                job.job_id as ObjectId,
                Self::compose_table_fragments(
                    job.job_id as _,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }

    pub async fn cdc_table_backfill_actor_ids(&self) -> MetaResult<HashMap<u32, HashSet<u32>>> {
        let inner = self.inner.read().await;
        let mut job_id_actor_ids = HashMap::default();
        let fragment_type_mask = FragmentTypeFlag::StreamCdcScan as i32;
        let fragment_actors: Vec<(fragment::Model, Vec<actor::Model>)> = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::FragmentTypeMask.eq(fragment_type_mask))
            .all(&inner.db)
            .await?;
        for (fragment, actors) in fragment_actors {
            let e: &mut HashSet<u32> = job_id_actor_ids.entry(fragment.job_id as _).or_default();
            e.extend(actors.iter().map(|a| a.actor_id as u32));
        }
        Ok(job_id_actor_ids)
    }

    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }

    pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
        let inner = self.inner.read().await;
        let actor_locations: Vec<PartialActorLocation> =
            Actor::find().into_partial_model().all(&inner.db).await?;
        Ok(actor_locations)
    }

    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;
        let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
            Actor::find()
                .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .select_only()
                .columns([actor::Column::ActorId, actor::Column::FragmentId])
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;
        Ok(actor_locations)
    }

    pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
        let inner = self.inner.read().await;

        let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
            .select_only()
            .filter(actor::Column::Splits.is_not_null())
            .columns([actor::Column::ActorId, actor::Column::FragmentId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(source_actors)
    }

    pub async fn list_creating_fragment_descs(
        &self,
    ) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let mut result = Vec::new();
        let fragments = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .join(JoinType::LeftJoin, fragment::Relation::Object.def())
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Initial)
                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
            )
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .all(&inner.db)
            .await?;
        for fragment in fragments {
            let upstreams: Vec<FragmentId> = FragmentRelation::find()
                .select_only()
                .column(fragment_relation::Column::SourceFragmentId)
                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
                .into_tuple()
                .all(&inner.db)
                .await?;
            result.push((fragment, upstreams));
        }
        Ok(result)
    }

    pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let mut result = Vec::new();
        let fragments = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .all(&inner.db)
            .await?;
        for fragment in fragments {
            let upstreams: Vec<FragmentId> = FragmentRelation::find()
                .select_only()
                .column(fragment_relation::Column::SourceFragmentId)
                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
                .into_tuple()
                .all(&inner.db)
                .await?;
            result.push((fragment, upstreams));
        }
        Ok(result)
    }

    pub async fn list_sink_actor_mapping(
        &self,
    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
        let inner = self.inner.read().await;
        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .into_tuple()
            .all(&inner.db)
            .await?;
        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();

        let actor_with_type: Vec<(ActorId, SinkId, i32)> = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .columns([fragment::Column::JobId, fragment::Column::FragmentTypeMask])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(fragment::Column::JobId.is_in(sink_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut sink_actor_mapping = HashMap::new();
        for (actor_id, sink_id, type_mask) in actor_with_type {
            if FragmentTypeMask::from(type_mask).contains(FragmentTypeFlag::Sink) {
                sink_actor_mapping
                    .entry(sink_id)
                    .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
                    .1
                    .push(actor_id);
            }
        }

        Ok(sink_actor_mapping)
    }

    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
        let inner = self.inner.read().await;
        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::StateTableIds,
            ])
            .into_partial_model()
            .all(&inner.db)
            .await?;
        Ok(fragment_state_tables)
    }

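    /// Loads all `Running` actors, optionally restricted to one database, grouped
    /// by database, job, and fragment for in-flight barrier bookkeeping.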
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();
            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
            };
            match fragment_infos.entry(fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                Entry::Vacant(entry) => {
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }

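    /// Migrates running actors between workers according to `plan`, which maps
    /// worker slots on expired or changed workers to their replacement slots.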
    pub async fn migrate_actors(
        &self,
        plan: HashMap<WorkerSlotId, WorkerSlotId>,
    ) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let actors: Vec<(
            FragmentId,
            DistributionType,
            ActorId,
            Option<VnodeBitmap>,
            WorkerId,
            ActorStatus,
        )> = Actor::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::DistributionType,
            ])
            .columns([
                actor::Column::ActorId,
                actor::Column::VnodeBitmap,
                actor::Column::WorkerId,
                actor::Column::Status,
            ])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .into_tuple()
            .all(&txn)
            .await?;

        let mut actor_locations = HashMap::new();

        for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
            if *status != ActorStatus::Running {
                tracing::warn!(
                    "skipping actor {} in fragment {} with status {:?}",
                    actor_id,
                    fragment_id,
                    status
                );
                continue;
            }

            actor_locations
                .entry(*worker_id)
                .or_insert(HashMap::new())
                .entry(*fragment_id)
                .or_insert(BTreeSet::new())
                .insert(*actor_id);
        }

        let expired_or_changed_workers: HashSet<_> =
            plan.keys().map(|k| k.worker_id() as WorkerId).collect();

        let mut actor_migration_plan = HashMap::new();
        for (worker, fragment) in actor_locations {
            if expired_or_changed_workers.contains(&worker) {
                for (fragment_id, actors) in fragment {
                    debug!(
                        "worker {} expired or changed, migrating fragment {}",
                        worker, fragment_id
                    );
                    let worker_slot_to_actor: HashMap<_, _> = actors
                        .iter()
                        .enumerate()
                        .map(|(idx, actor_id)| {
                            (WorkerSlotId::new(worker as _, idx as _), *actor_id)
                        })
                        .collect();

                    for (worker_slot, actor) in worker_slot_to_actor {
                        if let Some(target) = plan.get(&worker_slot) {
                            actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
                        }
                    }
                }
            }
        }

        for (actor, worker) in actor_migration_plan {
            Actor::update_many()
                .col_expr(
                    actor::Column::WorkerId,
                    Expr::value(Value::Int(Some(worker))),
                )
                .filter(actor::Column::ActorId.eq(actor))
                .exec(&txn)
                .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
        let inner = self.inner.read().await;

        let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([fragment::Column::FragmentId])
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut actor_locations = HashMap::new();

        for (fragment_id, _, worker_id) in actors {
            *actor_locations
                .entry(worker_id)
                .or_insert(HashMap::new())
                .entry(fragment_id)
                .or_insert(0_usize) += 1;
        }

        let mut result = HashSet::new();
        for (worker_id, mapping) in actor_locations {
            let max_fragment_len = mapping.values().max().unwrap();

            result
                .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
        }

        Ok(result)
    }

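    /// Returns every actor grouped by the worker it runs on; actors that are not
    /// `Running` are included only when `include_inactive` is set.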
    pub async fn all_node_actors(
        &self,
        include_inactive: bool,
    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
        let inner = self.inner.read().await;
        let fragment_actors = if include_inactive {
            FragmentModel::find()
                .find_with_related(Actor)
                .all(&inner.db)
                .await?
        } else {
            FragmentModel::find()
                .find_with_related(Actor)
                .filter(actor::Column::Status.eq(ActorStatus::Running))
                .all(&inner.db)
                .await?
        };

        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
        )
        .await?;

        let mut node_actors = HashMap::new();
        for (fragment, actors) in fragment_actors {
            let job_id = fragment.job_id;
            let (table_fragments, actor_status, _) = Self::compose_fragment(
                fragment,
                actors,
                job_definitions.get(&(job_id as _)).cloned(),
            )?;
            for actor in table_fragments.actors {
                let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
                node_actors
                    .entry(node_id)
                    .or_insert_with(Vec::new)
                    .push(actor);
            }
        }

        Ok(node_actors)
    }

    pub async fn get_worker_actor_ids(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(fragment::Column::JobId.is_in(job_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut worker_actors = BTreeMap::new();
        for (actor_id, worker_id) in actor_workers {
            worker_actors
                .entry(worker_id)
                .or_insert_with(Vec::new)
                .push(actor_id);
        }

        Ok(worker_actors)
    }

    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        for assignments in split_assignment.values() {
            for (actor_id, splits) in assignments {
                let actor_splits = splits.iter().map(Into::into).collect_vec();
                Actor::update(actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_splits,
                    }))),
                    ..Default::default()
                })
                .exec(&txn)
                .await
                .map_err(|err| {
                    if err == DbErr::RecordNotUpdated {
                        MetaError::catalog_id_not_found("actor_id", actor_id)
                    } else {
                        err.into()
                    }
                })?;
            }
        }
        txn.commit().await?;

        Ok(())
    }

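    /// Fills the snapshot backfill epoch into the stream nodes of the given
    /// fragments and persists the updated nodes in a single transaction.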
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

    pub async fn get_running_actors_of_fragment(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Vec<ActorId>> {
        let inner = self.inner.read().await;
        let actors: Vec<ActorId> = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .filter(actor::Column::FragmentId.eq(fragment_id))
            .filter(actor::Column::Status.eq(ActorStatus::Running))
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(actors)
    }

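    /// Returns pairs of (source backfill actor, upstream source actor) for the
    /// `NoShuffle` edge between the two given fragments.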
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expected NoShuffle but got {:?}", fragment_relation).into());
        }

        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }

    pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
        let inner = self.inner.read().await;
        let actors: Vec<ActorId> = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(fragment::Column::JobId.is_in(job_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(actors)
    }

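    /// Returns the root fragment (Mview/Sink, or Source as a fallback) of each
    /// given job, plus the worker location of every actor.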
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
        let inner = self.inner.read().await;

        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(job_ids.iter().copied()),
        )
        .await?;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                // `try_insert` keeps any Mview/Sink fragment already recorded for
                // the job; a Source fragment only becomes the root as a fallback.
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();
        for (_, fragment) in root_fragments {
            let actors = fragment.find_related(Actor).all(&inner.db).await?;

            let job_id = fragment.job_id;
            root_fragments_pb.insert(
                fragment.job_id,
                Self::compose_fragment(
                    fragment,
                    actors,
                    job_definitions.get(&(job_id as _)).cloned(),
                )?
                .0,
            );
        }

        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok((root_fragments_pb, actors))
    }

    pub async fn get_root_fragment(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
        let root_fragment = root_fragments
            .remove(&job_id)
            .context(format!("root fragment for job {} not found", job_id))?;
        Ok((root_fragment, actors))
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&inner.db)
            .await?;
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        let mut downstream_fragments = vec![];
        for fragment_relation::Model {
            target_fragment_id: fragment_id,
            dispatcher_type,
            ..
        } in downstream_fragment_relations
        {
            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
                .find_with_related(Actor)
                .all(&inner.db)
                .await?;
            if fragment_actors.is_empty() {
                bail!("No fragment found for fragment id {}", fragment_id);
            }
            assert_eq!(fragment_actors.len(), 1);
            let (fragment, actors) = fragment_actors.pop().unwrap();
            let dispatch_type = PbDispatcherType::from(dispatcher_type);
            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
            downstream_fragments.push((dispatch_type, fragment));
        }

        Ok((downstream_fragments, actors))
    }

    pub async fn load_source_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
        let inner = self.inner.read().await;
        let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .into_tuple()
            .all(&inner.db)
            .await?;
        fragments.retain(|(_, mask, _)| {
            FragmentTypeMask::from(*mask).contains(FragmentTypeFlag::Source)
        });

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, _, stream_node) in fragments {
            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
                source_fragment_ids
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment_id);
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn load_backfill_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
        let inner = self.inner.read().await;
        let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .into_tuple()
            .all(&inner.db)
            .await?;
        fragments.retain(|(_, mask, _)| {
            FragmentTypeMask::from(*mask).contains(FragmentTypeFlag::SourceScan)
        });

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, _, stream_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                stream_node.to_protobuf().find_source_backfill()
            {
                source_fragment_ids
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
        let inner = self.inner.read().await;
        let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::Splits])
            .filter(actor::Column::Splits.is_not_null())
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(splits.into_iter().collect())
    }

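    /// Returns the actual parallelism (actor count) of the job's root Mview or
    /// Sink fragment, if any.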
    pub async fn get_actual_job_fragment_parallelism(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<Option<usize>> {
        let inner = self.inner.read().await;
        let mut fragments: Vec<(FragmentId, i32, i64)> = FragmentModel::find()
            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
            ])
            .column_as(actor::Column::ActorId.count(), "count")
            .filter(fragment::Column::JobId.eq(job_id))
            .group_by(fragment::Column::FragmentId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        fragments.retain(|(_, mask, _)| {
            FragmentTypeMask::from(*mask)
                .contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink])
        });

        Ok(fragments
            .into_iter()
            .at_most_one()
            .ok()
            .flatten()
            .map(|(_, _, count)| count as usize))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
    use risingwave_meta_model::actor::ActorStatus;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::{
        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
        VnodeBitmap, actor, fragment,
    };
    use risingwave_pb::common::PbActorLocation;
    use risingwave_pb::meta::table_fragments::PbActorStatus;
    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};

    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    const TEST_FRAGMENT_ID: FragmentId = 1;

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;

    const TEST_JOB_ID: ObjectId = 1;

    const TEST_STATE_TABLE_ID: TableId = 1000;

    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }

    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id: *upstream_fragment_id as _,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(status, ActorStatus::Running);

            assert_eq!(
                splits,
                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(
            pb_state_table_ids,
            fragment.state_table_ids.into_u32_array()
        );
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }
}