risingwave_meta/controller/fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
26use risingwave_common::util::stream_graph_visitor::{
27    visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_connector::source::SplitImpl;
30use risingwave_meta_model::actor::ActorStatus;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Actor, Fragment as FragmentModel, FragmentRelation, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
40    streaming_job, table,
41};
42use risingwave_meta_model_migration::{Alias, SelectStatement};
43use risingwave_pb::common::PbActorLocation;
44use risingwave_pb::meta::subscribe_response::{
45    Info as NotificationInfo, Operation as NotificationOperation,
46};
47use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
48use risingwave_pb::meta::table_fragments::fragment::{
49    FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan::stream_node::NodeBody;
55use risingwave_pb::stream_plan::{
56    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
57    StreamScanType,
58};
59use sea_orm::ActiveValue::Set;
60use sea_orm::sea_query::Expr;
61use sea_orm::{
62    ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
63    QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
64};
65use serde::{Deserialize, Serialize};
66use tracing::debug;
67
68use crate::barrier::SnapshotBackfillInfo;
69use crate::controller::catalog::{CatalogController, CatalogControllerInner};
70use crate::controller::scale::resolve_streaming_job_definition;
71use crate::controller::utils::{
72    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
73    get_fragment_mappings, rebuild_fragment_mapping_from_actors,
74    resolve_no_shuffle_actor_dispatcher,
75};
76use crate::manager::LocalNotification;
77use crate::model::{
78    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
79    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
80};
81use crate::stream::{SplitAssignment, build_actor_split_impls};
82use crate::{MetaError, MetaResult, model};
83
/// Some information of running (inflight) actors.
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    /// The worker this actor is currently located on.
    pub worker_id: WorkerId,
    /// The vnode bitmap owned by this actor, if any.
    /// NOTE(review): presumably `None` for actors of singleton fragments — confirm.
    pub vnode_bitmap: Option<Bitmap>,
}
90
/// Some information of a running (inflight) fragment and its actors.
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    /// Id of the fragment.
    pub fragment_id: crate::model::FragmentId,
    /// Distribution of the fragment (e.g. hash / single).
    pub distribution_type: DistributionType,
    /// The stream node tree of the fragment.
    pub nodes: PbStreamNode,
    /// Per-actor runtime info, keyed by actor id.
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    /// Ids of the state tables owned by this fragment.
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}
99
/// Parallelism-related information of a fragment, as returned by
/// [`CatalogController::running_fragment_parallelisms`].
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    /// Distribution of the fragment (hash / single).
    pub distribution_type: FragmentDistributionType,
    /// Number of actors the fragment currently has.
    pub actor_count: usize,
    /// Number of vnodes assigned to the fragment.
    pub vnode_count: usize,
}
106
/// Summary information of a streaming job, composed from a join over the
/// `streaming_job`, `object`, `database` and name-providing catalog tables
/// (see [`CatalogController::list_streaming_job_infos`]).
///
/// Field names must match the selected column aliases since this is decoded
/// via `FromQueryResult`.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    /// Id of the job (same as its object id).
    pub job_id: ObjectId,
    /// Object type of the job (table / source / sink / ...).
    pub obj_type: ObjectType,
    /// Resolved name of the job; `"<unknown>"` if no catalog entry matches.
    pub name: String,
    /// Current status of the job (e.g. creating / created).
    pub job_status: JobStatus,
    /// Configured parallelism of the job.
    pub parallelism: StreamingParallelism,
    /// Upper bound of parallelism for the job.
    pub max_parallelism: i32,
    /// Effective resource group (job-specific or inherited from the database).
    pub resource_group: String,
    /// Database the job belongs to.
    pub database_id: DatabaseId,
    /// Schema the job belongs to.
    pub schema_id: SchemaId,
}
120
121impl CatalogControllerInner {
122    /// List all fragment vnode mapping info for all CREATED streaming jobs.
123    pub async fn all_running_fragment_mappings(
124        &self,
125    ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
126        let txn = self.db.begin().await?;
127
128        let job_ids: Vec<ObjectId> = StreamingJob::find()
129            .select_only()
130            .column(streaming_job::Column::JobId)
131            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
132            .into_tuple()
133            .all(&txn)
134            .await?;
135
136        let mut result = vec![];
137        for job_id in job_ids {
138            let mappings = get_fragment_mappings(&txn, job_id).await?;
139
140            result.extend(mappings.into_iter());
141        }
142
143        Ok(result.into_iter())
144    }
145}
146
147impl CatalogController {
148    pub(crate) async fn notify_fragment_mapping(
149        &self,
150        operation: NotificationOperation,
151        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
152    ) {
153        let fragment_ids = fragment_mappings
154            .iter()
155            .map(|mapping| mapping.fragment_id)
156            .collect_vec();
157        // notify all fragment mappings to frontend.
158        for fragment_mapping in fragment_mappings {
159            self.env
160                .notification_manager()
161                .notify_frontend(
162                    operation,
163                    NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
164                )
165                .await;
166        }
167
168        // update serving vnode mappings.
169        match operation {
170            NotificationOperation::Add | NotificationOperation::Update => {
171                self.env
172                    .notification_manager()
173                    .notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
174                        fragment_ids,
175                    ))
176                    .await;
177            }
178            NotificationOperation::Delete => {
179                self.env
180                    .notification_manager()
181                    .notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
182                        fragment_ids,
183                    ))
184                    .await;
185            }
186            op => {
187                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
188            }
189        }
190    }
191
192    pub fn extract_fragment_and_actors_from_fragments(
193        stream_job_fragments: &StreamJobFragments,
194    ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
195        stream_job_fragments
196            .fragments
197            .values()
198            .map(|fragment| {
199                Self::extract_fragment_and_actors_for_new_job(
200                    stream_job_fragments.stream_job_id.table_id as _,
201                    fragment,
202                    &stream_job_fragments.actor_status,
203                    &stream_job_fragments.actor_splits,
204                )
205            })
206            .try_collect()
207    }
208
    /// Build the database models (`fragment::Model` + `actor::Model`s) for a
    /// single fragment of a newly-created streaming job.
    ///
    /// Deprecated columns (`actor.upstream_actor_ids`,
    /// `fragment.upstream_fragment_id`) are stored empty, and the deprecated
    /// `upstream_actor_id` lists inside `Merge` nodes are cleared before the
    /// stream node is persisted.
    ///
    /// Errors if any actor lacks an entry in `actor_status`; panics if the
    /// fragment has no actors or an actor has no expression context.
    fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        // Strip deprecated per-actor upstream info from Merge nodes before
        // persisting the node tree.
        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            // Source split assignment is optional; absent means no splits.
            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            // Every actor must have a status (it carries worker location and state).
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }
308
309    pub fn compose_table_fragments(
310        table_id: u32,
311        state: PbState,
312        ctx: Option<PbStreamContext>,
313        fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
314        parallelism: StreamingParallelism,
315        max_parallelism: usize,
316        job_definition: Option<String>,
317    ) -> MetaResult<StreamJobFragments> {
318        let mut pb_fragments = BTreeMap::new();
319        let mut pb_actor_splits = HashMap::new();
320        let mut pb_actor_status = BTreeMap::new();
321
322        for (fragment, actors) in fragments {
323            let (fragment, fragment_actor_status, fragment_actor_splits) =
324                Self::compose_fragment(fragment, actors, job_definition.clone())?;
325
326            pb_fragments.insert(fragment.fragment_id, fragment);
327
328            pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
329            pb_actor_status.extend(fragment_actor_status.into_iter());
330        }
331
332        let table_fragments = StreamJobFragments {
333            stream_job_id: table_id.into(),
334            state: state as _,
335            fragments: pb_fragments,
336            actor_status: pb_actor_status,
337            actor_splits: pb_actor_splits,
338            ctx: ctx
339                .as_ref()
340                .map(StreamContext::from_protobuf)
341                .unwrap_or_default(),
342            assigned_parallelism: match parallelism {
343                StreamingParallelism::Custom => TableParallelism::Custom,
344                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
345                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
346            },
347            max_parallelism,
348        };
349
350        Ok(table_fragments)
351    }
352
    /// Convert a fragment's database models back into the in-memory
    /// [`Fragment`] representation.
    ///
    /// Returns the composed fragment together with two per-actor maps: the
    /// actor statuses (worker location + state) and the connector splits of
    /// actors that have any.
    ///
    /// Errors if an actor row references a different fragment id; panics if
    /// the stream node contains duplicate upstream fragment ids in its
    /// `Merge` nodes.
    #[allow(clippy::type_complexity)]
    pub(crate) fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<actor::Model>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity check: each upstream fragment must appear in at most one
        // Merge node of this fragment.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let actor::Model {
                actor_id,
                fragment_id,
                status,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                    state: PbActorState::from(status) as _,
                },
            );

            // Only actors that actually have splits get an entry.
            if let Some(splits) = splits {
                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
            }

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }
449
    /// Compute, for each fragment (optionally restricted by `id_filter`),
    /// its actor count together with its distribution type and vnode count.
    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let inner = self.inner.read().await;

        let query_alias = Alias::new("fragment_actor_count");
        let count_alias = Alias::new("count");

        // Inner query: per-fragment actor counts, grouped by fragment id.
        let mut query = SelectStatement::new()
            .column(actor::Column::FragmentId)
            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
            .from(Actor)
            .group_by_col(actor::Column::FragmentId)
            .to_owned();

        if let Some(id_filter) = id_filter {
            // Filter on the group-by column via HAVING (applied after grouping).
            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
        }

        // Outer query: join the counts back to the fragment table to pick up
        // distribution type and vnode count.
        let outer = SelectStatement::new()
            .column((FragmentModel, fragment::Column::FragmentId))
            .column(count_alias)
            .column(fragment::Column::DistributionType)
            .column(fragment::Column::VnodeCount)
            .from_subquery(query.to_owned(), query_alias.clone())
            .inner_join(
                FragmentModel,
                Expr::col((query_alias, actor::Column::FragmentId))
                    .equals((FragmentModel, fragment::Column::FragmentId)),
            )
            .to_owned();

        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
                outer.to_owned(),
            )
            .all(&inner.db)
            .await?;

        Ok(fragment_parallelisms
            .into_iter()
            .map(|(fragment_id, count, distribution_type, vnode_count)| {
                (
                    fragment_id,
                    FragmentParallelismInfo {
                        distribution_type: distribution_type.into(),
                        actor_count: count as usize,
                        vnode_count: vnode_count as usize,
                    },
                )
            })
            .collect())
    }
