1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
26use risingwave_common::util::stream_graph_visitor::{
27 visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_connector::source::SplitImpl;
30use risingwave_meta_model::actor::ActorStatus;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34 Actor, Fragment as FragmentModel, FragmentRelation, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37 ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38 JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39 VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
40 streaming_job, table,
41};
42use risingwave_meta_model_migration::{Alias, SelectStatement};
43use risingwave_pb::common::PbActorLocation;
44use risingwave_pb::meta::subscribe_response::{
45 Info as NotificationInfo, Operation as NotificationOperation,
46};
47use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
48use risingwave_pb::meta::table_fragments::fragment::{
49 FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan::stream_node::NodeBody;
55use risingwave_pb::stream_plan::{
56 PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
57 StreamScanType,
58};
59use sea_orm::ActiveValue::Set;
60use sea_orm::sea_query::Expr;
61use sea_orm::{
62 ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
63 QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
64};
65use serde::{Deserialize, Serialize};
66use tracing::debug;
67
68use crate::barrier::SnapshotBackfillInfo;
69use crate::controller::catalog::{CatalogController, CatalogControllerInner};
70use crate::controller::scale::resolve_streaming_job_definition;
71use crate::controller::utils::{
72 FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
73 get_fragment_mappings, rebuild_fragment_mapping_from_actors,
74 resolve_no_shuffle_actor_dispatcher,
75};
76use crate::manager::LocalNotification;
77use crate::model::{
78 DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
79 StreamActor, StreamContext, StreamJobFragments, TableParallelism,
80};
81use crate::stream::{SplitAssignment, build_actor_split_impls};
82use crate::{MetaError, MetaResult, model};
83
84#[derive(Clone, Debug)]
86pub struct InflightActorInfo {
87 pub worker_id: WorkerId,
88 pub vnode_bitmap: Option<Bitmap>,
89}
90
91#[derive(Clone, Debug)]
92pub struct InflightFragmentInfo {
93 pub fragment_id: crate::model::FragmentId,
94 pub distribution_type: DistributionType,
95 pub nodes: PbStreamNode,
96 pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
97 pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
98}
99
100#[derive(Clone, Debug)]
101pub struct FragmentParallelismInfo {
102 pub distribution_type: FragmentDistributionType,
103 pub actor_count: usize,
104 pub vnode_count: usize,
105}
106
107#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
108#[serde(rename_all = "camelCase")] pub struct StreamingJobInfo {
110 pub job_id: ObjectId,
111 pub obj_type: ObjectType,
112 pub name: String,
113 pub job_status: JobStatus,
114 pub parallelism: StreamingParallelism,
115 pub max_parallelism: i32,
116 pub resource_group: String,
117 pub database_id: DatabaseId,
118 pub schema_id: SchemaId,
119}
120
121impl CatalogControllerInner {
122 pub async fn all_running_fragment_mappings(
124 &self,
125 ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
126 let txn = self.db.begin().await?;
127
128 let job_ids: Vec<ObjectId> = StreamingJob::find()
129 .select_only()
130 .column(streaming_job::Column::JobId)
131 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
132 .into_tuple()
133 .all(&txn)
134 .await?;
135
136 let mut result = vec![];
137 for job_id in job_ids {
138 let mappings = get_fragment_mappings(&txn, job_id).await?;
139
140 result.extend(mappings.into_iter());
141 }
142
143 Ok(result.into_iter())
144 }
145}
146
147impl CatalogController {
148 pub(crate) async fn notify_fragment_mapping(
149 &self,
150 operation: NotificationOperation,
151 fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
152 ) {
153 let fragment_ids = fragment_mappings
154 .iter()
155 .map(|mapping| mapping.fragment_id)
156 .collect_vec();
157 for fragment_mapping in fragment_mappings {
159 self.env
160 .notification_manager()
161 .notify_frontend(
162 operation,
163 NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
164 )
165 .await;
166 }
167
168 match operation {
170 NotificationOperation::Add | NotificationOperation::Update => {
171 self.env
172 .notification_manager()
173 .notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
174 fragment_ids,
175 ))
176 .await;
177 }
178 NotificationOperation::Delete => {
179 self.env
180 .notification_manager()
181 .notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
182 fragment_ids,
183 ))
184 .await;
185 }
186 op => {
187 tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
188 }
189 }
190 }
191
192 pub fn extract_fragment_and_actors_from_fragments(
193 stream_job_fragments: &StreamJobFragments,
194 ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
195 stream_job_fragments
196 .fragments
197 .values()
198 .map(|fragment| {
199 Self::extract_fragment_and_actors_for_new_job(
200 stream_job_fragments.stream_job_id.table_id as _,
201 fragment,
202 &stream_job_fragments.actor_status,
203 &stream_job_fragments.actor_splits,
204 )
205 })
206 .try_collect()
207 }
208
209 fn extract_fragment_and_actors_for_new_job(
210 job_id: ObjectId,
211 fragment: &Fragment,
212 actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
213 actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
214 ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
215 let vnode_count = fragment.vnode_count();
216 let Fragment {
217 fragment_id: pb_fragment_id,
218 fragment_type_mask: pb_fragment_type_mask,
219 distribution_type: pb_distribution_type,
220 actors: pb_actors,
221 state_table_ids: pb_state_table_ids,
222 nodes,
223 ..
224 } = fragment;
225
226 let state_table_ids = pb_state_table_ids.clone().into();
227
228 assert!(!pb_actors.is_empty());
229
230 let stream_node = {
231 let mut stream_node = nodes.clone();
232 visit_stream_node_mut(&mut stream_node, |body| {
233 #[expect(deprecated)]
234 if let NodeBody::Merge(m) = body {
235 m.upstream_actor_id = vec![];
236 }
237 });
238
239 stream_node
240 };
241
242 let mut actors = vec![];
243
244 for actor in pb_actors {
245 let StreamActor {
246 actor_id,
247 fragment_id,
248 vnode_bitmap,
249 mview_definition: _,
250 expr_context: pb_expr_context,
251 ..
252 } = actor;
253
254 let splits = actor_splits.get(actor_id).map(|splits| {
255 ConnectorSplits::from(&PbConnectorSplits {
256 splits: splits.iter().map(ConnectorSplit::from).collect(),
257 })
258 });
259 let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
260 anyhow::anyhow!(
261 "actor {} in fragment {} has no actor_status",
262 actor_id,
263 fragment_id
264 )
265 })?;
266
267 let worker_id = status.worker_id() as _;
268
269 let pb_expr_context = pb_expr_context
270 .as_ref()
271 .expect("no expression context found");
272
273 #[expect(deprecated)]
274 actors.push(actor::Model {
275 actor_id: *actor_id as _,
276 fragment_id: *fragment_id as _,
277 status: status.get_state().unwrap().into(),
278 splits,
279 worker_id,
280 upstream_actor_ids: Default::default(),
281 vnode_bitmap: vnode_bitmap
282 .as_ref()
283 .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
284 expr_context: ExprContext::from(pb_expr_context),
285 });
286 }
287
288 let stream_node = StreamNode::from(&stream_node);
289
290 let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
291 .unwrap()
292 .into();
293
294 #[expect(deprecated)]
295 let fragment = fragment::Model {
296 fragment_id: *pb_fragment_id as _,
297 job_id,
298 fragment_type_mask: (*pb_fragment_type_mask).into(),
299 distribution_type,
300 stream_node,
301 state_table_ids,
302 upstream_fragment_id: Default::default(),
303 vnode_count: vnode_count as _,
304 };
305
306 Ok((fragment, actors))
307 }
308
309 pub fn compose_table_fragments(
310 table_id: u32,
311 state: PbState,
312 ctx: Option<PbStreamContext>,
313 fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
314 parallelism: StreamingParallelism,
315 max_parallelism: usize,
316 job_definition: Option<String>,
317 ) -> MetaResult<StreamJobFragments> {
318 let mut pb_fragments = BTreeMap::new();
319 let mut pb_actor_splits = HashMap::new();
320 let mut pb_actor_status = BTreeMap::new();
321
322 for (fragment, actors) in fragments {
323 let (fragment, fragment_actor_status, fragment_actor_splits) =
324 Self::compose_fragment(fragment, actors, job_definition.clone())?;
325
326 pb_fragments.insert(fragment.fragment_id, fragment);
327
328 pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
329 pb_actor_status.extend(fragment_actor_status.into_iter());
330 }
331
332 let table_fragments = StreamJobFragments {
333 stream_job_id: table_id.into(),
334 state: state as _,
335 fragments: pb_fragments,
336 actor_status: pb_actor_status,
337 actor_splits: pb_actor_splits,
338 ctx: ctx
339 .as_ref()
340 .map(StreamContext::from_protobuf)
341 .unwrap_or_default(),
342 assigned_parallelism: match parallelism {
343 StreamingParallelism::Custom => TableParallelism::Custom,
344 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
345 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
346 },
347 max_parallelism,
348 };
349
350 Ok(table_fragments)
351 }
352
353 #[allow(clippy::type_complexity)]
354 pub(crate) fn compose_fragment(
355 fragment: fragment::Model,
356 actors: Vec<actor::Model>,
357 job_definition: Option<String>,
358 ) -> MetaResult<(
359 Fragment,
360 HashMap<crate::model::ActorId, PbActorStatus>,
361 HashMap<crate::model::ActorId, PbConnectorSplits>,
362 )> {
363 let fragment::Model {
364 fragment_id,
365 fragment_type_mask,
366 distribution_type,
367 stream_node,
368 state_table_ids,
369 vnode_count,
370 ..
371 } = fragment;
372
373 let stream_node = stream_node.to_protobuf();
374 let mut upstream_fragments = HashSet::new();
375 visit_stream_node_body(&stream_node, |body| {
376 if let NodeBody::Merge(m) = body {
377 assert!(
378 upstream_fragments.insert(m.upstream_fragment_id),
379 "non-duplicate upstream fragment"
380 );
381 }
382 });
383
384 let mut pb_actors = vec![];
385
386 let mut pb_actor_status = HashMap::new();
387 let mut pb_actor_splits = HashMap::new();
388
389 for actor in actors {
390 if actor.fragment_id != fragment_id {
391 bail!(
392 "fragment id {} from actor {} is different from fragment {}",
393 actor.fragment_id,
394 actor.actor_id,
395 fragment_id
396 )
397 }
398
399 let actor::Model {
400 actor_id,
401 fragment_id,
402 status,
403 worker_id,
404 splits,
405 vnode_bitmap,
406 expr_context,
407 ..
408 } = actor;
409
410 let vnode_bitmap =
411 vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
412 let pb_expr_context = Some(expr_context.to_protobuf());
413
414 pb_actor_status.insert(
415 actor_id as _,
416 PbActorStatus {
417 location: PbActorLocation::from_worker(worker_id as u32),
418 state: PbActorState::from(status) as _,
419 },
420 );
421
422 if let Some(splits) = splits {
423 pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
424 }
425
426 pb_actors.push(StreamActor {
427 actor_id: actor_id as _,
428 fragment_id: fragment_id as _,
429 vnode_bitmap,
430 mview_definition: job_definition.clone().unwrap_or("".to_owned()),
431 expr_context: pb_expr_context,
432 })
433 }
434
435 let pb_state_table_ids = state_table_ids.into_u32_array();
436 let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
437 let pb_fragment = Fragment {
438 fragment_id: fragment_id as _,
439 fragment_type_mask: fragment_type_mask.into(),
440 distribution_type: pb_distribution_type,
441 actors: pb_actors,
442 state_table_ids: pb_state_table_ids,
443 maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
444 nodes: stream_node,
445 };
446
447 Ok((pb_fragment, pb_actor_status, pb_actor_splits))
448 }
449
450 pub async fn running_fragment_parallelisms(
451 &self,
452 id_filter: Option<HashSet<FragmentId>>,
453 ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
454 let inner = self.inner.read().await;
455
456 let query_alias = Alias::new("fragment_actor_count");
457 let count_alias = Alias::new("count");
458
459 let mut query = SelectStatement::new()
460 .column(actor::Column::FragmentId)
461 .expr_as(actor::Column::ActorId.count(), count_alias.clone())
462 .from(Actor)
463 .group_by_col(actor::Column::FragmentId)
464 .to_owned();
465
466 if let Some(id_filter) = id_filter {
467 query.cond_having(actor::Column::FragmentId.is_in(id_filter));
468 }
469
470 let outer = SelectStatement::new()
471 .column((FragmentModel, fragment::Column::FragmentId))
472 .column(count_alias)
473 .column(fragment::Column::DistributionType)
474 .column(fragment::Column::VnodeCount)
475 .from_subquery(query.to_owned(), query_alias.clone())
476 .inner_join(
477 FragmentModel,
478 Expr::col((query_alias, actor::Column::FragmentId))
479 .equals((FragmentModel, fragment::Column::FragmentId)),
480 )
481 .to_owned();
482
483 let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
484 Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
485 outer.to_owned(),
486 )
487 .all(&inner.db)
488 .await?;
489
490 Ok(fragment_parallelisms
491 .into_iter()
492 .map(|(fragment_id, count, distribution_type, vnode_count)| {
493 (
494 fragment_id,
495 FragmentParallelismInfo {
496 distribution_type: distribution_type.into(),
497 actor_count: count as usize,
498 vnode_count: vnode_count as usize,
499 },
500 )
501 })
502 .collect())
503 }
504
505 pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
506 let inner = self.inner.read().await;
507 let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
508 .select_only()
509 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
510 .into_tuple()
511 .all(&inner.db)
512 .await?;
513 Ok(fragment_jobs.into_iter().collect())
514 }
515
516 pub async fn get_fragment_job_id(
517 &self,
518 fragment_ids: Vec<FragmentId>,
519 ) -> MetaResult<Vec<ObjectId>> {
520 let inner = self.inner.read().await;
521
522 let object_ids: Vec<ObjectId> = FragmentModel::find()
523 .select_only()
524 .column(fragment::Column::JobId)
525 .filter(fragment::Column::FragmentId.is_in(fragment_ids))
526 .into_tuple()
527 .all(&inner.db)
528 .await?;
529
530 Ok(object_ids)
531 }
532
533 pub async fn get_fragment_desc_by_id(
534 &self,
535 fragment_id: FragmentId,
536 ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
537 let inner = self.inner.read().await;
538
539 let fragment_opt = FragmentModel::find()
541 .select_only()
542 .columns([
543 fragment::Column::FragmentId,
544 fragment::Column::JobId,
545 fragment::Column::FragmentTypeMask,
546 fragment::Column::DistributionType,
547 fragment::Column::StateTableIds,
548 fragment::Column::VnodeCount,
549 fragment::Column::StreamNode,
550 ])
551 .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
552 .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
553 .filter(fragment::Column::FragmentId.eq(fragment_id))
554 .group_by(fragment::Column::FragmentId)
555 .into_model::<FragmentDesc>()
556 .one(&inner.db)
557 .await?;
558
559 let Some(fragment) = fragment_opt else {
560 return Ok(None); };
562
563 let upstreams: Vec<FragmentId> = FragmentRelation::find()
565 .select_only()
566 .column(fragment_relation::Column::SourceFragmentId)
567 .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
568 .into_tuple()
569 .all(&inner.db)
570 .await?;
571
572 Ok(Some((fragment, upstreams)))
573 }
574
575 pub async fn list_fragment_database_ids(
576 &self,
577 select_fragment_ids: Option<Vec<FragmentId>>,
578 ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
579 let inner = self.inner.read().await;
580 let select = FragmentModel::find()
581 .select_only()
582 .column(fragment::Column::FragmentId)
583 .column(object::Column::DatabaseId)
584 .join(JoinType::InnerJoin, fragment::Relation::Object.def());
585 let select = if let Some(select_fragment_ids) = select_fragment_ids {
586 select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
587 } else {
588 select
589 };
590 Ok(select.into_tuple().all(&inner.db).await?)
591 }
592
593 pub async fn get_job_fragments_by_id(
594 &self,
595 job_id: ObjectId,
596 ) -> MetaResult<StreamJobFragments> {
597 let inner = self.inner.read().await;
598 let fragment_actors = FragmentModel::find()
599 .find_with_related(Actor)
600 .filter(fragment::Column::JobId.eq(job_id))
601 .all(&inner.db)
602 .await?;
603
604 let job_info = StreamingJob::find_by_id(job_id)
605 .one(&inner.db)
606 .await?
607 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
608
609 let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
610 .await?
611 .remove(&job_id);
612
613 Self::compose_table_fragments(
614 job_id as _,
615 job_info.job_status.into(),
616 job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
617 fragment_actors,
618 job_info.parallelism.clone(),
619 job_info.max_parallelism as _,
620 job_definition,
621 )
622 }
623
624 pub async fn get_fragment_actor_dispatchers(
625 &self,
626 fragment_ids: Vec<FragmentId>,
627 ) -> MetaResult<FragmentActorDispatchers> {
628 let inner = self.inner.read().await;
629 get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
630 }
631
632 pub async fn get_fragment_downstream_relations(
633 &self,
634 fragment_ids: Vec<FragmentId>,
635 ) -> MetaResult<FragmentDownstreamRelation> {
636 let inner = self.inner.read().await;
637 let mut stream = FragmentRelation::find()
638 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
639 .stream(&inner.db)
640 .await?;
641 let mut relations = FragmentDownstreamRelation::new();
642 while let Some(relation) = stream.try_next().await? {
643 relations
644 .entry(relation.source_fragment_id as _)
645 .or_default()
646 .push(DownstreamFragmentRelation {
647 downstream_fragment_id: relation.target_fragment_id as _,
648 dispatcher_type: relation.dispatcher_type,
649 dist_key_indices: relation.dist_key_indices.into_u32_array(),
650 output_mapping: PbDispatchOutputMapping {
651 indices: relation.output_indices.into_u32_array(),
652 types: relation
653 .output_type_mapping
654 .unwrap_or_default()
655 .to_protobuf(),
656 },
657 });
658 }
659 Ok(relations)
660 }
661
662 pub async fn get_job_fragment_backfill_scan_type(
663 &self,
664 job_id: ObjectId,
665 ) -> MetaResult<HashMap<model::FragmentId, PbStreamScanType>> {
666 let inner = self.inner.read().await;
667 let fragments: Vec<_> = FragmentModel::find()
668 .filter(fragment::Column::JobId.eq(job_id))
669 .all(&inner.db)
670 .await?;
671
672 let mut result = HashMap::new();
673
674 for fragment::Model {
675 fragment_id,
676 stream_node,
677 ..
678 } in fragments
679 {
680 let stream_node = stream_node.to_protobuf();
681 visit_stream_node_body(&stream_node, |body| {
682 if let NodeBody::StreamScan(node) = body {
683 match node.stream_scan_type() {
684 StreamScanType::Unspecified => {}
685 scan_type => {
686 result.insert(fragment_id as model::FragmentId, scan_type);
687 }
688 }
689 }
690 });
691 }
692
693 Ok(result)
694 }
695
696 pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
697 let inner = self.inner.read().await;
698 let job_states = StreamingJob::find()
699 .select_only()
700 .column(streaming_job::Column::JobId)
701 .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
702 .join(JoinType::InnerJoin, object::Relation::Database2.def())
703 .column(object::Column::ObjType)
704 .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
705 .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
706 .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
707 .column_as(
708 Expr::if_null(
709 Expr::col((table::Entity, table::Column::Name)),
710 Expr::if_null(
711 Expr::col((source::Entity, source::Column::Name)),
712 Expr::if_null(
713 Expr::col((sink::Entity, sink::Column::Name)),
714 Expr::val("<unknown>"),
715 ),
716 ),
717 ),
718 "name",
719 )
720 .columns([
721 streaming_job::Column::JobStatus,
722 streaming_job::Column::Parallelism,
723 streaming_job::Column::MaxParallelism,
724 ])
725 .column_as(
726 Expr::if_null(
727 Expr::col((
728 streaming_job::Entity,
729 streaming_job::Column::SpecificResourceGroup,
730 )),
731 Expr::col((database::Entity, database::Column::ResourceGroup)),
732 ),
733 "resource_group",
734 )
735 .column(object::Column::DatabaseId)
736 .column(object::Column::SchemaId)
737 .into_model()
738 .all(&inner.db)
739 .await?;
740 Ok(job_states)
741 }
742
743 pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
744 let inner = self.inner.read().await;
745 let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
746 .select_only()
747 .column(streaming_job::Column::MaxParallelism)
748 .into_tuple()
749 .one(&inner.db)
750 .await?
751 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
752 Ok(max_parallelism as usize)
753 }
754
755 pub async fn get_job_actor_mapping(
757 &self,
758 job_ids: Vec<ObjectId>,
759 ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
760 let inner = self.inner.read().await;
761 let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
762 .select_only()
763 .column(fragment::Column::JobId)
764 .column(actor::Column::ActorId)
765 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
766 .filter(fragment::Column::JobId.is_in(job_ids))
767 .into_tuple()
768 .all(&inner.db)
769 .await?;
770 Ok(job_actors.into_iter().into_group_map())
771 }
772
773 pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
775 if let Ok(inner) = self.inner.try_read()
776 && let Ok(job_state_tables) = FragmentModel::find()
777 .select_only()
778 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
779 .into_tuple::<(ObjectId, I32Array)>()
780 .all(&inner.db)
781 .await
782 {
783 let mut job_internal_table_ids = HashMap::new();
784 for (job_id, state_table_ids) in job_state_tables {
785 job_internal_table_ids
786 .entry(job_id)
787 .or_insert_with(Vec::new)
788 .extend(state_table_ids.into_inner());
789 }
790 return Some(job_internal_table_ids.into_iter().collect());
791 }
792 None
793 }
794
795 pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
796 let inner = self.inner.read().await;
797 let count = FragmentModel::find().count(&inner.db).await?;
798 Ok(count > 0)
799 }
800
801 pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
802 let inner = self.inner.read().await;
803 let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
804 .select_only()
805 .column(actor::Column::WorkerId)
806 .column_as(actor::Column::ActorId.count(), "count")
807 .group_by(actor::Column::WorkerId)
808 .into_tuple()
809 .all(&inner.db)
810 .await?;
811
812 Ok(actor_cnt
813 .into_iter()
814 .map(|(worker_id, count)| (worker_id, count as usize))
815 .collect())
816 }
817
818 pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
820 let inner = self.inner.read().await;
821 let jobs = StreamingJob::find().all(&inner.db).await?;
822
823 let mut job_definition = resolve_streaming_job_definition(
824 &inner.db,
825 &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
826 )
827 .await?;
828
829 let mut table_fragments = BTreeMap::new();
830 for job in jobs {
831 let fragment_actors = FragmentModel::find()
832 .find_with_related(Actor)
833 .filter(fragment::Column::JobId.eq(job.job_id))
834 .all(&inner.db)
835 .await?;
836
837 table_fragments.insert(
838 job.job_id as ObjectId,
839 Self::compose_table_fragments(
840 job.job_id as _,
841 job.job_status.into(),
842 job.timezone.map(|tz| PbStreamContext { timezone: tz }),
843 fragment_actors,
844 job.parallelism.clone(),
845 job.max_parallelism as _,
846 job_definition.remove(&job.job_id),
847 )?,
848 );
849 }
850
851 Ok(table_fragments)
852 }
853
854 pub async fn upstream_fragments(
855 &self,
856 fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
857 ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
858 let inner = self.inner.read().await;
859 let mut stream = FragmentRelation::find()
860 .select_only()
861 .columns([
862 fragment_relation::Column::SourceFragmentId,
863 fragment_relation::Column::TargetFragmentId,
864 ])
865 .filter(
866 fragment_relation::Column::TargetFragmentId
867 .is_in(fragment_ids.map(|id| id as FragmentId)),
868 )
869 .into_tuple::<(FragmentId, FragmentId)>()
870 .stream(&inner.db)
871 .await?;
872 let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
873 while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
874 upstream_fragments
875 .entry(downstream_fragment_id as crate::model::FragmentId)
876 .or_default()
877 .insert(upstream_fragment_id as crate::model::FragmentId);
878 }
879 Ok(upstream_fragments)
880 }
881
882 pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
883 let inner = self.inner.read().await;
884 let actor_locations: Vec<PartialActorLocation> =
885 Actor::find().into_partial_model().all(&inner.db).await?;
886 Ok(actor_locations)
887 }
888
889 pub async fn list_actor_info(
890 &self,
891 ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
892 let inner = self.inner.read().await;
893 let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
894 Actor::find()
895 .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
896 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
897 .select_only()
898 .columns([actor::Column::ActorId, actor::Column::FragmentId])
899 .column_as(object::Column::Oid, "job_id")
900 .column_as(object::Column::SchemaId, "schema_id")
901 .column_as(object::Column::ObjType, "type")
902 .into_tuple()
903 .all(&inner.db)
904 .await?;
905 Ok(actor_locations)
906 }
907
908 pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
909 let inner = self.inner.read().await;
910
911 let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
912 .select_only()
913 .filter(actor::Column::Splits.is_not_null())
914 .columns([actor::Column::ActorId, actor::Column::FragmentId])
915 .into_tuple()
916 .all(&inner.db)
917 .await?;
918
919 Ok(source_actors)
920 }
921
922 pub async fn list_creating_fragment_descs(
923 &self,
924 ) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
925 let inner = self.inner.read().await;
926 let mut result = Vec::new();
927 let fragments = FragmentModel::find()
928 .select_only()
929 .columns([
930 fragment::Column::FragmentId,
931 fragment::Column::JobId,
932 fragment::Column::FragmentTypeMask,
933 fragment::Column::DistributionType,
934 fragment::Column::StateTableIds,
935 fragment::Column::VnodeCount,
936 fragment::Column::StreamNode,
937 ])
938 .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
939 .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
940 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
941 .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
942 .filter(
943 streaming_job::Column::JobStatus
944 .eq(JobStatus::Initial)
945 .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
946 )
947 .group_by(fragment::Column::FragmentId)
948 .into_model::<FragmentDesc>()
949 .all(&inner.db)
950 .await?;
951 for fragment in fragments {
952 let upstreams: Vec<FragmentId> = FragmentRelation::find()
953 .select_only()
954 .column(fragment_relation::Column::SourceFragmentId)
955 .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
956 .into_tuple()
957 .all(&inner.db)
958 .await?;
959 result.push((fragment, upstreams));
960 }
961 Ok(result)
962 }
963
964 pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
965 let inner = self.inner.read().await;
966 let mut result = Vec::new();
967 let fragments = FragmentModel::find()
968 .select_only()
969 .columns([
970 fragment::Column::FragmentId,
971 fragment::Column::JobId,
972 fragment::Column::FragmentTypeMask,
973 fragment::Column::DistributionType,
974 fragment::Column::StateTableIds,
975 fragment::Column::VnodeCount,
976 fragment::Column::StreamNode,
977 ])
978 .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
979 .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
980 .group_by(fragment::Column::FragmentId)
981 .into_model::<FragmentDesc>()
982 .all(&inner.db)
983 .await?;
984 for fragment in fragments {
985 let upstreams: Vec<FragmentId> = FragmentRelation::find()
986 .select_only()
987 .column(fragment_relation::Column::SourceFragmentId)
988 .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
989 .into_tuple()
990 .all(&inner.db)
991 .await?;
992 result.push((fragment, upstreams));
993 }
994 Ok(result)
995 }
996
997 pub async fn list_sink_actor_mapping(
998 &self,
999 ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1000 let inner = self.inner.read().await;
1001 let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1002 .select_only()
1003 .columns([sink::Column::SinkId, sink::Column::Name])
1004 .into_tuple()
1005 .all(&inner.db)
1006 .await?;
1007 let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1008 let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1009
1010 let actor_with_type: Vec<(ActorId, SinkId, i32)> = Actor::find()
1011 .select_only()
1012 .column(actor::Column::ActorId)
1013 .columns([fragment::Column::JobId, fragment::Column::FragmentTypeMask])
1014 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1015 .filter(fragment::Column::JobId.is_in(sink_ids))
1016 .into_tuple()
1017 .all(&inner.db)
1018 .await?;
1019
1020 let mut sink_actor_mapping = HashMap::new();
1021 for (actor_id, sink_id, type_mask) in actor_with_type {
1022 if FragmentTypeMask::from(type_mask).contains(FragmentTypeFlag::Sink) {
1023 sink_actor_mapping
1024 .entry(sink_id)
1025 .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1026 .1
1027 .push(actor_id);
1028 }
1029 }
1030
1031 Ok(sink_actor_mapping)
1032 }
1033
1034 pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1035 let inner = self.inner.read().await;
1036 let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1037 .select_only()
1038 .columns([
1039 fragment::Column::FragmentId,
1040 fragment::Column::JobId,
1041 fragment::Column::StateTableIds,
1042 ])
1043 .into_partial_model()
1044 .all(&inner.db)
1045 .await?;
1046 Ok(fragment_state_tables)
1047 }
1048
1049 pub async fn load_all_actors(
1052 &self,
1053 database_id: Option<DatabaseId>,
1054 ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
1055 {
1056 let inner = self.inner.read().await;
1057 let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
1058 let filter_condition = if let Some(database_id) = database_id {
1059 filter_condition.and(object::Column::DatabaseId.eq(database_id))
1060 } else {
1061 filter_condition
1062 };
1063 #[expect(clippy::type_complexity)]
1064 let mut actor_info_stream: BoxStream<
1065 '_,
1066 Result<
1067 (
1068 ActorId,
1069 WorkerId,
1070 Option<VnodeBitmap>,
1071 FragmentId,
1072 StreamNode,
1073 I32Array,
1074 DistributionType,
1075 DatabaseId,
1076 ObjectId,
1077 ),
1078 _,
1079 >,
1080 > = Actor::find()
1081 .select_only()
1082 .column(actor::Column::ActorId)
1083 .column(actor::Column::WorkerId)
1084 .column(actor::Column::VnodeBitmap)
1085 .column(fragment::Column::FragmentId)
1086 .column(fragment::Column::StreamNode)
1087 .column(fragment::Column::StateTableIds)
1088 .column(fragment::Column::DistributionType)
1089 .column(object::Column::DatabaseId)
1090 .column(object::Column::Oid)
1091 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1092 .join(JoinType::InnerJoin, fragment::Relation::Object.def())
1093 .filter(filter_condition)
1094 .into_tuple()
1095 .stream(&inner.db)
1096 .await?;
1097
1098 let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
1099 HashMap::new();
1100
1101 while let Some((
1102 actor_id,
1103 worker_id,
1104 vnode_bitmap,
1105 fragment_id,
1106 node,
1107 state_table_ids,
1108 distribution_type,
1109 database_id,
1110 job_id,
1111 )) = actor_info_stream.try_next().await?
1112 {
1113 let fragment_infos = database_fragment_infos
1114 .entry(database_id)
1115 .or_default()
1116 .entry(job_id)
1117 .or_default();
1118 let state_table_ids = state_table_ids.into_inner();
1119 let state_table_ids = state_table_ids
1120 .into_iter()
1121 .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
1122 .collect();
1123 let actor_info = InflightActorInfo {
1124 worker_id,
1125 vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
1126 };
1127 match fragment_infos.entry(fragment_id) {
1128 Entry::Occupied(mut entry) => {
1129 let info: &mut InflightFragmentInfo = entry.get_mut();
1130 assert_eq!(info.state_table_ids, state_table_ids);
1131 assert!(info.actors.insert(actor_id as _, actor_info).is_none());
1132 }
1133 Entry::Vacant(entry) => {
1134 entry.insert(InflightFragmentInfo {
1135 fragment_id: fragment_id as _,
1136 distribution_type,
1137 nodes: node.to_protobuf(),
1138 actors: HashMap::from_iter([(actor_id as _, actor_info)]),
1139 state_table_ids,
1140 });
1141 }
1142 }
1143 }
1144
1145 debug!(?database_fragment_infos, "reload all actors");
1146
1147 Ok(database_fragment_infos)
1148 }
1149
1150 pub async fn migrate_actors(
1151 &self,
1152 plan: HashMap<WorkerSlotId, WorkerSlotId>,
1153 ) -> MetaResult<()> {
1154 let inner = self.inner.read().await;
1155 let txn = inner.db.begin().await?;
1156
1157 let actors: Vec<(
1158 FragmentId,
1159 DistributionType,
1160 ActorId,
1161 Option<VnodeBitmap>,
1162 WorkerId,
1163 ActorStatus,
1164 )> = Actor::find()
1165 .select_only()
1166 .columns([
1167 fragment::Column::FragmentId,
1168 fragment::Column::DistributionType,
1169 ])
1170 .columns([
1171 actor::Column::ActorId,
1172 actor::Column::VnodeBitmap,
1173 actor::Column::WorkerId,
1174 actor::Column::Status,
1175 ])
1176 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1177 .into_tuple()
1178 .all(&txn)
1179 .await?;
1180
1181 let mut actor_locations = HashMap::new();
1182
1183 for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
1184 if *status != ActorStatus::Running {
1185 tracing::warn!(
1186 "skipping actor {} in fragment {} with status {:?}",
1187 actor_id,
1188 fragment_id,
1189 status
1190 );
1191 continue;
1192 }
1193
1194 actor_locations
1195 .entry(*worker_id)
1196 .or_insert(HashMap::new())
1197 .entry(*fragment_id)
1198 .or_insert(BTreeSet::new())
1199 .insert(*actor_id);
1200 }
1201
1202 let expired_or_changed_workers: HashSet<_> =
1203 plan.keys().map(|k| k.worker_id() as WorkerId).collect();
1204
1205 let mut actor_migration_plan = HashMap::new();
1206 for (worker, fragment) in actor_locations {
1207 if expired_or_changed_workers.contains(&worker) {
1208 for (fragment_id, actors) in fragment {
1209 debug!(
1210 "worker {} expired or changed, migrating fragment {}",
1211 worker, fragment_id
1212 );
1213 let worker_slot_to_actor: HashMap<_, _> = actors
1214 .iter()
1215 .enumerate()
1216 .map(|(idx, actor_id)| {
1217 (WorkerSlotId::new(worker as _, idx as _), *actor_id)
1218 })
1219 .collect();
1220
1221 for (worker_slot, actor) in worker_slot_to_actor {
1222 if let Some(target) = plan.get(&worker_slot) {
1223 actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
1224 }
1225 }
1226 }
1227 }
1228 }
1229
1230 for (actor, worker) in actor_migration_plan {
1231 Actor::update_many()
1232 .col_expr(
1233 actor::Column::WorkerId,
1234 Expr::value(Value::Int(Some(worker))),
1235 )
1236 .filter(actor::Column::ActorId.eq(actor))
1237 .exec(&txn)
1238 .await?;
1239 }
1240
1241 txn.commit().await?;
1242
1243 self.notify_fragment_mapping(
1244 NotificationOperation::Update,
1245 rebuild_fragment_mapping_from_actors(actors),
1246 )
1247 .await;
1248
1249 Ok(())
1250 }
1251
1252 pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1253 let inner = self.inner.read().await;
1254
1255 let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1256 .select_only()
1257 .columns([fragment::Column::FragmentId])
1258 .columns([actor::Column::ActorId, actor::Column::WorkerId])
1259 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1260 .into_tuple()
1261 .all(&inner.db)
1262 .await?;
1263
1264 let mut actor_locations = HashMap::new();
1265
1266 for (fragment_id, _, worker_id) in actors {
1267 *actor_locations
1268 .entry(worker_id)
1269 .or_insert(HashMap::new())
1270 .entry(fragment_id)
1271 .or_insert(0_usize) += 1;
1272 }
1273
1274 let mut result = HashSet::new();
1275 for (worker_id, mapping) in actor_locations {
1276 let max_fragment_len = mapping.values().max().unwrap();
1277
1278 result
1279 .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1280 }
1281
1282 Ok(result)
1283 }
1284
1285 pub async fn all_node_actors(
1286 &self,
1287 include_inactive: bool,
1288 ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
1289 let inner = self.inner.read().await;
1290 let fragment_actors = if include_inactive {
1291 FragmentModel::find()
1292 .find_with_related(Actor)
1293 .all(&inner.db)
1294 .await?
1295 } else {
1296 FragmentModel::find()
1297 .find_with_related(Actor)
1298 .filter(actor::Column::Status.eq(ActorStatus::Running))
1299 .all(&inner.db)
1300 .await?
1301 };
1302
1303 let job_definitions = resolve_streaming_job_definition(
1304 &inner.db,
1305 &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
1306 )
1307 .await?;
1308
1309 let mut node_actors = HashMap::new();
1310 for (fragment, actors) in fragment_actors {
1311 let job_id = fragment.job_id;
1312 let (table_fragments, actor_status, _) = Self::compose_fragment(
1313 fragment,
1314 actors,
1315 job_definitions.get(&(job_id as _)).cloned(),
1316 )?;
1317 for actor in table_fragments.actors {
1318 let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
1319 node_actors
1320 .entry(node_id)
1321 .or_insert_with(Vec::new)
1322 .push(actor);
1323 }
1324 }
1325
1326 Ok(node_actors)
1327 }
1328
1329 pub async fn get_worker_actor_ids(
1330 &self,
1331 job_ids: Vec<ObjectId>,
1332 ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
1333 let inner = self.inner.read().await;
1334 let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
1335 .select_only()
1336 .columns([actor::Column::ActorId, actor::Column::WorkerId])
1337 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1338 .filter(fragment::Column::JobId.is_in(job_ids))
1339 .into_tuple()
1340 .all(&inner.db)
1341 .await?;
1342
1343 let mut worker_actors = BTreeMap::new();
1344 for (actor_id, worker_id) in actor_workers {
1345 worker_actors
1346 .entry(worker_id)
1347 .or_insert_with(Vec::new)
1348 .push(actor_id);
1349 }
1350
1351 Ok(worker_actors)
1352 }
1353
1354 pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
1355 let inner = self.inner.read().await;
1356 let txn = inner.db.begin().await?;
1357 for assignments in split_assignment.values() {
1358 for (actor_id, splits) in assignments {
1359 let actor_splits = splits.iter().map(Into::into).collect_vec();
1360 Actor::update(actor::ActiveModel {
1361 actor_id: Set(*actor_id as _),
1362 splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1363 splits: actor_splits,
1364 }))),
1365 ..Default::default()
1366 })
1367 .exec(&txn)
1368 .await
1369 .map_err(|err| {
1370 if err == DbErr::RecordNotUpdated {
1371 MetaError::catalog_id_not_found("actor_id", actor_id)
1372 } else {
1373 err.into()
1374 }
1375 })?;
1376 }
1377 }
1378 txn.commit().await?;
1379
1380 Ok(())
1381 }
1382
1383 #[await_tree::instrument]
1384 pub async fn fill_snapshot_backfill_epoch(
1385 &self,
1386 fragment_ids: impl Iterator<Item = FragmentId>,
1387 snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1388 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1389 ) -> MetaResult<()> {
1390 let inner = self.inner.write().await;
1391 let txn = inner.db.begin().await?;
1392 for fragment_id in fragment_ids {
1393 let fragment = FragmentModel::find_by_id(fragment_id)
1394 .one(&txn)
1395 .await?
1396 .context(format!("fragment {} not found", fragment_id))?;
1397 let mut node = fragment.stream_node.to_protobuf();
1398 if crate::stream::fill_snapshot_backfill_epoch(
1399 &mut node,
1400 snapshot_backfill_info,
1401 cross_db_snapshot_backfill_info,
1402 )? {
1403 let node = StreamNode::from(&node);
1404 FragmentModel::update(fragment::ActiveModel {
1405 fragment_id: Set(fragment_id),
1406 stream_node: Set(node),
1407 ..Default::default()
1408 })
1409 .exec(&txn)
1410 .await?;
1411 }
1412 }
1413 txn.commit().await?;
1414 Ok(())
1415 }
1416
1417 pub async fn get_running_actors_of_fragment(
1419 &self,
1420 fragment_id: FragmentId,
1421 ) -> MetaResult<Vec<ActorId>> {
1422 let inner = self.inner.read().await;
1423 let actors: Vec<ActorId> = Actor::find()
1424 .select_only()
1425 .column(actor::Column::ActorId)
1426 .filter(actor::Column::FragmentId.eq(fragment_id))
1427 .filter(actor::Column::Status.eq(ActorStatus::Running))
1428 .into_tuple()
1429 .all(&inner.db)
1430 .await?;
1431 Ok(actors)
1432 }
1433
    /// Pairs each actor of a source backfill fragment with its upstream actor in the source
    /// fragment.
    ///
    /// The two fragments must be connected by a `NoShuffle` relation from `source_fragment_id`
    /// to `source_backfill_fragment_id`. Each returned tuple is
    /// `(source_backfill_actor_id, source_actor_id)`.
    ///
    /// NOTE(review): despite the name, the actor queries below apply no status filter — confirm
    /// whether only running actors are expected here.
    ///
    /// # Errors
    /// Fails when no relation exists between the two fragments, when its dispatcher type is not
    /// `NoShuffle`, when either fragment cannot be found, or on any database error.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        // The transaction is only read from and is never committed explicitly in this method.
        let txn = inner.db.begin().await?;

        // Dispatcher type of the edge `source fragment -> source backfill fragment`.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Loads the distribution type of one fragment; the `try` block pins the error type to
        // `MetaResult` so that both `?` conversions inside it resolve.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Loads `actor id -> optional vnode bitmap` for all actors of one fragment.
        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        // The annotation on the first call fixes the collection type for both closure calls.
        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        // The resolver is called with the source fragment first; the closure below swaps each
        // pair so that the backfill actor comes first in the returned tuples.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1520
