use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

use anyhow::{Context, anyhow};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{
    Actor, Fragment as FragmentModel, FragmentRelation, Sink, StreamingJob,
};
use risingwave_meta_model::{
    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
    VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
    streaming_job, table,
};
use risingwave_meta_model_migration::{Alias, SelectStatement};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Operation as NotificationOperation,
};
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbDispatcherType, PbFragmentTypeFlag, PbStreamContext, PbStreamNode,
    PbStreamScanType, StreamScanType,
};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::Expr;
use sea_orm::{
    ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
    QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
};
use serde::{Deserialize, Serialize};
use tracing::debug;

use crate::barrier::SnapshotBackfillInfo;
use crate::controller::catalog::{CatalogController, CatalogControllerInner};
use crate::controller::scale::resolve_streaming_job_definition;
use crate::controller::utils::{
    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
    get_fragment_mappings, rebuild_fragment_mapping_from_actors,
    resolve_no_shuffle_actor_dispatcher,
};
use crate::manager::LocalNotification;
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
};
use crate::stream::{SplitAssignment, build_actor_split_impls};
use crate::{MetaError, MetaResult, model};

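/// Runtime placement of a single actor: the worker it is scheduled on and, for
/// hash-distributed fragments, the vnode bitmap it owns.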
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
}

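/// In-memory snapshot of a fragment and its running actors, as loaded by
/// [`CatalogController::load_all_actors`].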
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    pub distribution_type: DistributionType,
    pub nodes: PbStreamNode,
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}

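/// Per-fragment parallelism summary returned by
/// [`CatalogController::running_fragment_parallelisms`].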
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub actor_count: usize,
    pub vnode_count: usize,
}

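/// Queryable row describing a streaming job, as listed by
/// [`CatalogController::list_streaming_job_infos`].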
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    pub resource_group: String,
}

impl CatalogControllerInner {
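    /// Returns the fragment worker-slot mappings of all streaming jobs whose status is
    /// `Created`, i.e. jobs that have finished creation.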
    pub async fn all_running_fragment_mappings(
        &self,
    ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
        let txn = self.db.begin().await?;

        let job_ids: Vec<ObjectId> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut result = vec![];
        for job_id in job_ids {
            let mappings = get_fragment_mappings(&txn, job_id).await?;

            result.extend(mappings.into_iter());
        }

        Ok(result.into_iter())
    }
}

impl CatalogController {
    pub(crate) async fn notify_fragment_mapping(
        &self,
        operation: NotificationOperation,
        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
    ) {
        let fragment_ids = fragment_mappings
            .iter()
            .map(|mapping| mapping.fragment_id)
            .collect_vec();
        for fragment_mapping in fragment_mappings {
            self.env
                .notification_manager()
                .notify_frontend(
                    operation,
                    NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
                )
                .await;
        }

        match operation {
            NotificationOperation::Add | NotificationOperation::Update => {
                self.env
                    .notification_manager()
                    .notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
                        fragment_ids,
                    ))
                    .await;
            }
            NotificationOperation::Delete => {
                self.env
                    .notification_manager()
                    .notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
                        fragment_ids,
                    ))
                    .await;
            }
            op => {
                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
            }
        }
    }

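    /// Converts the in-memory fragments of a `StreamJobFragments` into database models,
    /// yielding one `(fragment, actors)` pair per fragment.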
    pub fn extract_fragment_and_actors_from_fragments(
        stream_job_fragments: &StreamJobFragments,
    ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
        stream_job_fragments
            .fragments
            .values()
            .map(|fragment| {
                Self::extract_fragment_and_actors_for_new_job(
                    stream_job_fragments.stream_job_id.table_id as _,
                    fragment,
                    &stream_job_fragments.actor_status,
                    &stream_job_fragments.actor_splits,
                )
            })
            .try_collect()
    }

    fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: *pb_fragment_type_mask as _,
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }

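    /// Assembles a `StreamJobFragments` from database rows by composing each
    /// `(fragment, actors)` pair and merging the per-fragment actor status and split
    /// assignments into job-level maps.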
    pub fn compose_table_fragments(
        table_id: u32,
        state: PbState,
        ctx: Option<PbStreamContext>,
        fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
        parallelism: StreamingParallelism,
        max_parallelism: usize,
        job_definition: Option<String>,
    ) -> MetaResult<StreamJobFragments> {
        let mut pb_fragments = BTreeMap::new();
        let mut pb_actor_splits = HashMap::new();
        let mut pb_actor_status = BTreeMap::new();

        for (fragment, actors) in fragments {
            let (fragment, fragment_actor_status, fragment_actor_splits) =
                Self::compose_fragment(fragment, actors, job_definition.clone())?;

            pb_fragments.insert(fragment.fragment_id, fragment);

            pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
            pb_actor_status.extend(fragment_actor_status.into_iter());
        }

        let table_fragments = StreamJobFragments {
            stream_job_id: table_id.into(),
            state: state as _,
            fragments: pb_fragments,
            actor_status: pb_actor_status,
            actor_splits: pb_actor_splits,
            ctx: ctx
                .as_ref()
                .map(StreamContext::from_protobuf)
                .unwrap_or_default(),
            assigned_parallelism: match parallelism {
                StreamingParallelism::Custom => TableParallelism::Custom,
                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
            },
            max_parallelism,
        };

        Ok(table_fragments)
    }

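    /// The inverse of `extract_fragment_and_actors_for_new_job`: converts a fragment model
    /// and its actor models back into a protobuf-facing `Fragment`, together with per-actor
    /// status and connector-split maps.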
    #[allow(clippy::type_complexity)]
    pub(crate) fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<actor::Model>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        let mut upstream_fragments = HashSet::new();
        visit_stream_node(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "duplicate upstream fragment {}",
                    m.upstream_fragment_id
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let actor::Model {
                actor_id,
                fragment_id,
                status,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                    state: PbActorState::from(status) as _,
                },
            );

            if let Some(splits) = splits {
                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
            }

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask as _,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }

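    /// Counts actors per fragment via a grouped subquery joined back to the `fragment`
    /// table, optionally restricted to `id_filter`, and returns each fragment's
    /// distribution type, actor count, and vnode count.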
    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let inner = self.inner.read().await;

        let query_alias = Alias::new("fragment_actor_count");
        let count_alias = Alias::new("count");

        let mut query = SelectStatement::new()
            .column(actor::Column::FragmentId)
            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
            .from(Actor)
            .group_by_col(actor::Column::FragmentId)
            .to_owned();

        if let Some(id_filter) = id_filter {
            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
        }

        let outer = SelectStatement::new()
            .column((FragmentModel, fragment::Column::FragmentId))
            .column(count_alias)
            .column(fragment::Column::DistributionType)
            .column(fragment::Column::VnodeCount)
            .from_subquery(query.to_owned(), query_alias.clone())
            .inner_join(
                FragmentModel,
                Expr::col((query_alias, actor::Column::FragmentId))
                    .equals((FragmentModel, fragment::Column::FragmentId)),
            )
            .to_owned();

        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
                outer.to_owned(),
            )
            .all(&inner.db)
            .await?;

        Ok(fragment_parallelisms
            .into_iter()
            .map(|(fragment_id, count, distribution_type, vnode_count)| {
                (
                    fragment_id,
                    FragmentParallelismInfo {
                        distribution_type: distribution_type.into(),
                        actor_count: count as usize,
                        vnode_count: vnode_count as usize,
                    },
                )
            })
            .collect())
    }

    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
        let inner = self.inner.read().await;
        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(fragment_jobs.into_iter().collect())
    }

    pub async fn get_fragment_job_id(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        let object_ids: Vec<ObjectId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(object_ids)
    }

    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_opt = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .filter(fragment::Column::FragmentId.eq(fragment_id))
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .one(&inner.db)
            .await?;

        let Some(fragment) = fragment_opt else {
            return Ok(None);
        };

        let upstreams: Vec<FragmentId> = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(Some((fragment, upstreams)))
    }

    pub async fn list_fragment_database_ids(
        &self,
        select_fragment_ids: Option<Vec<FragmentId>>,
    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
        let inner = self.inner.read().await;
        let select = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .column(object::Column::DatabaseId)
            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
        let select = if let Some(select_fragment_ids) = select_fragment_ids {
            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
        } else {
            select
        };
        Ok(select.into_tuple().all(&inner.db).await?)
    }

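    /// Loads the complete `StreamJobFragments` of a single job, including actor status,
    /// splits, and the resolved job definition.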
    pub async fn get_job_fragments_by_id(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;
        let fragment_actors = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id as _,
            job_info.job_status.into(),
            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }

    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;
        get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
    }

    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }

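    /// Scans the job's fragments for `StreamScan` nodes and returns the backfill scan type
    /// of each fragment that specifies one.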
    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<HashMap<model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }

    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }

    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
        let inner = self.inner.read().await;
        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
        Ok(max_parallelism as usize)
    }

    pub async fn get_job_actor_mapping(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
            .select_only()
            .column(fragment::Column::JobId)
            .column(actor::Column::ActorId)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(fragment::Column::JobId.is_in(job_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(job_actors.into_iter().into_group_map())
    }

    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
        if let Ok(inner) = self.inner.try_read() {
            if let Ok(job_state_tables) = FragmentModel::find()
                .select_only()
                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
                .into_tuple::<(ObjectId, I32Array)>()
                .all(&inner.db)
                .await
            {
                let mut job_internal_table_ids = HashMap::new();
                for (job_id, state_table_ids) in job_state_tables {
                    job_internal_table_ids
                        .entry(job_id)
                        .or_insert_with(Vec::new)
                        .extend(state_table_ids.into_inner());
                }
                return Some(job_internal_table_ids.into_iter().collect());
            }
        }
        None
    }

    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = FragmentModel::find().count(&inner.db).await?;
        Ok(count > 0)
    }

    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let inner = self.inner.read().await;
        let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
            .select_only()
            .column(actor::Column::WorkerId)
            .column_as(actor::Column::ActorId.count(), "count")
            .group_by(actor::Column::WorkerId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(actor_cnt
            .into_iter()
            .map(|(worker_id, count)| (worker_id, count as usize))
            .collect())
    }

    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragment_actors = FragmentModel::find()
                .find_with_related(Actor)
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            table_fragments.insert(
                job.job_id as ObjectId,
                Self::compose_table_fragments(
                    job.job_id as _,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }

    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }

    pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
        let inner = self.inner.read().await;
        let actor_locations: Vec<PartialActorLocation> =
            Actor::find().into_partial_model().all(&inner.db).await?;
        Ok(actor_locations)
    }

    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;
        let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
            Actor::find()
                .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .select_only()
                .columns([actor::Column::ActorId, actor::Column::FragmentId])
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;
        Ok(actor_locations)
    }

    pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
        let inner = self.inner.read().await;

        let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
            .select_only()
            .filter(actor::Column::Splits.is_not_null())
            .columns([actor::Column::ActorId, actor::Column::FragmentId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(source_actors)
    }

    pub async fn list_rw_table_scan_fragments(
        &self,
    ) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let mut result = Vec::new();
        let fragments = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .join(JoinType::LeftJoin, fragment::Relation::Object.def())
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Initial)
                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
            )
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .all(&inner.db)
            .await?;
        for fragment in fragments {
            let upstreams: Vec<FragmentId> = FragmentRelation::find()
                .select_only()
                .column(fragment_relation::Column::SourceFragmentId)
                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
                .into_tuple()
                .all(&inner.db)
                .await?;
            result.push((fragment, upstreams));
        }
        Ok(result)
    }

    pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let mut result = Vec::new();
        let fragments = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .all(&inner.db)
            .await?;
        for fragment in fragments {
            let upstreams: Vec<FragmentId> = FragmentRelation::find()
                .select_only()
                .column(fragment_relation::Column::SourceFragmentId)
                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
                .into_tuple()
                .all(&inner.db)
                .await?;
            result.push((fragment, upstreams));
        }
        Ok(result)
    }

    pub async fn list_sink_actor_mapping(
        &self,
    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
        let inner = self.inner.read().await;
        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .into_tuple()
            .all(&inner.db)
            .await?;
        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();

        let actor_with_type: Vec<(ActorId, SinkId, i32)> = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .columns([fragment::Column::JobId, fragment::Column::FragmentTypeMask])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(fragment::Column::JobId.is_in(sink_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut sink_actor_mapping = HashMap::new();
        for (actor_id, sink_id, type_mask) in actor_with_type {
            if type_mask & PbFragmentTypeFlag::Sink as i32 != 0 {
                sink_actor_mapping
                    .entry(sink_id)
                    .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
                    .1
                    .push(actor_id);
            }
        }

        Ok(sink_actor_mapping)
    }

    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
        let inner = self.inner.read().await;
        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::StateTableIds,
            ])
            .into_partial_model()
            .all(&inner.db)
            .await?;
        Ok(fragment_state_tables)
    }

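    /// Loads the in-flight info of all running actors, grouped by database, job, and
    /// fragment. If `database_id` is given, only that database is loaded.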
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();
            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
            };
            match fragment_infos.entry(fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                Entry::Vacant(entry) => {
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }

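    /// Relocates actors according to `plan`, a mapping from expired or changed worker slots
    /// to their replacements: e.g. an entry `(worker 1, slot 0) -> (worker 2, slot 0)` moves
    /// the actor occupying that slot to worker 2. Non-running actors are skipped with a
    /// warning, and the updated fragment mappings are broadcast afterwards.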
    pub async fn migrate_actors(
        &self,
        plan: HashMap<WorkerSlotId, WorkerSlotId>,
    ) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let actors: Vec<(
            FragmentId,
            DistributionType,
            ActorId,
            Option<VnodeBitmap>,
            WorkerId,
            ActorStatus,
        )> = Actor::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::DistributionType,
            ])
            .columns([
                actor::Column::ActorId,
                actor::Column::VnodeBitmap,
                actor::Column::WorkerId,
                actor::Column::Status,
            ])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .into_tuple()
            .all(&txn)
            .await?;

        let mut actor_locations = HashMap::new();

        for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
            if *status != ActorStatus::Running {
                tracing::warn!(
                    "skipping actor {} in fragment {} with status {:?}",
                    actor_id,
                    fragment_id,
                    status
                );
                continue;
            }

            actor_locations
                .entry(*worker_id)
                .or_insert(HashMap::new())
                .entry(*fragment_id)
                .or_insert(BTreeSet::new())
                .insert(*actor_id);
        }

        let expired_or_changed_workers: HashSet<_> =
            plan.keys().map(|k| k.worker_id() as WorkerId).collect();

        let mut actor_migration_plan = HashMap::new();
        for (worker, fragment) in actor_locations {
            if expired_or_changed_workers.contains(&worker) {
                for (fragment_id, actors) in fragment {
                    debug!(
                        "worker {} expired or changed, migrating fragment {}",
                        worker, fragment_id
                    );
                    let worker_slot_to_actor: HashMap<_, _> = actors
                        .iter()
                        .enumerate()
                        .map(|(idx, actor_id)| {
                            (WorkerSlotId::new(worker as _, idx as _), *actor_id)
                        })
                        .collect();

                    for (worker_slot, actor) in worker_slot_to_actor {
                        if let Some(target) = plan.get(&worker_slot) {
                            actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
                        }
                    }
                }
            }
        }

        for (actor, worker) in actor_migration_plan {
            Actor::update_many()
                .col_expr(
                    actor::Column::WorkerId,
                    Expr::value(Value::Int(Some(worker))),
                )
                .filter(actor::Column::ActorId.eq(actor))
                .exec(&txn)
                .await?;
        }

        txn.commit().await?;

        self.notify_fragment_mapping(
            NotificationOperation::Update,
            rebuild_fragment_mapping_from_actors(actors),
        )
        .await;

        Ok(())
    }

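    /// Returns the worker slots currently occupied by actors: for each worker, one slot per
    /// actor of its most actor-heavy fragment.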
    pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
        let inner = self.inner.read().await;

        let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([fragment::Column::FragmentId])
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut actor_locations = HashMap::new();

        for (fragment_id, _, worker_id) in actors {
            *actor_locations
                .entry(worker_id)
                .or_insert(HashMap::new())
                .entry(fragment_id)
                .or_insert(0_usize) += 1;
        }

        let mut result = HashSet::new();
        for (worker_id, mapping) in actor_locations {
            let max_fragment_len = mapping.values().max().unwrap();

            result
                .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
        }

        Ok(result)
    }

    pub async fn all_node_actors(
        &self,
        include_inactive: bool,
    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
        let inner = self.inner.read().await;
        let fragment_actors = if include_inactive {
            FragmentModel::find()
                .find_with_related(Actor)
                .all(&inner.db)
                .await?
        } else {
            FragmentModel::find()
                .find_with_related(Actor)
                .filter(actor::Column::Status.eq(ActorStatus::Running))
                .all(&inner.db)
                .await?
        };

        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
        )
        .await?;

        let mut node_actors = HashMap::new();
        for (fragment, actors) in fragment_actors {
            let job_id = fragment.job_id;
            let (table_fragments, actor_status, _) = Self::compose_fragment(
                fragment,
                actors,
                job_definitions.get(&(job_id as _)).cloned(),
            )?;
            for actor in table_fragments.actors {
                let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
                node_actors
                    .entry(node_id)
                    .or_insert_with(Vec::new)
                    .push(actor);
            }
        }

        Ok(node_actors)
    }

    pub async fn get_worker_actor_ids(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(fragment::Column::JobId.is_in(job_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut worker_actors = BTreeMap::new();
        for (actor_id, worker_id) in actor_workers {
            worker_actors
                .entry(worker_id)
                .or_insert_with(Vec::new)
                .push(actor_id);
        }

        Ok(worker_actors)
    }

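    /// Persists the given split assignment, overwriting the stored splits of each affected
    /// source actor in a single transaction.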
    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        for assignments in split_assignment.values() {
            for (actor_id, splits) in assignments {
                let actor_splits = splits.iter().map(Into::into).collect_vec();
                Actor::update(actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_splits,
                    }))),
                    ..Default::default()
                })
                .exec(&txn)
                .await
                .map_err(|err| {
                    if err == DbErr::RecordNotUpdated {
                        MetaError::catalog_id_not_found("actor_id", actor_id)
                    } else {
                        err.into()
                    }
                })?;
            }
        }
        txn.commit().await?;

        Ok(())
    }

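    /// Fills the snapshot backfill epoch into the stream nodes of the given fragments,
    /// persisting each fragment whose node is actually rewritten.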
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

    pub async fn get_running_actors_of_fragment(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Vec<ActorId>> {
        let inner = self.inner.read().await;
        let actors: Vec<ActorId> = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .filter(actor::Column::FragmentId.eq(fragment_id))
            .filter(actor::Column::Status.eq(ActorStatus::Running))
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(actors)
    }

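    /// Returns `(source_backfill_actor, source_actor)` pairs for a `NoShuffle` relation
    /// between a source fragment and its source backfill fragment, resolved from the two
    /// fragments' actor vnode distributions.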
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expected NoShuffle but got {:?}", fragment_relation).into());
        }

        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }

    pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
        let inner = self.inner.read().await;
        let actors: Vec<ActorId> = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(fragment::Column::JobId.is_in(job_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(actors)
    }

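    /// Finds the "root" fragment of each job: the `Mview` or `Sink` fragment if present,
    /// otherwise a `Source` fragment. Also returns the worker of each actor.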
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
        let inner = self.inner.read().await;

        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(job_ids.iter().copied()),
        )
        .await?;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            if (fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32) != 0
                || (fragment.fragment_type_mask & PbFragmentTypeFlag::Sink as i32) != 0
            {
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
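                // A `Source` fragment is only a fallback root: `try_insert` keeps any
                // `Mview`/`Sink` fragment already recorded for this job.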
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();
        for (_, fragment) in root_fragments {
            let actors = fragment.find_related(Actor).all(&inner.db).await?;

            let job_id = fragment.job_id;
            root_fragments_pb.insert(
                fragment.job_id,
                Self::compose_fragment(
                    fragment,
                    actors,
                    job_definitions.get(&(job_id as _)).cloned(),
                )?
                .0,
            );
        }

        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok((root_fragments_pb, actors))
    }

    pub async fn get_root_fragment(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
        let root_fragment = root_fragments
            .remove(&job_id)
            .context(format!("root fragment for job {} not found", job_id))?;
        Ok((root_fragment, actors))
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&inner.db)
            .await?;
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        let mut downstream_fragments = vec![];
        for fragment_relation::Model {
            target_fragment_id: fragment_id,
            dispatcher_type,
            ..
        } in downstream_fragment_relations
        {
            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
                .find_with_related(Actor)
                .all(&inner.db)
                .await?;
            if fragment_actors.is_empty() {
                bail!("no fragment found for fragment id {}", fragment_id);
            }
            assert_eq!(fragment_actors.len(), 1);
            let (fragment, actors) = fragment_actors.pop().unwrap();
            let dispatch_type = PbDispatcherType::from(dispatcher_type);
            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
            downstream_fragments.push((dispatch_type, fragment));
        }

        Ok((downstream_fragments, actors))
    }

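    /// Returns the ids of all fragments that contain a stream source node, keyed by
    /// source id.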
    pub async fn load_source_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
        let inner = self.inner.read().await;
        let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .into_tuple()
            .all(&inner.db)
            .await?;
        fragments.retain(|(_, mask, _)| *mask & PbFragmentTypeFlag::Source as i32 != 0);

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, _, stream_node) in fragments {
            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
                source_fragment_ids
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment_id);
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn load_backfill_fragment_ids(
        &self,
    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
        let inner = self.inner.read().await;
        let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .into_tuple()
            .all(&inner.db)
            .await?;
        fragments.retain(|(_, mask, _)| *mask & PbFragmentTypeFlag::SourceScan as i32 != 0);

        let mut source_fragment_ids = HashMap::new();
        for (fragment_id, _, stream_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                stream_node.to_protobuf().find_source_backfill()
            {
                source_fragment_ids
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        Ok(source_fragment_ids)
    }

    pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
        let inner = self.inner.read().await;
        let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::Splits])
            .filter(actor::Column::Splits.is_not_null())
            .into_tuple()
            .all(&inner.db)
            .await?;
        Ok(splits.into_iter().collect())
    }

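    /// Returns the actual parallelism (actor count) of the job's `Mview` or `Sink`
    /// fragment, or `None` if there is no unique such fragment.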
    pub async fn get_actual_job_fragment_parallelism(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<Option<usize>> {
        let inner = self.inner.read().await;
        let mut fragments: Vec<(FragmentId, i32, i64)> = FragmentModel::find()
            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
            ])
            .column_as(actor::Column::ActorId.count(), "count")
            .filter(fragment::Column::JobId.eq(job_id))
            .group_by(fragment::Column::FragmentId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        fragments.retain(|(_, mask, _)| {
            *mask & PbFragmentTypeFlag::Mview as i32 != 0
                || *mask & PbFragmentTypeFlag::Sink as i32 != 0
        });

        Ok(fragments
            .into_iter()
            .at_most_one()
            .ok()
            .flatten()
            .map(|(_, _, count)| count as usize))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node;
    use risingwave_meta_model::actor::ActorStatus;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::{
        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
        VnodeBitmap, actor, fragment,
    };
    use risingwave_pb::common::PbActorLocation;
    use risingwave_pb::meta::table_fragments::PbActorStatus;
    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbFragmentTypeFlag, PbStreamNode, PbUnionNode};

    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    const TEST_FRAGMENT_ID: FragmentId = 1;

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;

    const TEST_JOB_ID: ObjectId = 1;

    const TEST_STATE_TABLE_ID: TableId = 1000;

    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }

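    /// Builds a `Union` stream node with one `Merge` input per upstream fragment.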
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id: *upstream_fragment_id as _,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: PbFragmentTypeFlag::Source as _,
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            visit_stream_node(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(status, ActorStatus::Running);

            assert_eq!(
                splits,
                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask as u32);
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(
            pb_state_table_ids,
            fragment.state_table_ids.into_u32_array()
        );
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }
}