504
505    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
506        let inner = self.inner.read().await;
507        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
508            .select_only()
509            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
510            .into_tuple()
511            .all(&inner.db)
512            .await?;
513        Ok(fragment_jobs.into_iter().collect())
514    }
515
516    pub async fn get_fragment_job_id(
517        &self,
518        fragment_ids: Vec<FragmentId>,
519    ) -> MetaResult<Vec<ObjectId>> {
520        let inner = self.inner.read().await;
521
522        let object_ids: Vec<ObjectId> = FragmentModel::find()
523            .select_only()
524            .column(fragment::Column::JobId)
525            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
526            .into_tuple()
527            .all(&inner.db)
528            .await?;
529
530        Ok(object_ids)
531    }
532
533    pub async fn get_fragment_desc_by_id(
534        &self,
535        fragment_id: FragmentId,
536    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
537        let inner = self.inner.read().await;
538
539        // Get the fragment description
540        let fragment_opt = FragmentModel::find()
541            .select_only()
542            .columns([
543                fragment::Column::FragmentId,
544                fragment::Column::JobId,
545                fragment::Column::FragmentTypeMask,
546                fragment::Column::DistributionType,
547                fragment::Column::StateTableIds,
548                fragment::Column::VnodeCount,
549                fragment::Column::StreamNode,
550            ])
551            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
552            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
553            .filter(fragment::Column::FragmentId.eq(fragment_id))
554            .group_by(fragment::Column::FragmentId)
555            .into_model::<FragmentDesc>()
556            .one(&inner.db)
557            .await?;
558
559        let Some(fragment) = fragment_opt else {
560            return Ok(None); // Fragment not found
561        };
562
563        // Get upstream fragments
564        let upstreams: Vec<FragmentId> = FragmentRelation::find()
565            .select_only()
566            .column(fragment_relation::Column::SourceFragmentId)
567            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
568            .into_tuple()
569            .all(&inner.db)
570            .await?;
571
572        Ok(Some((fragment, upstreams)))
573    }
574
575    pub async fn list_fragment_database_ids(
576        &self,
577        select_fragment_ids: Option<Vec<FragmentId>>,
578    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
579        let inner = self.inner.read().await;
580        let select = FragmentModel::find()
581            .select_only()
582            .column(fragment::Column::FragmentId)
583            .column(object::Column::DatabaseId)
584            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
585        let select = if let Some(select_fragment_ids) = select_fragment_ids {
586            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
587        } else {
588            select
589        };
590        Ok(select.into_tuple().all(&inner.db).await?)
591    }
592
593    pub async fn get_job_fragments_by_id(
594        &self,
595        job_id: ObjectId,
596    ) -> MetaResult<StreamJobFragments> {
597        let inner = self.inner.read().await;
598        let fragment_actors = FragmentModel::find()
599            .find_with_related(Actor)
600            .filter(fragment::Column::JobId.eq(job_id))
601            .all(&inner.db)
602            .await?;
603
604        let job_info = StreamingJob::find_by_id(job_id)
605            .one(&inner.db)
606            .await?
607            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
608
609        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
610            .await?
611            .remove(&job_id);
612
613        Self::compose_table_fragments(
614            job_id as _,
615            job_info.job_status.into(),
616            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
617            fragment_actors,
618            job_info.parallelism.clone(),
619            job_info.max_parallelism as _,
620            job_definition,
621        )
622    }
623
    /// Query the dispatchers of all actors belonging to the given fragments.
    ///
    /// Thin wrapper around [`get_fragment_actor_dispatchers`] from
    /// `controller::utils`, executed against the inner database connection.
    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;
        get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
    }