1521 pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
1522 let inner = self.inner.read().await;
1523 let actors: Vec<ActorId> = Actor::find()
1524 .select_only()
1525 .column(actor::Column::ActorId)
1526 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1527 .filter(fragment::Column::JobId.is_in(job_ids))
1528 .into_tuple()
1529 .all(&inner.db)
1530 .await?;
1531 Ok(actors)
1532 }
1533
1534 pub async fn get_root_fragments(
1547 &self,
1548 job_ids: Vec<ObjectId>,
1549 ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
1550 let inner = self.inner.read().await;
1551
1552 let job_definitions = resolve_streaming_job_definition(
1553 &inner.db,
1554 &HashSet::from_iter(job_ids.iter().copied()),
1555 )
1556 .await?;
1557
1558 let all_fragments = FragmentModel::find()
1559 .filter(fragment::Column::JobId.is_in(job_ids))
1560 .all(&inner.db)
1561 .await?;
1562 let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
1564 for fragment in all_fragments {
1565 let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1566 if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1567 _ = root_fragments.insert(fragment.job_id, fragment);
1568 } else if mask.contains(FragmentTypeFlag::Source) {
1569 _ = root_fragments.try_insert(fragment.job_id, fragment);
1572 }
1573 }
1574
1575 let mut root_fragments_pb = HashMap::new();
1576 for (_, fragment) in root_fragments {
1577 let actors = fragment.find_related(Actor).all(&inner.db).await?;
1578
1579 let job_id = fragment.job_id;
1580 root_fragments_pb.insert(
1581 fragment.job_id,
1582 Self::compose_fragment(
1583 fragment,
1584 actors,
1585 job_definitions.get(&(job_id as _)).cloned(),
1586 )?
1587 .0,
1588 );
1589 }
1590
1591 let actors: Vec<(ActorId, WorkerId)> = Actor::find()
1592 .select_only()
1593 .columns([actor::Column::ActorId, actor::Column::WorkerId])
1594 .into_tuple()
1595 .all(&inner.db)
1596 .await?;
1597
1598 Ok((root_fragments_pb, actors))
1599 }
1600
1601 pub async fn get_root_fragment(
1602 &self,
1603 job_id: ObjectId,
1604 ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
1605 let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1606 let root_fragment = root_fragments
1607 .remove(&job_id)
1608 .context(format!("root fragment for job {} not found", job_id))?;
1609 Ok((root_fragment, actors))
1610 }
1611
1612 pub async fn get_downstream_fragments(
1614 &self,
1615 job_id: ObjectId,
1616 ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
1617 let (root_fragment, actors) = self.get_root_fragment(job_id).await?;
1618
1619 let inner = self.inner.read().await;
1620 let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1621 .filter(
1622 fragment_relation::Column::SourceFragmentId
1623 .eq(root_fragment.fragment_id as FragmentId),
1624 )
1625 .all(&inner.db)
1626 .await?;
1627 let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
1628 .await?
1629 .remove(&job_id);
1630
1631 let mut downstream_fragments = vec![];
1632 for fragment_relation::Model {
1633 target_fragment_id: fragment_id,
1634 dispatcher_type,
1635 ..
1636 } in downstream_fragment_relations
1637 {
1638 let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
1639 .find_with_related(Actor)
1640 .all(&inner.db)
1641 .await?;
1642 if fragment_actors.is_empty() {
1643 bail!("No fragment found for fragment id {}", fragment_id);
1644 }
1645 assert_eq!(fragment_actors.len(), 1);
1646 let (fragment, actors) = fragment_actors.pop().unwrap();
1647 let dispatch_type = PbDispatcherType::from(dispatcher_type);
1648 let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
1649 downstream_fragments.push((dispatch_type, fragment));
1650 }
1651
1652 Ok((downstream_fragments, actors))
1653 }
1654
1655 pub async fn load_source_fragment_ids(
1656 &self,
1657 ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1658 let inner = self.inner.read().await;
1659 let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1660 .select_only()
1661 .columns([
1662 fragment::Column::FragmentId,
1663 fragment::Column::FragmentTypeMask,
1664 fragment::Column::StreamNode,
1665 ])
1666 .into_tuple()
1667 .all(&inner.db)
1668 .await?;
1669 fragments.retain(|(_, mask, _)| {
1670 FragmentTypeMask::from(*mask).contains(FragmentTypeFlag::Source)
1671 });
1672
1673 let mut source_fragment_ids = HashMap::new();
1674 for (fragment_id, _, stream_node) in fragments {
1675 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1676 source_fragment_ids
1677 .entry(source_id as SourceId)
1678 .or_insert_with(BTreeSet::new)
1679 .insert(fragment_id);
1680 }
1681 }
1682 Ok(source_fragment_ids)
1683 }
1684
1685 pub async fn load_backfill_fragment_ids(
1686 &self,
1687 ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1688 let inner = self.inner.read().await;
1689 let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1690 .select_only()
1691 .columns([
1692 fragment::Column::FragmentId,
1693 fragment::Column::FragmentTypeMask,
1694 fragment::Column::StreamNode,
1695 ])
1696 .into_tuple()
1697 .all(&inner.db)
1698 .await?;
1699 fragments.retain(|(_, mask, _)| {
1700 FragmentTypeMask::from(*mask).contains(FragmentTypeFlag::SourceScan)
1701 });
1702
1703 let mut source_fragment_ids = HashMap::new();
1704 for (fragment_id, _, stream_node) in fragments {
1705 if let Some((source_id, upstream_source_fragment_id)) =
1706 stream_node.to_protobuf().find_source_backfill()
1707 {
1708 source_fragment_ids
1709 .entry(source_id as SourceId)
1710 .or_insert_with(BTreeSet::new)
1711 .insert((fragment_id, upstream_source_fragment_id));
1712 }
1713 }
1714 Ok(source_fragment_ids)
1715 }
1716
1717 pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
1718 let inner = self.inner.read().await;
1719 let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
1720 .select_only()
1721 .columns([actor::Column::ActorId, actor::Column::Splits])
1722 .filter(actor::Column::Splits.is_not_null())
1723 .into_tuple()
1724 .all(&inner.db)
1725 .await?;
1726 Ok(splits.into_iter().collect())
1727 }
1728
1729 pub async fn get_actual_job_fragment_parallelism(
1731 &self,
1732 job_id: ObjectId,
1733 ) -> MetaResult<Option<usize>> {
1734 let inner = self.inner.read().await;
1735 let mut fragments: Vec<(FragmentId, i32, i64)> = FragmentModel::find()
1736 .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
1737 .select_only()
1738 .columns([
1739 fragment::Column::FragmentId,
1740 fragment::Column::FragmentTypeMask,
1741 ])
1742 .column_as(actor::Column::ActorId.count(), "count")
1743 .filter(fragment::Column::JobId.eq(job_id))
1744 .group_by(fragment::Column::FragmentId)
1745 .into_tuple()
1746 .all(&inner.db)
1747 .await?;
1748
1749 fragments.retain(|(_, mask, _)| {
1750 FragmentTypeMask::from(*mask)
1751 .contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink])
1752 });
1753
1754 Ok(fragments
1755 .into_iter()
1756 .at_most_one()
1757 .ok()
1758 .flatten()
1759 .map(|(_, _, count)| count as usize))
1760 }
1761}
1762
1763#[cfg(test)]
1764mod tests {
1765 use std::collections::{BTreeMap, HashMap, HashSet};
1766
1767 use itertools::Itertools;
1768 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1769 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1770 use risingwave_common::util::iter_util::ZipEqDebug;
1771 use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1772 use risingwave_meta_model::actor::ActorStatus;
1773 use risingwave_meta_model::fragment::DistributionType;
1774 use risingwave_meta_model::{
1775 ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
1776 VnodeBitmap, actor, fragment,
1777 };
1778 use risingwave_pb::common::PbActorLocation;
1779 use risingwave_pb::meta::table_fragments::PbActorStatus;
1780 use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
1781 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1782 use risingwave_pb::plan_common::PbExprContext;
1783 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1784 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1785 use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1786
1787 use crate::MetaResult;
1788 use crate::controller::catalog::CatalogController;
1789 use crate::model::{Fragment, StreamActor};
1790
1791 type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
1792
1793 type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
1794
1795 const TEST_FRAGMENT_ID: FragmentId = 1;
1796
1797 const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;
1798
1799 const TEST_JOB_ID: ObjectId = 1;
1800
1801 const TEST_STATE_TABLE_ID: TableId = 1000;
1802
1803 fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1804 let mut upstream_actor_ids = BTreeMap::new();
1805 upstream_actor_ids.insert(
1806 TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1807 HashSet::from_iter([(actor_id + 100)]),
1808 );
1809 upstream_actor_ids.insert(
1810 (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1811 HashSet::from_iter([(actor_id + 200)]),
1812 );
1813 upstream_actor_ids
1814 }
1815
1816 fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1817 let mut input = vec![];
1818 for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1819 input.push(PbStreamNode {
1820 node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1821 upstream_fragment_id: *upstream_fragment_id as _,
1822 ..Default::default()
1823 }))),
1824 ..Default::default()
1825 });
1826 }
1827
1828 PbStreamNode {
1829 input,
1830 node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1831 ..Default::default()
1832 }
1833 }
1834
1835 #[tokio::test]
1836 async fn test_extract_fragment() -> MetaResult<()> {
1837 let actor_count = 3u32;
1838 let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
1839 .map(|actor_id| {
1840 (
1841 actor_id as _,
1842 generate_upstream_actor_ids_for_actor(actor_id),
1843 )
1844 })
1845 .collect();
1846
1847 let actor_bitmaps = ActorMapping::new_uniform(
1848 (0..actor_count).map(|i| i as _),
1849 VirtualNode::COUNT_FOR_TEST,
1850 )
1851 .to_bitmaps();
1852
1853 let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());
1854
1855 let pb_actors = (0..actor_count)
1856 .map(|actor_id| StreamActor {
1857 actor_id: actor_id as _,
1858 fragment_id: TEST_FRAGMENT_ID as _,
1859 vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
1860 mview_definition: "".to_owned(),
1861 expr_context: Some(PbExprContext {
1862 time_zone: String::from("America/New_York"),
1863 strict_mode: false,
1864 }),
1865 })
1866 .collect_vec();
1867
1868 let pb_fragment = Fragment {
1869 fragment_id: TEST_FRAGMENT_ID as _,
1870 fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
1871 distribution_type: PbFragmentDistributionType::Hash as _,
1872 actors: pb_actors.clone(),
1873 state_table_ids: vec![TEST_STATE_TABLE_ID as _],
1874 maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
1875 nodes: stream_node.clone(),
1876 };
1877
1878 let pb_actor_status = (0..actor_count)
1879 .map(|actor_id| {
1880 (
1881 actor_id,
1882 PbActorStatus {
1883 location: PbActorLocation::from_worker(0),
1884 state: PbActorState::Running as _,
1885 },
1886 )
1887 })
1888 .collect();
1889
1890 let pb_actor_splits = Default::default();
1891
1892 let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
1893 TEST_JOB_ID,
1894 &pb_fragment,
1895 &pb_actor_status,
1896 &pb_actor_splits,
1897 )?;
1898
1899 check_fragment(fragment, pb_fragment);
1900 check_actors(
1901 actors,
1902 &upstream_actor_ids,
1903 pb_actors,
1904 Default::default(),
1905 &stream_node,
1906 );
1907
1908 Ok(())
1909 }
1910
1911 #[tokio::test]
1912 async fn test_compose_fragment() -> MetaResult<()> {
1913 let actor_count = 3u32;
1914
1915 let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
1916 .map(|actor_id| {
1917 (
1918 actor_id as _,
1919 generate_upstream_actor_ids_for_actor(actor_id),
1920 )
1921 })
1922 .collect();
1923
1924 let mut actor_bitmaps = ActorMapping::new_uniform(
1925 (0..actor_count).map(|i| i as _),
1926 VirtualNode::COUNT_FOR_TEST,
1927 )
1928 .to_bitmaps();
1929
1930 let actors = (0..actor_count)
1931 .map(|actor_id| {
1932 let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
1933 splits: vec![PbConnectorSplit {
1934 split_type: "dummy".to_owned(),
1935 ..Default::default()
1936 }],
1937 }));
1938
1939 #[expect(deprecated)]
1940 actor::Model {
1941 actor_id: actor_id as ActorId,
1942 fragment_id: TEST_FRAGMENT_ID,
1943 status: ActorStatus::Running,
1944 splits: actor_splits,
1945 worker_id: 0,
1946 upstream_actor_ids: Default::default(),
1947 vnode_bitmap: actor_bitmaps
1948 .remove(&actor_id)
1949 .map(|bitmap| bitmap.to_protobuf())
1950 .as_ref()
1951 .map(VnodeBitmap::from),
1952 expr_context: ExprContext::from(&PbExprContext {
1953 time_zone: String::from("America/New_York"),
1954 strict_mode: false,
1955 }),
1956 }
1957 })
1958 .collect_vec();
1959
1960 let stream_node = {
1961 let template_actor = actors.first().cloned().unwrap();
1962
1963 let template_upstream_actor_ids = upstream_actor_ids
1964 .get(&(template_actor.actor_id as _))
1965 .unwrap();
1966
1967 generate_merger_stream_node(template_upstream_actor_ids)
1968 };
1969
1970 #[expect(deprecated)]
1971 let fragment = fragment::Model {
1972 fragment_id: TEST_FRAGMENT_ID,
1973 job_id: TEST_JOB_ID,
1974 fragment_type_mask: 0,
1975 distribution_type: DistributionType::Hash,
1976 stream_node: StreamNode::from(&stream_node),
1977 state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
1978 upstream_fragment_id: Default::default(),
1979 vnode_count: VirtualNode::COUNT_FOR_TEST as _,
1980 };
1981
1982 let (pb_fragment, pb_actor_status, pb_actor_splits) =
1983 CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();
1984
1985 assert_eq!(pb_actor_status.len(), actor_count as usize);
1986 assert_eq!(pb_actor_splits.len(), actor_count as usize);
1987
1988 let pb_actors = pb_fragment.actors.clone();
1989
1990 check_fragment(fragment, pb_fragment);
1991 check_actors(
1992 actors,
1993 &upstream_actor_ids,
1994 pb_actors,
1995 pb_actor_splits,
1996 &stream_node,
1997 );
1998
1999 Ok(())
2000 }
2001
2002 fn check_actors(
2003 actors: Vec<actor::Model>,
2004 actor_upstreams: &FragmentActorUpstreams,
2005 pb_actors: Vec<StreamActor>,
2006 pb_actor_splits: HashMap<u32, PbConnectorSplits>,
2007 stream_node: &PbStreamNode,
2008 ) {
2009 for (
2010 actor::Model {
2011 actor_id,
2012 fragment_id,
2013 status,
2014 splits,
2015 worker_id: _,
2016 vnode_bitmap,
2017 expr_context,
2018 ..
2019 },
2020 StreamActor {
2021 actor_id: pb_actor_id,
2022 fragment_id: pb_fragment_id,
2023 vnode_bitmap: pb_vnode_bitmap,
2024 mview_definition,
2025 expr_context: pb_expr_context,
2026 ..
2027 },
2028 ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
2029 {
2030 assert_eq!(actor_id, pb_actor_id as ActorId);
2031 assert_eq!(fragment_id, pb_fragment_id as FragmentId);
2032
2033 assert_eq!(
2034 vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
2035 pb_vnode_bitmap,
2036 );
2037
2038 assert_eq!(mview_definition, "");
2039
2040 visit_stream_node_body(stream_node, |body| {
2041 if let PbNodeBody::Merge(m) = body {
2042 assert!(
2043 actor_upstreams
2044 .get(&(actor_id as _))
2045 .unwrap()
2046 .contains_key(&m.upstream_fragment_id)
2047 );
2048 }
2049 });
2050
2051 assert_eq!(status, ActorStatus::Running);
2052
2053 assert_eq!(
2054 splits,
2055 pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
2056 );
2057
2058 assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
2059 }
2060 }
2061
2062 fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2063 let Fragment {
2064 fragment_id,
2065 fragment_type_mask,
2066 distribution_type: pb_distribution_type,
2067 actors: _,
2068 state_table_ids: pb_state_table_ids,
2069 maybe_vnode_count: _,
2070 nodes,
2071 } = pb_fragment;
2072
2073 assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
2074 assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2075 assert_eq!(
2076 pb_distribution_type,
2077 PbFragmentDistributionType::from(fragment.distribution_type)
2078 );
2079
2080 assert_eq!(
2081 pb_state_table_ids,
2082 fragment.state_table_ids.into_u32_array()
2083 );
2084 assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2085 }
2086}