631
632    pub async fn get_fragment_downstream_relations(
633        &self,
634        fragment_ids: Vec<FragmentId>,
635    ) -> MetaResult<FragmentDownstreamRelation> {
636        let inner = self.inner.read().await;
637        let mut stream = FragmentRelation::find()
638            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
639            .stream(&inner.db)
640            .await?;
641        let mut relations = FragmentDownstreamRelation::new();
642        while let Some(relation) = stream.try_next().await? {
643            relations
644                .entry(relation.source_fragment_id as _)
645                .or_default()
646                .push(DownstreamFragmentRelation {
647                    downstream_fragment_id: relation.target_fragment_id as _,
648                    dispatcher_type: relation.dispatcher_type,
649                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
650                    output_mapping: PbDispatchOutputMapping {
651                        indices: relation.output_indices.into_u32_array(),
652                        types: relation
653                            .output_type_mapping
654                            .unwrap_or_default()
655                            .to_protobuf(),
656                    },
657                });
658        }
659        Ok(relations)
660    }
661
662    pub async fn get_job_fragment_backfill_scan_type(
663        &self,
664        job_id: ObjectId,
665    ) -> MetaResult<HashMap<model::FragmentId, PbStreamScanType>> {
666        let inner = self.inner.read().await;
667        let fragments: Vec<_> = FragmentModel::find()
668            .filter(fragment::Column::JobId.eq(job_id))
669            .all(&inner.db)
670            .await?;
671
672        let mut result = HashMap::new();
673
674        for fragment::Model {
675            fragment_id,
676            stream_node,
677            ..
678        } in fragments
679        {
680            let stream_node = stream_node.to_protobuf();
681            visit_stream_node_body(&stream_node, |body| {
682                if let NodeBody::StreamScan(node) = body {
683                    match node.stream_scan_type() {
684                        StreamScanType::Unspecified => {}
685                        scan_type => {
686                            result.insert(fragment_id as model::FragmentId, scan_type);
687                        }
688                    }
689                }
690            });
691        }
692
693        Ok(result)
694    }
695
    /// List summary info of all streaming jobs (used e.g. by the dashboard).
    ///
    /// The job name is resolved by LEFT JOINs against `table`, `source` and
    /// `sink` (first non-NULL name wins, otherwise `"<unknown>"`); the
    /// resource group falls back to the database-level one when the job has
    /// no specific resource group. Selected columns/aliases must match the
    /// fields of [`StreamingJobInfo`] for `into_model` to decode them.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // LEFT JOIN the possible catalog objects that carry the job name.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // COALESCE(table.name, source.name, sink.name, '<unknown>') AS name
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // Job-specific resource group, falling back to the database's.
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
742
743    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
744        let inner = self.inner.read().await;
745        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
746            .select_only()
747            .column(streaming_job::Column::MaxParallelism)
748            .into_tuple()
749            .one(&inner.db)
750            .await?
751            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
752        Ok(max_parallelism as usize)
753    }
754
755    /// Get all actor ids in the target streaming jobs.
756    pub async fn get_job_actor_mapping(
757        &self,
758        job_ids: Vec<ObjectId>,
759    ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
760        let inner = self.inner.read().await;
761        let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
762            .select_only()
763            .column(fragment::Column::JobId)
764            .column(actor::Column::ActorId)
765            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
766            .filter(fragment::Column::JobId.is_in(job_ids))
767            .into_tuple()
768            .all(&inner.db)
769            .await?;
770        Ok(job_actors.into_iter().into_group_map())
771    }
772
773    /// Try to get internal table ids of each streaming job, used by metrics collection.
774    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
775        if let Ok(inner) = self.inner.try_read()
776            && let Ok(job_state_tables) = FragmentModel::find()
777                .select_only()
778                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
779                .into_tuple::<(ObjectId, I32Array)>()
780                .all(&inner.db)
781                .await
782        {
783            let mut job_internal_table_ids = HashMap::new();
784            for (job_id, state_table_ids) in job_state_tables {
785                job_internal_table_ids
786                    .entry(job_id)
787                    .or_insert_with(Vec::new)
788                    .extend(state_table_ids.into_inner());
789            }
790            return Some(job_internal_table_ids.into_iter().collect());
791        }
792        None
793    }
794
795    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
796        let inner = self.inner.read().await;
797        let count = FragmentModel::find().count(&inner.db).await?;
798        Ok(count > 0)
799    }
800
801    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
802        let inner = self.inner.read().await;
803        let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
804            .select_only()
805            .column(actor::Column::WorkerId)
806            .column_as(actor::Column::ActorId.count(), "count")
807            .group_by(actor::Column::WorkerId)
808            .into_tuple()
809            .all(&inner.db)
810            .await?;
811
812        Ok(actor_cnt
813            .into_iter()
814            .map(|(worker_id, count)| (worker_id, count as usize))
815            .collect())
816    }
817
    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
    /// Loads the complete fragment/actor topology of every streaming job, composed
    /// into `StreamJobFragments`, keyed by job id.
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve all job definitions (e.g. SQL text) in one batch up front.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            // One fragments-with-actors query per job (N+1 pattern) — part of why
            // this function is heavy, per the TODO above.
            let fragment_actors = FragmentModel::find()
                .find_with_related(Actor)
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            table_fragments.insert(
                job.job_id as ObjectId,
                Self::compose_table_fragments(
                    job.job_id as _,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    // `remove` hands over the owned definition; each job id is visited once.
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
853
    /// For each fragment id in `fragment_ids`, collects the ids of its direct
    /// upstream fragments, i.e. the sources of all `fragment_relation` edges whose
    /// target is that fragment.
    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        // Stream `(source, target)` pairs of all matching relation edges instead of
        // materializing them all at once.
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        // Group upstream ids by the downstream (target) fragment.
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }
881
882    pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
883        let inner = self.inner.read().await;
884        let actor_locations: Vec<PartialActorLocation> =
885            Actor::find().into_partial_model().all(&inner.db).await?;
886        Ok(actor_locations)
887    }
888
    /// Lists, for every actor, its id and fragment id together with the owning
    /// job's object id, schema id and object type (resolved via joins through
    /// `fragment` and `object`).
    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;
        let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
            Actor::find()
                .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .select_only()
                .columns([actor::Column::ActorId, actor::Column::FragmentId])
                // The aliased columns below must stay in this order so they line up
                // with the tuple positions expected by `into_tuple`.
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;
        Ok(actor_locations)
    }
907
908    pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
909        let inner = self.inner.read().await;
910
911        let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
912            .select_only()
913            .filter(actor::Column::Splits.is_not_null())
914            .columns([actor::Column::ActorId, actor::Column::FragmentId])
915            .into_tuple()
916            .all(&inner.db)
917            .await?;
918
919        Ok(source_actors)
920    }
921
    /// Lists the descriptions of all fragments belonging to jobs that are still
    /// being created (`Initial` or `Creating` status), each paired with the ids of
    /// its direct upstream fragments.
    pub async fn list_creating_fragment_descs(
        &self,
    ) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let mut result = Vec::new();
        let fragments = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            // `parallelism` is derived by counting the fragment's actors.
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .join(JoinType::LeftJoin, fragment::Relation::Object.def())
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Initial)
                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
            )
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .all(&inner.db)
            .await?;
        for fragment in fragments {
            // One extra query per fragment to resolve its upstreams (N+1 pattern).
            let upstreams: Vec<FragmentId> = FragmentRelation::find()
                .select_only()
                .column(fragment_relation::Column::SourceFragmentId)
                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
                .into_tuple()
                .all(&inner.db)
                .await?;
            result.push((fragment, upstreams));
        }
        Ok(result)
    }
963
    /// Lists the descriptions of all fragments (regardless of job status), each
    /// paired with the ids of its direct upstream fragments.
    pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let mut result = Vec::new();
        let fragments = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            // `parallelism` is derived by counting the fragment's actors.
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .all(&inner.db)
            .await?;
        for fragment in fragments {
            // One extra query per fragment to resolve its upstreams (N+1 pattern).
            let upstreams: Vec<FragmentId> = FragmentRelation::find()
                .select_only()
                .column(fragment_relation::Column::SourceFragmentId)
                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
                .into_tuple()
                .all(&inner.db)
                .await?;
            result.push((fragment, upstreams));
        }
        Ok(result)
    }
996
997    pub async fn list_sink_actor_mapping(
998        &self,
999    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1000        let inner = self.inner.read().await;
1001        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1002            .select_only()
1003            .columns([sink::Column::SinkId, sink::Column::Name])
1004            .into_tuple()
1005            .all(&inner.db)
1006            .await?;
1007        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1008        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1009
1010        let actor_with_type: Vec<(ActorId, SinkId, i32)> = Actor::find()
1011            .select_only()
1012            .column(actor::Column::ActorId)
1013            .columns([fragment::Column::JobId, fragment::Column::FragmentTypeMask])
1014            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1015            .filter(fragment::Column::JobId.is_in(sink_ids))
1016            .into_tuple()
1017            .all(&inner.db)
1018            .await?;
1019
1020        let mut sink_actor_mapping = HashMap::new();
1021        for (actor_id, sink_id, type_mask) in actor_with_type {
1022            if FragmentTypeMask::from(type_mask).contains(FragmentTypeFlag::Sink) {
1023                sink_actor_mapping
1024                    .entry(sink_id)
1025                    .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1026                    .1
1027                    .push(actor_id);
1028            }
1029        }
1030
1031        Ok(sink_actor_mapping)
1032    }
1033
1034    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1035        let inner = self.inner.read().await;
1036        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1037            .select_only()
1038            .columns([
1039                fragment::Column::FragmentId,
1040                fragment::Column::JobId,
1041                fragment::Column::StateTableIds,
1042            ])
1043            .into_partial_model()
1044            .all(&inner.db)
1045            .await?;
1046        Ok(fragment_state_tables)
1047    }
1048
    /// Used in [`crate::barrier::GlobalBarrierManager`], loads all running actors that
    /// need to be sent or collected.
    ///
    /// Returns a nested map of `database_id -> job_id -> fragment_id -> fragment info`,
    /// optionally restricted to a single database.
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        // Only running actors are relevant; optionally narrow to one database.
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        // Stream one row per actor; the column order below must match this tuple.
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            // Group by database first, then by owning job (the object's `Oid`).
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();
            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
            };
            match fragment_infos.entry(fragment_id) {
                Entry::Occupied(mut entry) => {
                    // Fragment already seen via another of its actors: the state
                    // tables must agree, and each actor id must appear only once.
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                Entry::Vacant(entry) => {
                    // First actor of this fragment: materialize the fragment info.
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }
1149
    /// Migrates actors away from expired or changed workers according to `plan`,
    /// a mapping from currently-occupied worker slots to their replacement slots.
    /// Persists the new worker id of each migrated actor in a single transaction,
    /// then re-broadcasts the rebuilt fragment mappings.
    pub async fn migrate_actors(
        &self,
        plan: HashMap<WorkerSlotId, WorkerSlotId>,
    ) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Load every actor with its fragment's distribution info.
        let actors: Vec<(
            FragmentId,
            DistributionType,
            ActorId,
            Option<VnodeBitmap>,
            WorkerId,
            ActorStatus,
        )> = Actor::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::DistributionType,
            ])
            .columns([
                actor::Column::ActorId,
                actor::Column::VnodeBitmap,
                actor::Column::WorkerId,
                actor::Column::Status,
            ])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .into_tuple()
            .all(&txn)
            .await?;

        // worker id -> fragment id -> actor ids on that worker (BTreeSet keeps a
        // stable, sorted order, which the slot-index derivation below relies on).
        let mut actor_locations = HashMap::new();

        for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
            if *status != ActorStatus::Running {
                // Non-running actors are skipped and never migrated.
                tracing::warn!(
                    "skipping actor {} in fragment {} with status {:?}",
                    actor_id,
                    fragment_id,
                    status
                );
                continue;
            }

            actor_locations
                .entry(*worker_id)
                .or_insert(HashMap::new())
                .entry(*fragment_id)
                .or_insert(BTreeSet::new())
                .insert(*actor_id);
        }

        // Workers appearing as a key of the plan have actors that must move.
        let expired_or_changed_workers: HashSet<_> =
            plan.keys().map(|k| k.worker_id() as WorkerId).collect();

        let mut actor_migration_plan = HashMap::new();
        for (worker, fragment) in actor_locations {
            if expired_or_changed_workers.contains(&worker) {
                for (fragment_id, actors) in fragment {
                    debug!(
                        "worker {} expired or changed, migrating fragment {}",
                        worker, fragment_id
                    );
                    // A slot index is the actor's rank within its fragment on this
                    // worker, derived from the sorted actor-id order.
                    let worker_slot_to_actor: HashMap<_, _> = actors
                        .iter()
                        .enumerate()
                        .map(|(idx, actor_id)| {
                            (WorkerSlotId::new(worker as _, idx as _), *actor_id)
                        })
                        .collect();

                    // An actor moves only if its slot has a target in the plan.
                    for (worker_slot, actor) in worker_slot_to_actor {
                        if let Some(target) = plan.get(&worker_slot) {
                            actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
                        }
                    }
                }
            }
        }

        // Persist the new worker id of each migrated actor inside the transaction.
        for (actor, worker) in actor_migration_plan {
            Actor::update_many()
                .col_expr(
                    actor::Column::WorkerId,
                    Expr::value(Value::Int(Some(worker))),
                )
                .filter(actor::Column::ActorId.eq(actor))
                .exec(&txn)
                .await?;
        }

        txn.commit().await?;

        // NOTE(review): `actors` was snapshotted before the updates above, so its
        // worker ids are pre-migration — confirm `rebuild_fragment_mapping_from_actors`
        // does not depend on up-to-date worker placement.
        self.notify_fragment_mapping(
            NotificationOperation::Update,
            rebuild_fragment_mapping_from_actors(actors),
        )
        .await;

        Ok(())
    }
1251
1252    pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1253        let inner = self.inner.read().await;
1254
1255        let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1256            .select_only()
1257            .columns([fragment::Column::FragmentId])
1258            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1259            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1260            .into_tuple()
1261            .all(&inner.db)
1262            .await?;
1263
1264        let mut actor_locations = HashMap::new();
1265
1266        for (fragment_id, _, worker_id) in actors {
1267            *actor_locations
1268                .entry(worker_id)
1269                .or_insert(HashMap::new())
1270                .entry(fragment_id)
1271                .or_insert(0_usize) += 1;
1272        }
1273
1274        let mut result = HashSet::new();
1275        for (worker_id, mapping) in actor_locations {
1276            let max_fragment_len = mapping.values().max().unwrap();
1277
1278            result
1279                .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1280        }
1281
1282        Ok(result)
1283    }
1284
    /// Groups all stream actors by the worker node they are located on.
    ///
    /// When `include_inactive` is `false`, only actors in `Running` status are
    /// included.
    pub async fn all_node_actors(
        &self,
        include_inactive: bool,
    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
        let inner = self.inner.read().await;
        let fragment_actors = if include_inactive {
            FragmentModel::find()
                .find_with_related(Actor)
                .all(&inner.db)
                .await?
        } else {
            FragmentModel::find()
                .find_with_related(Actor)
                .filter(actor::Column::Status.eq(ActorStatus::Running))
                .all(&inner.db)
                .await?
        };

        // Resolve the definitions of all involved jobs in one batch.
        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
        )
        .await?;

        let mut node_actors = HashMap::new();
        for (fragment, actors) in fragment_actors {
            let job_id = fragment.job_id;
            let (table_fragments, actor_status, _) = Self::compose_fragment(
                fragment,
                actors,
                job_definitions.get(&(job_id as _)).cloned(),
            )?;
            for actor in table_fragments.actors {
                // Place each composed actor under the worker recorded in its status.
                let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
                node_actors
                    .entry(node_id)
                    .or_insert_with(Vec::new)
                    .push(actor);
            }
        }

        Ok(node_actors)
    }
1328
1329    pub async fn get_worker_actor_ids(
1330        &self,
1331        job_ids: Vec<ObjectId>,
1332    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
1333        let inner = self.inner.read().await;
1334        let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
1335            .select_only()
1336            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1337            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1338            .filter(fragment::Column::JobId.is_in(job_ids))
1339            .into_tuple()
1340            .all(&inner.db)
1341            .await?;
1342
1343        let mut worker_actors = BTreeMap::new();
1344        for (actor_id, worker_id) in actor_workers {
1345            worker_actors
1346                .entry(worker_id)
1347                .or_insert_with(Vec::new)
1348                .push(actor_id);
1349        }
1350
1351        Ok(worker_actors)
1352    }
1353
    /// Persists the given source split assignment by overwriting the `splits`
    /// column of every mentioned actor inside a single transaction.
    ///
    /// Returns a "catalog id not found" error if an actor id does not exist.
    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        for assignments in split_assignment.values() {
            for (actor_id, splits) in assignments {
                let actor_splits = splits.iter().map(Into::into).collect_vec();
                Actor::update(actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_splits,
                    }))),
                    ..Default::default()
                })
                .exec(&txn)
                .await
                .map_err(|err| {
                    // `RecordNotUpdated` means the actor row is missing; surface it
                    // as a catalog-id-not-found error rather than a raw DB error.
                    if err == DbErr::RecordNotUpdated {
                        MetaError::catalog_id_not_found("actor_id", actor_id)
                    } else {
                        err.into()
                    }
                })?;
            }
        }
        txn.commit().await?;

        Ok(())
    }
1382
1383    #[await_tree::instrument]
1384    pub async fn fill_snapshot_backfill_epoch(
1385        &self,
1386        fragment_ids: impl Iterator<Item = FragmentId>,
1387        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1388        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1389    ) -> MetaResult<()> {
1390        let inner = self.inner.write().await;
1391        let txn = inner.db.begin().await?;
1392        for fragment_id in fragment_ids {
1393            let fragment = FragmentModel::find_by_id(fragment_id)
1394                .one(&txn)
1395                .await?
1396                .context(format!("fragment {} not found", fragment_id))?;
1397            let mut node = fragment.stream_node.to_protobuf();
1398            if crate::stream::fill_snapshot_backfill_epoch(
1399                &mut node,
1400                snapshot_backfill_info,
1401                cross_db_snapshot_backfill_info,
1402            )? {
1403                let node = StreamNode::from(&node);
1404                FragmentModel::update(fragment::ActiveModel {
1405                    fragment_id: Set(fragment_id),
1406                    stream_node: Set(node),
1407                    ..Default::default()
1408                })
1409                .exec(&txn)
1410                .await?;
1411            }
1412        }
1413        txn.commit().await?;
1414        Ok(())
1415    }
1416
1417    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1418    pub async fn get_running_actors_of_fragment(
1419        &self,
1420        fragment_id: FragmentId,
1421    ) -> MetaResult<Vec<ActorId>> {
1422        let inner = self.inner.read().await;
1423        let actors: Vec<ActorId> = Actor::find()
1424            .select_only()
1425            .column(actor::Column::ActorId)
1426            .filter(actor::Column::FragmentId.eq(fragment_id))
1427            .filter(actor::Column::Status.eq(ActorStatus::Running))
1428            .into_tuple()
1429            .all(&inner.db)
1430            .await?;
1431        Ok(actors)
1432    }
1433
    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
    /// (`backfill_actor_id`, `upstream_source_actor_id`)
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // The two fragments must be connected by an edge in `fragment_relation`.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // The source -> source-backfill edge is expected to be no-shuffle, which is
        // what makes the 1:1 actor pairing below well-defined.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Loads the distribution type of a single fragment.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Loads `actor id -> optional vnode bitmap` for all actors of a fragment.
        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        // Pair each source actor with its no-shuffle backfill counterpart and flip
        // the tuple order to `(backfill_actor_id, source_actor_id)`.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1520
1521    pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
1522        let inner = self.inner.read().await;
1523        let actors: Vec<ActorId> = Actor::find()
1524            .select_only()
1525            .column(actor::Column::ActorId)
1526            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1527            .filter(fragment::Column::JobId.is_in(job_ids))
1528            .into_tuple()
1529            .all(&inner.db)
1530            .await?;
1531        Ok(actors)
1532    }
1533
    /// Get and filter the "**root**" fragments of the specified jobs.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// Root fragment connects to downstream jobs.
    ///
    /// ## What can be the root fragment
    /// - For sink, it should have one `Sink` fragment.
    /// - For MV, it should have one `MView` fragment.
    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
    /// - For source, it should have one `Source` fragment.
    ///
    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
        let inner = self.inner.read().await;

        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(job_ids.iter().copied()),
        )
        .await?;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        // job_id -> fragment
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                // MView/Sink always wins: unconditionally overwrite any earlier pick.
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                // look for Source fragment only if there's no MView fragment
                // (notice try_insert here vs insert above)
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        // Compose each root fragment together with its actors into the pb form.
        let mut root_fragments_pb = HashMap::new();
        for (_, fragment) in root_fragments {
            let actors = fragment.find_related(Actor).all(&inner.db).await?;

            let job_id = fragment.job_id;
            root_fragments_pb.insert(
                fragment.job_id,
                Self::compose_fragment(
                    fragment,
                    actors,
                    job_definitions.get(&(job_id as _)).cloned(),
                )?
                .0,
            );
        }

        // NOTE(review): this lists the worker of EVERY actor in the cluster, not
        // only those belonging to `job_ids` — confirm callers rely on that.
        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok((root_fragments_pb, actors))
    }
1600
1601    pub async fn get_root_fragment(
1602        &self,
1603        job_id: ObjectId,
1604    ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
1605        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1606        let root_fragment = root_fragments
1607            .remove(&job_id)
1608            .context(format!("root fragment for job {} not found", job_id))?;
1609        Ok((root_fragment, actors))
1610    }
1611
    /// Get the downstream fragments connected to the specified job.
    ///
    /// For each fragment directly downstream of the job's root fragment, returns
    /// the dispatcher type of the connecting edge plus the composed [`Fragment`],
    /// together with the `(actor, worker)` pairs from [`Self::get_root_fragment`].
    pub async fn get_downstream_fragments(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;

        // `get_root_fragment` acquired and released the read lock internally;
        // re-acquire it for the queries below.
        let inner = self.inner.read().await;
        // All edges whose source is the root fragment, i.e. its direct downstreams.
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&inner.db)
            .await?;
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        let mut downstream_fragments = vec![];
        for fragment_relation::Model {
            target_fragment_id: fragment_id,
            dispatcher_type,
            ..
        } in downstream_fragment_relations
        {
            // Fetch the downstream fragment together with its actors. `find_by_id`
            // yields at most one fragment, hence the length-1 assertion below.
            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
                .find_with_related(Actor)
                .all(&inner.db)
                .await?;
            if fragment_actors.is_empty() {
                bail!("No fragment found for fragment id {}", fragment_id);
            }
            assert_eq!(fragment_actors.len(), 1);
            let (fragment, actors) = fragment_actors.pop().unwrap();
            let dispatch_type = PbDispatcherType::from(dispatcher_type);
            // NOTE(review): downstream fragments are composed with the *upstream*
            // job's definition (`job_id`'s), not their own — confirm intended.
            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
            downstream_fragments.push((dispatch_type, fragment));
        }

        Ok((downstream_fragments, actors))
    }
1654
1655    pub async fn load_source_fragment_ids(
1656        &self,
1657    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1658        let inner = self.inner.read().await;
1659        let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1660            .select_only()
1661            .columns([
1662                fragment::Column::FragmentId,
1663                fragment::Column::FragmentTypeMask,
1664                fragment::Column::StreamNode,
1665            ])
1666            .into_tuple()
1667            .all(&inner.db)
1668            .await?;
1669        fragments.retain(|(_, mask, _)| {
1670            FragmentTypeMask::from(*mask).contains(FragmentTypeFlag::Source)
1671        });
1672
1673        let mut source_fragment_ids = HashMap::new();
1674        for (fragment_id, _, stream_node) in fragments {
1675            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1676                source_fragment_ids
1677                    .entry(source_id as SourceId)
1678                    .or_insert_with(BTreeSet::new)
1679                    .insert(fragment_id);
1680            }
1681        }
1682        Ok(source_fragment_ids)
1683    }
1684
1685    pub async fn load_backfill_fragment_ids(
1686        &self,
1687    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1688        let inner = self.inner.read().await;
1689        let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1690            .select_only()
1691            .columns([
1692                fragment::Column::FragmentId,
1693                fragment::Column::FragmentTypeMask,
1694                fragment::Column::StreamNode,
1695            ])
1696            .into_tuple()
1697            .all(&inner.db)
1698            .await?;
1699        fragments.retain(|(_, mask, _)| {
1700            FragmentTypeMask::from(*mask).contains(FragmentTypeFlag::SourceScan)
1701        });
1702
1703        let mut source_fragment_ids = HashMap::new();
1704        for (fragment_id, _, stream_node) in fragments {
1705            if let Some((source_id, upstream_source_fragment_id)) =
1706                stream_node.to_protobuf().find_source_backfill()
1707            {
1708                source_fragment_ids
1709                    .entry(source_id as SourceId)
1710                    .or_insert_with(BTreeSet::new)
1711                    .insert((fragment_id, upstream_source_fragment_id));
1712            }
1713        }
1714        Ok(source_fragment_ids)
1715    }
1716
1717    pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
1718        let inner = self.inner.read().await;
1719        let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
1720            .select_only()
1721            .columns([actor::Column::ActorId, actor::Column::Splits])
1722            .filter(actor::Column::Splits.is_not_null())
1723            .into_tuple()
1724            .all(&inner.db)
1725            .await?;
1726        Ok(splits.into_iter().collect())
1727    }
1728
1729    /// Get the actor count of `Materialize` or `Sink` fragment of the specified table.
1730    pub async fn get_actual_job_fragment_parallelism(
1731        &self,
1732        job_id: ObjectId,
1733    ) -> MetaResult<Option<usize>> {
1734        let inner = self.inner.read().await;
1735        let mut fragments: Vec<(FragmentId, i32, i64)> = FragmentModel::find()
1736            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
1737            .select_only()
1738            .columns([
1739                fragment::Column::FragmentId,
1740                fragment::Column::FragmentTypeMask,
1741            ])
1742            .column_as(actor::Column::ActorId.count(), "count")
1743            .filter(fragment::Column::JobId.eq(job_id))
1744            .group_by(fragment::Column::FragmentId)
1745            .into_tuple()
1746            .all(&inner.db)
1747            .await?;
1748
1749        fragments.retain(|(_, mask, _)| {
1750            FragmentTypeMask::from(*mask)
1751                .contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink])
1752        });
1753
1754        Ok(fragments
1755            .into_iter()
1756            .at_most_one()
1757            .ok()
1758            .flatten()
1759            .map(|(_, _, count)| count as usize))
1760    }
1761}
1762
1763#[cfg(test)]
1764mod tests {
1765    use std::collections::{BTreeMap, HashMap, HashSet};
1766
1767    use itertools::Itertools;
1768    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1769    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1770    use risingwave_common::util::iter_util::ZipEqDebug;
1771    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1772    use risingwave_meta_model::actor::ActorStatus;
1773    use risingwave_meta_model::fragment::DistributionType;
1774    use risingwave_meta_model::{
1775        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
1776        VnodeBitmap, actor, fragment,
1777    };
1778    use risingwave_pb::common::PbActorLocation;
1779    use risingwave_pb::meta::table_fragments::PbActorStatus;
1780    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
1781    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1782    use risingwave_pb::plan_common::PbExprContext;
1783    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1784    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1785    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1786
1787    use crate::MetaResult;
1788    use crate::controller::catalog::CatalogController;
1789    use crate::model::{Fragment, StreamActor};
1790
    /// Upstream fragment id -> the set of upstream actor ids feeding one actor.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    /// Actor id -> its upstream mapping, for every actor of a fragment.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    // Fixture ids shared by the tests below.
    const TEST_FRAGMENT_ID: FragmentId = 1;

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;

    const TEST_JOB_ID: ObjectId = 1;

    const TEST_STATE_TABLE_ID: TableId = 1000;
1802
1803    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1804        let mut upstream_actor_ids = BTreeMap::new();
1805        upstream_actor_ids.insert(
1806            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1807            HashSet::from_iter([(actor_id + 100)]),
1808        );
1809        upstream_actor_ids.insert(
1810            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1811            HashSet::from_iter([(actor_id + 200)]),
1812        );
1813        upstream_actor_ids
1814    }
1815
1816    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1817        let mut input = vec![];
1818        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1819            input.push(PbStreamNode {
1820                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1821                    upstream_fragment_id: *upstream_fragment_id as _,
1822                    ..Default::default()
1823                }))),
1824                ..Default::default()
1825            });
1826        }
1827
1828        PbStreamNode {
1829            input,
1830            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1831            ..Default::default()
1832        }
1833    }
1834
    /// Round-trip test: build a protobuf `Fragment` fixture, run it through
    /// `extract_fragment_and_actors_for_new_job`, and check the resulting
    /// model rows against the inputs via `check_fragment`/`check_actors`.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        // Each actor gets upstreams in two distinct upstream fragments.
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // Evenly distribute the test vnodes across the actors.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // All actors share the same upstream fragment ids, so any actor's
        // mapping serves as the template for the fragment's merger node.
        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        // All actors report as running on worker 0.
        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        // No splits assigned in this test.
        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }
1910
    /// Inverse round-trip of `test_extract_fragment`: build model rows
    /// (`fragment::Model` + `actor::Model`s), run `compose_fragment`, and check
    /// the resulting protobuf against the inputs.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        // Each actor gets upstreams in two distinct upstream fragments.
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // Evenly distribute the test vnodes; `mut` because each actor's bitmap
        // is moved out of the map below.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Every actor carries one dummy connector split so the
                // composed splits map is non-empty.
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        // All actors share the same upstream fragment ids, so the first
        // actor's mapping serves as the template for the merger node.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        // One status and one splits entry per actor.
        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }
2001
    /// Assert that composed protobuf actors match the model actors they were
    /// derived from, field by field, and that every `Merge` node of the
    /// fragment's stream node refers to one of the actors' upstream fragments.
    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        // Pairwise comparison; `zip_eq_debug` panics if the lengths differ.
        for (
            actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            // Tests compose without a job definition, so it stays empty.
            assert_eq!(mview_definition, "");

            // Every merge input must reference an upstream fragment recorded
            // for this actor.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(status, ActorStatus::Running);

            assert_eq!(
                splits,
                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }
2061
2062    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2063        let Fragment {
2064            fragment_id,
2065            fragment_type_mask,
2066            distribution_type: pb_distribution_type,
2067            actors: _,
2068            state_table_ids: pb_state_table_ids,
2069            maybe_vnode_count: _,
2070            nodes,
2071        } = pb_fragment;
2072
2073        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
2074        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2075        assert_eq!(
2076            pb_distribution_type,
2077            PbFragmentDistributionType::from(fragment.distribution_type)
2078        );
2079
2080        assert_eq!(
2081            pb_state_table_ids,
2082            fragment.state_table_ids.into_u32_array()
2083        );
2084        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2085    }
2086}