risingwave_meta/controller/
fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
25use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
26use risingwave_connector::source::SplitImpl;
27use risingwave_meta_model::actor::ActorStatus;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::object::ObjectType;
30use risingwave_meta_model::prelude::{
31    Actor, Fragment as FragmentModel, FragmentRelation, Sink, StreamingJob,
32};
33use risingwave_meta_model::{
34    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
35    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
36    VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
37    streaming_job, table,
38};
39use risingwave_meta_model_migration::{Alias, SelectStatement};
40use risingwave_pb::common::PbActorLocation;
41use risingwave_pb::meta::subscribe_response::{
42    Info as NotificationInfo, Operation as NotificationOperation,
43};
44use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
45use risingwave_pb::meta::table_fragments::fragment::{
46    FragmentDistributionType, PbFragmentDistributionType,
47};
48use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
49use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
50use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
51use risingwave_pb::stream_plan::stream_node::NodeBody;
52use risingwave_pb::stream_plan::{
53    PbDispatchOutputMapping, PbDispatcherType, PbFragmentTypeFlag, PbStreamContext, PbStreamNode,
54    PbStreamScanType, StreamScanType,
55};
56use sea_orm::ActiveValue::Set;
57use sea_orm::sea_query::Expr;
58use sea_orm::{
59    ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
60    QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
61};
62use serde::{Deserialize, Serialize};
63use tracing::debug;
64
65use crate::barrier::SnapshotBackfillInfo;
66use crate::controller::catalog::{CatalogController, CatalogControllerInner};
67use crate::controller::scale::resolve_streaming_job_definition;
68use crate::controller::utils::{
69    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
70    get_fragment_mappings, rebuild_fragment_mapping_from_actors,
71    resolve_no_shuffle_actor_dispatcher,
72};
73use crate::manager::LocalNotification;
74use crate::model::{
75    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
76    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
77};
78use crate::stream::{SplitAssignment, build_actor_split_impls};
79use crate::{MetaError, MetaResult, model};
80
/// Some information of running (inflight) actors.
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    /// The worker node this actor is currently located on.
    pub worker_id: WorkerId,
    /// Vnode bitmap assigned to this actor, if any.
    pub vnode_bitmap: Option<Bitmap>,
}
87
/// Information of a running (inflight) fragment and its actors.
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    /// How the fragment's actors are distributed.
    pub distribution_type: DistributionType,
    /// The stream plan (node tree) of this fragment.
    pub nodes: PbStreamNode,
    /// Running actors of the fragment, keyed by actor id.
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    /// State table ids belonging to this fragment.
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}
96
/// Parallelism-related information of a fragment.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    /// Distribution type of the fragment.
    pub distribution_type: FragmentDistributionType,
    /// Number of actors currently belonging to the fragment.
    pub actor_count: usize,
    /// Number of virtual nodes of the fragment.
    pub vnode_count: usize,
}
103
/// Summary of a streaming job, hydrated directly from a query result.
///
/// Field names are serialized in camelCase for dashboard consumption.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    /// Object type of the job (table / source / sink / ...).
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Resource group the job runs in; resolved in `list_streaming_job_infos`
    /// with a fallback to the database-level resource group.
    pub resource_group: String,
}
115
116impl CatalogControllerInner {
117    /// List all fragment vnode mapping info for all CREATED streaming jobs.
118    pub async fn all_running_fragment_mappings(
119        &self,
120    ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
121        let txn = self.db.begin().await?;
122
123        let job_ids: Vec<ObjectId> = StreamingJob::find()
124            .select_only()
125            .column(streaming_job::Column::JobId)
126            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
127            .into_tuple()
128            .all(&txn)
129            .await?;
130
131        let mut result = vec![];
132        for job_id in job_ids {
133            let mappings = get_fragment_mappings(&txn, job_id).await?;
134
135            result.extend(mappings.into_iter());
136        }
137
138        Ok(result.into_iter())
139    }
140}
141
142impl CatalogController {
143    pub(crate) async fn notify_fragment_mapping(
144        &self,
145        operation: NotificationOperation,
146        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
147    ) {
148        let fragment_ids = fragment_mappings
149            .iter()
150            .map(|mapping| mapping.fragment_id)
151            .collect_vec();
152        // notify all fragment mappings to frontend.
153        for fragment_mapping in fragment_mappings {
154            self.env
155                .notification_manager()
156                .notify_frontend(
157                    operation,
158                    NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
159                )
160                .await;
161        }
162
163        // update serving vnode mappings.
164        match operation {
165            NotificationOperation::Add | NotificationOperation::Update => {
166                self.env
167                    .notification_manager()
168                    .notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
169                        fragment_ids,
170                    ))
171                    .await;
172            }
173            NotificationOperation::Delete => {
174                self.env
175                    .notification_manager()
176                    .notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
177                        fragment_ids,
178                    ))
179                    .await;
180            }
181            op => {
182                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
183            }
184        }
185    }
186
187    pub fn extract_fragment_and_actors_from_fragments(
188        stream_job_fragments: &StreamJobFragments,
189    ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
190        stream_job_fragments
191            .fragments
192            .values()
193            .map(|fragment| {
194                Self::extract_fragment_and_actors_for_new_job(
195                    stream_job_fragments.stream_job_id.table_id as _,
196                    fragment,
197                    &stream_job_fragments.actor_status,
198                    &stream_job_fragments.actor_splits,
199                )
200            })
201            .try_collect()
202    }
203
    /// Convert a single in-memory [`Fragment`] (plus per-actor status and
    /// split assignments) into database models for a newly created job.
    ///
    /// Returns the `fragment::Model` together with one `actor::Model` per
    /// actor. Errors if any actor has no entry in `actor_status`.
    fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        // A fragment without actors cannot be scheduled; treat as a bug.
        assert!(!pb_actors.is_empty());

        // Strip the deprecated `upstream_actor_id` field from Merge nodes
        // before persisting the plan.
        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            // Split assignment is optional; actors without one store `None`.
            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            // Every actor must have a status entry carrying its worker/state.
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                // Deprecated column; intentionally left empty for new jobs.
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: *pb_fragment_type_mask as _,
            distribution_type,
            stream_node,
            state_table_ids,
            // Deprecated column; intentionally left empty for new jobs.
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }
303
304    pub fn compose_table_fragments(
305        table_id: u32,
306        state: PbState,
307        ctx: Option<PbStreamContext>,
308        fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
309        parallelism: StreamingParallelism,
310        max_parallelism: usize,
311        job_definition: Option<String>,
312    ) -> MetaResult<StreamJobFragments> {
313        let mut pb_fragments = BTreeMap::new();
314        let mut pb_actor_splits = HashMap::new();
315        let mut pb_actor_status = BTreeMap::new();
316
317        for (fragment, actors) in fragments {
318            let (fragment, fragment_actor_status, fragment_actor_splits) =
319                Self::compose_fragment(fragment, actors, job_definition.clone())?;
320
321            pb_fragments.insert(fragment.fragment_id, fragment);
322
323            pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
324            pb_actor_status.extend(fragment_actor_status.into_iter());
325        }
326
327        let table_fragments = StreamJobFragments {
328            stream_job_id: table_id.into(),
329            state: state as _,
330            fragments: pb_fragments,
331            actor_status: pb_actor_status,
332            actor_splits: pb_actor_splits,
333            ctx: ctx
334                .as_ref()
335                .map(StreamContext::from_protobuf)
336                .unwrap_or_default(),
337            assigned_parallelism: match parallelism {
338                StreamingParallelism::Custom => TableParallelism::Custom,
339                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
340                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
341            },
342            max_parallelism,
343        };
344
345        Ok(table_fragments)
346    }
347
    /// Convert a persisted `fragment::Model` and its `actor::Model`s back into
    /// an in-memory [`Fragment`], together with each actor's status and
    /// connector splits (both keyed by actor id).
    ///
    /// Errors if an actor's `fragment_id` does not match the fragment.
    /// Panics (debug invariant) if the plan contains two Merge nodes with the
    /// same upstream fragment id.
    #[allow(clippy::type_complexity)]
    pub(crate) fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<actor::Model>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity check: each upstream fragment may appear in at most one
        // Merge node of this fragment's plan.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            // All actors must belong to this fragment.
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let actor::Model {
                actor_id,
                fragment_id,
                status,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                    state: PbActorState::from(status) as _,
                },
            );

            // Only actors that have a split assignment are recorded.
            if let Some(splits) = splits {
                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
            }

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                // Fall back to an empty definition when none was resolved.
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask as _,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }
444
    /// Query the actor count, distribution type, and vnode count of fragments,
    /// optionally restricted to `id_filter`.
    ///
    /// Built as raw SQL: a subquery counts actors per fragment, then joins
    /// back to the `fragment` table for metadata.
    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let inner = self.inner.read().await;

        let query_alias = Alias::new("fragment_actor_count");
        let count_alias = Alias::new("count");

        // Subquery: SELECT fragment_id, COUNT(actor_id) AS count
        //           FROM actor GROUP BY fragment_id
        let mut query = SelectStatement::new()
            .column(actor::Column::FragmentId)
            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
            .from(Actor)
            .group_by_col(actor::Column::FragmentId)
            .to_owned();

        if let Some(id_filter) = id_filter {
            // Applied as HAVING so it filters the grouped rows.
            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
        }

        // Outer query joins the per-fragment counts with fragment metadata.
        let outer = SelectStatement::new()
            .column((FragmentModel, fragment::Column::FragmentId))
            .column(count_alias)
            .column(fragment::Column::DistributionType)
            .column(fragment::Column::VnodeCount)
            .from_subquery(query.to_owned(), query_alias.clone())
            .inner_join(
                FragmentModel,
                Expr::col((query_alias, actor::Column::FragmentId))
                    .equals((FragmentModel, fragment::Column::FragmentId)),
            )
            .to_owned();

        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
                outer.to_owned(),
            )
            .all(&inner.db)
            .await?;

        Ok(fragment_parallelisms
            .into_iter()
            .map(|(fragment_id, count, distribution_type, vnode_count)| {
                (
                    fragment_id,
                    FragmentParallelismInfo {
                        distribution_type: distribution_type.into(),
                        actor_count: count as usize,
                        vnode_count: vnode_count as usize,
                    },
                )
            })
            .collect())
    }
499
500    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
501        let inner = self.inner.read().await;
502        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
503            .select_only()
504            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
505            .into_tuple()
506            .all(&inner.db)
507            .await?;
508        Ok(fragment_jobs.into_iter().collect())
509    }
510
511    pub async fn get_fragment_job_id(
512        &self,
513        fragment_ids: Vec<FragmentId>,
514    ) -> MetaResult<Vec<ObjectId>> {
515        let inner = self.inner.read().await;
516
517        let object_ids: Vec<ObjectId> = FragmentModel::find()
518            .select_only()
519            .column(fragment::Column::JobId)
520            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
521            .into_tuple()
522            .all(&inner.db)
523            .await?;
524
525        Ok(object_ids)
526    }
527
    /// Get the description of a fragment (including its actor count exposed as
    /// `parallelism`) together with the ids of its upstream fragments.
    ///
    /// Returns `Ok(None)` if the fragment does not exist.
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        // Get the fragment description
        let fragment_opt = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            // Count actors via a LEFT JOIN so a fragment with no actors still
            // yields a row (with parallelism 0).
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .filter(fragment::Column::FragmentId.eq(fragment_id))
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .one(&inner.db)
            .await?;

        let Some(fragment) = fragment_opt else {
            return Ok(None); // Fragment not found
        };

        // Get upstream fragments
        let upstreams: Vec<FragmentId> = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(Some((fragment, upstreams)))
    }
569
570    pub async fn list_fragment_database_ids(
571        &self,
572        select_fragment_ids: Option<Vec<FragmentId>>,
573    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
574        let inner = self.inner.read().await;
575        let select = FragmentModel::find()
576            .select_only()
577            .column(fragment::Column::FragmentId)
578            .column(object::Column::DatabaseId)
579            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
580        let select = if let Some(select_fragment_ids) = select_fragment_ids {
581            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
582        } else {
583            select
584        };
585        Ok(select.into_tuple().all(&inner.db).await?)
586    }
587
588    pub async fn get_job_fragments_by_id(
589        &self,
590        job_id: ObjectId,
591    ) -> MetaResult<StreamJobFragments> {
592        let inner = self.inner.read().await;
593        let fragment_actors = FragmentModel::find()
594            .find_with_related(Actor)
595            .filter(fragment::Column::JobId.eq(job_id))
596            .all(&inner.db)
597            .await?;
598
599        let job_info = StreamingJob::find_by_id(job_id)
600            .one(&inner.db)
601            .await?
602            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
603
604        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
605            .await?
606            .remove(&job_id);
607
608        Self::compose_table_fragments(
609            job_id as _,
610            job_info.job_status.into(),
611            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
612            fragment_actors,
613            job_info.parallelism.clone(),
614            job_info.max_parallelism as _,
615            job_definition,
616        )
617    }
618
619    pub async fn get_fragment_actor_dispatchers(
620        &self,
621        fragment_ids: Vec<FragmentId>,
622    ) -> MetaResult<FragmentActorDispatchers> {
623        let inner = self.inner.read().await;
624        get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
625    }
626
    /// Collect the downstream relations of the given fragments, keyed by the
    /// source fragment id.
    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        // Stream rows from the database instead of loading them all at once.
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        // A missing type mapping is treated as the default one.
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }
656
657    pub async fn get_job_fragment_backfill_scan_type(
658        &self,
659        job_id: ObjectId,
660    ) -> MetaResult<HashMap<model::FragmentId, PbStreamScanType>> {
661        let inner = self.inner.read().await;
662        let fragments: Vec<_> = FragmentModel::find()
663            .filter(fragment::Column::JobId.eq(job_id))
664            .all(&inner.db)
665            .await?;
666
667        let mut result = HashMap::new();
668
669        for fragment::Model {
670            fragment_id,
671            stream_node,
672            ..
673        } in fragments
674        {
675            let stream_node = stream_node.to_protobuf();
676            visit_stream_node(&stream_node, |body| {
677                if let NodeBody::StreamScan(node) = body {
678                    match node.stream_scan_type() {
679                        StreamScanType::Unspecified => {}
680                        scan_type => {
681                            result.insert(fragment_id as model::FragmentId, scan_type);
682                        }
683                    }
684                }
685            });
686        }
687
688        Ok(result)
689    }
690
    /// List summary info of all streaming jobs (for the dashboard).
    ///
    /// The job name is resolved from whichever of `table` / `source` / `sink`
    /// the job's object joins to, falling back to `<unknown>`; the resource
    /// group falls back to the database-level group when the job has no
    /// specific one.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // LEFT JOIN each candidate catalog table so the name can be
            // resolved regardless of the job's object type.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                // Equivalent to COALESCE(table.name, source.name, sink.name, '<unknown>').
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            .column_as(
                // Job-specific resource group, or the database's group if unset.
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
735
736    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
737        let inner = self.inner.read().await;
738        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
739            .select_only()
740            .column(streaming_job::Column::MaxParallelism)
741            .into_tuple()
742            .one(&inner.db)
743            .await?
744            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
745        Ok(max_parallelism as usize)
746    }
747
748    /// Get all actor ids in the target streaming jobs.
749    pub async fn get_job_actor_mapping(
750        &self,
751        job_ids: Vec<ObjectId>,
752    ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
753        let inner = self.inner.read().await;
754        let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
755            .select_only()
756            .column(fragment::Column::JobId)
757            .column(actor::Column::ActorId)
758            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
759            .filter(fragment::Column::JobId.is_in(job_ids))
760            .into_tuple()
761            .all(&inner.db)
762            .await?;
763        Ok(job_actors.into_iter().into_group_map())
764    }
765
766    /// Try to get internal table ids of each streaming job, used by metrics collection.
767    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
768        if let Ok(inner) = self.inner.try_read() {
769            if let Ok(job_state_tables) = FragmentModel::find()
770                .select_only()
771                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
772                .into_tuple::<(ObjectId, I32Array)>()
773                .all(&inner.db)
774                .await
775            {
776                let mut job_internal_table_ids = HashMap::new();
777                for (job_id, state_table_ids) in job_state_tables {
778                    job_internal_table_ids
779                        .entry(job_id)
780                        .or_insert_with(Vec::new)
781                        .extend(state_table_ids.into_inner());
782                }
783                return Some(job_internal_table_ids.into_iter().collect());
784            }
785        }
786        None
787    }
788
789    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
790        let inner = self.inner.read().await;
791        let count = FragmentModel::find().count(&inner.db).await?;
792        Ok(count > 0)
793    }
794
795    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
796        let inner = self.inner.read().await;
797        let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
798            .select_only()
799            .column(actor::Column::WorkerId)
800            .column_as(actor::Column::ActorId.count(), "count")
801            .group_by(actor::Column::WorkerId)
802            .into_tuple()
803            .all(&inner.db)
804            .await?;
805
806        Ok(actor_cnt
807            .into_iter()
808            .map(|(worker_id, count)| (worker_id, count as usize))
809            .collect())
810    }
811
    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
    /// Load the full [`StreamJobFragments`] of every streaming job, keyed by job id.
    ///
    /// Issues one fragments-with-actors query per job, so cost grows with the number
    /// of jobs (hence the TODO above).
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve the SQL definition of all jobs in one batch up front.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            // All fragments of this job together with their actors.
            let fragment_actors = FragmentModel::find()
                .find_with_related(Actor)
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            table_fragments.insert(
                job.job_id as ObjectId,
                Self::compose_table_fragments(
                    job.job_id as _,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    // `remove` transfers ownership of the definition; each job id
                    // is visited exactly once in this loop.
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
847
848    pub async fn upstream_fragments(
849        &self,
850        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
851    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
852        let inner = self.inner.read().await;
853        let mut stream = FragmentRelation::find()
854            .select_only()
855            .columns([
856                fragment_relation::Column::SourceFragmentId,
857                fragment_relation::Column::TargetFragmentId,
858            ])
859            .filter(
860                fragment_relation::Column::TargetFragmentId
861                    .is_in(fragment_ids.map(|id| id as FragmentId)),
862            )
863            .into_tuple::<(FragmentId, FragmentId)>()
864            .stream(&inner.db)
865            .await?;
866        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
867        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
868            upstream_fragments
869                .entry(downstream_fragment_id as crate::model::FragmentId)
870                .or_default()
871                .insert(upstream_fragment_id as crate::model::FragmentId);
872        }
873        Ok(upstream_fragments)
874    }
875
876    pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
877        let inner = self.inner.read().await;
878        let actor_locations: Vec<PartialActorLocation> =
879            Actor::find().into_partial_model().all(&inner.db).await?;
880        Ok(actor_locations)
881    }
882
883    pub async fn list_actor_info(
884        &self,
885    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
886        let inner = self.inner.read().await;
887        let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
888            Actor::find()
889                .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
890                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
891                .select_only()
892                .columns([actor::Column::ActorId, actor::Column::FragmentId])
893                .column_as(object::Column::Oid, "job_id")
894                .column_as(object::Column::SchemaId, "schema_id")
895                .column_as(object::Column::ObjType, "type")
896                .into_tuple()
897                .all(&inner.db)
898                .await?;
899        Ok(actor_locations)
900    }
901
902    pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
903        let inner = self.inner.read().await;
904
905        let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
906            .select_only()
907            .filter(actor::Column::Splits.is_not_null())
908            .columns([actor::Column::ActorId, actor::Column::FragmentId])
909            .into_tuple()
910            .all(&inner.db)
911            .await?;
912
913        Ok(source_actors)
914    }
915
916    pub async fn list_rw_table_scan_fragments(
917        &self,
918    ) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
919        let inner = self.inner.read().await;
920        let mut result = Vec::new();
921        let fragments = FragmentModel::find()
922            .select_only()
923            .columns([
924                fragment::Column::FragmentId,
925                fragment::Column::JobId,
926                fragment::Column::FragmentTypeMask,
927                fragment::Column::DistributionType,
928                fragment::Column::StateTableIds,
929                fragment::Column::VnodeCount,
930                fragment::Column::StreamNode,
931            ])
932            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
933            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
934            .join(JoinType::LeftJoin, fragment::Relation::Object.def())
935            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
936            .filter(
937                streaming_job::Column::JobStatus
938                    .eq(JobStatus::Initial)
939                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
940            )
941            .group_by(fragment::Column::FragmentId)
942            .into_model::<FragmentDesc>()
943            .all(&inner.db)
944            .await?;
945        for fragment in fragments {
946            let upstreams: Vec<FragmentId> = FragmentRelation::find()
947                .select_only()
948                .column(fragment_relation::Column::SourceFragmentId)
949                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
950                .into_tuple()
951                .all(&inner.db)
952                .await?;
953            result.push((fragment, upstreams));
954        }
955        Ok(result)
956    }
957
958    pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
959        let inner = self.inner.read().await;
960        let mut result = Vec::new();
961        let fragments = FragmentModel::find()
962            .select_only()
963            .columns([
964                fragment::Column::FragmentId,
965                fragment::Column::JobId,
966                fragment::Column::FragmentTypeMask,
967                fragment::Column::DistributionType,
968                fragment::Column::StateTableIds,
969                fragment::Column::VnodeCount,
970                fragment::Column::StreamNode,
971            ])
972            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
973            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
974            .group_by(fragment::Column::FragmentId)
975            .into_model::<FragmentDesc>()
976            .all(&inner.db)
977            .await?;
978        for fragment in fragments {
979            let upstreams: Vec<FragmentId> = FragmentRelation::find()
980                .select_only()
981                .column(fragment_relation::Column::SourceFragmentId)
982                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
983                .into_tuple()
984                .all(&inner.db)
985                .await?;
986            result.push((fragment, upstreams));
987        }
988        Ok(result)
989    }
990
991    pub async fn list_sink_actor_mapping(
992        &self,
993    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
994        let inner = self.inner.read().await;
995        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
996            .select_only()
997            .columns([sink::Column::SinkId, sink::Column::Name])
998            .into_tuple()
999            .all(&inner.db)
1000            .await?;
1001        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1002        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1003
1004        let actor_with_type: Vec<(ActorId, SinkId, i32)> = Actor::find()
1005            .select_only()
1006            .column(actor::Column::ActorId)
1007            .columns([fragment::Column::JobId, fragment::Column::FragmentTypeMask])
1008            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1009            .filter(fragment::Column::JobId.is_in(sink_ids))
1010            .into_tuple()
1011            .all(&inner.db)
1012            .await?;
1013
1014        let mut sink_actor_mapping = HashMap::new();
1015        for (actor_id, sink_id, type_mask) in actor_with_type {
1016            if type_mask & PbFragmentTypeFlag::Sink as i32 != 0 {
1017                sink_actor_mapping
1018                    .entry(sink_id)
1019                    .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1020                    .1
1021                    .push(actor_id);
1022            }
1023        }
1024
1025        Ok(sink_actor_mapping)
1026    }
1027
1028    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1029        let inner = self.inner.read().await;
1030        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1031            .select_only()
1032            .columns([
1033                fragment::Column::FragmentId,
1034                fragment::Column::JobId,
1035                fragment::Column::StateTableIds,
1036            ])
1037            .into_partial_model()
1038            .all(&inner.db)
1039            .await?;
1040        Ok(fragment_state_tables)
1041    }
1042
    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actors that need to be sent or
    /// collected, grouped as `database -> job -> fragment -> InflightFragmentInfo`.
    ///
    /// `database_id`: when `Some`, only actors of that database are loaded.
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        // Only `Running` actors are considered; optionally narrow to one database.
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        // One row per actor, joined with its fragment and the fragment's object.
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        // Fold the streamed per-actor rows into the nested map; rows of the same
        // fragment are merged into a single `InflightFragmentInfo`.
        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();
            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
            };
            match fragment_infos.entry(fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    // All actors of one fragment must agree on the fragment's state
                    // tables, and an actor id must not appear twice in the stream.
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                Entry::Vacant(entry) => {
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }
1143
    /// Move actors between workers according to `plan`, which maps an occupied
    /// worker slot to the slot it should migrate to, then broadcast the updated
    /// fragment mapping.
    pub async fn migrate_actors(
        &self,
        plan: HashMap<WorkerSlotId, WorkerSlotId>,
    ) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Snapshot of every actor with its fragment, current worker and status.
        let actors: Vec<(
            FragmentId,
            DistributionType,
            ActorId,
            Option<VnodeBitmap>,
            WorkerId,
            ActorStatus,
        )> = Actor::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::DistributionType,
            ])
            .columns([
                actor::Column::ActorId,
                actor::Column::VnodeBitmap,
                actor::Column::WorkerId,
                actor::Column::Status,
            ])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .into_tuple()
            .all(&txn)
            .await?;

        // worker id -> fragment id -> actor ids located on that worker.
        let mut actor_locations = HashMap::new();

        for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
            // Only `Running` actors are eligible for migration; others are logged
            // and skipped.
            if *status != ActorStatus::Running {
                tracing::warn!(
                    "skipping actor {} in fragment {} with status {:?}",
                    actor_id,
                    fragment_id,
                    status
                );
                continue;
            }

            actor_locations
                .entry(*worker_id)
                .or_insert(HashMap::new())
                .entry(*fragment_id)
                .or_insert(BTreeSet::new())
                .insert(*actor_id);
        }

        // Workers that appear as a source slot in the plan are being replaced.
        let expired_or_changed_workers: HashSet<_> =
            plan.keys().map(|k| k.worker_id() as WorkerId).collect();

        // actor id -> target worker id.
        let mut actor_migration_plan = HashMap::new();
        for (worker, fragment) in actor_locations {
            if expired_or_changed_workers.contains(&worker) {
                for (fragment_id, actors) in fragment {
                    debug!(
                        "worker {} expired or changed, migrating fragment {}",
                        worker, fragment_id
                    );
                    // Enumerate this fragment's actors into slot indices; iterating
                    // the `BTreeSet` makes the slot assignment deterministic.
                    let worker_slot_to_actor: HashMap<_, _> = actors
                        .iter()
                        .enumerate()
                        .map(|(idx, actor_id)| {
                            (WorkerSlotId::new(worker as _, idx as _), *actor_id)
                        })
                        .collect();

                    for (worker_slot, actor) in worker_slot_to_actor {
                        if let Some(target) = plan.get(&worker_slot) {
                            actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
                        }
                    }
                }
            }
        }

        // Apply the plan: one UPDATE per migrated actor, all inside the transaction.
        for (actor, worker) in actor_migration_plan {
            Actor::update_many()
                .col_expr(
                    actor::Column::WorkerId,
                    Expr::value(Value::Int(Some(worker))),
                )
                .filter(actor::Column::ActorId.eq(actor))
                .exec(&txn)
                .await?;
        }

        txn.commit().await?;

        // Broadcast the rebuilt fragment -> worker-slot mapping to subscribers.
        self.notify_fragment_mapping(
            NotificationOperation::Update,
            rebuild_fragment_mapping_from_actors(actors),
        )
        .await;

        Ok(())
    }
1245
1246    pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1247        let inner = self.inner.read().await;
1248
1249        let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1250            .select_only()
1251            .columns([fragment::Column::FragmentId])
1252            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1253            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1254            .into_tuple()
1255            .all(&inner.db)
1256            .await?;
1257
1258        let mut actor_locations = HashMap::new();
1259
1260        for (fragment_id, _, worker_id) in actors {
1261            *actor_locations
1262                .entry(worker_id)
1263                .or_insert(HashMap::new())
1264                .entry(fragment_id)
1265                .or_insert(0_usize) += 1;
1266        }
1267
1268        let mut result = HashSet::new();
1269        for (worker_id, mapping) in actor_locations {
1270            let max_fragment_len = mapping.values().max().unwrap();
1271
1272            result
1273                .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1274        }
1275
1276        Ok(result)
1277    }
1278
    /// Group all stream actors by the worker node they are located on.
    ///
    /// `include_inactive`: when `false`, only actors in `Running` status are loaded.
    pub async fn all_node_actors(
        &self,
        include_inactive: bool,
    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
        let inner = self.inner.read().await;
        let fragment_actors = if include_inactive {
            FragmentModel::find()
                .find_with_related(Actor)
                .all(&inner.db)
                .await?
        } else {
            FragmentModel::find()
                .find_with_related(Actor)
                .filter(actor::Column::Status.eq(ActorStatus::Running))
                .all(&inner.db)
                .await?
        };

        // Resolve job definitions for all involved jobs in one batch query.
        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
        )
        .await?;

        let mut node_actors = HashMap::new();
        for (fragment, actors) in fragment_actors {
            let job_id = fragment.job_id;
            let (table_fragments, actor_status, _) = Self::compose_fragment(
                fragment,
                actors,
                job_definitions.get(&(job_id as _)).cloned(),
            )?;
            for actor in table_fragments.actors {
                // Bucket each actor under the worker recorded in its status.
                let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
                node_actors
                    .entry(node_id)
                    .or_insert_with(Vec::new)
                    .push(actor);
            }
        }

        Ok(node_actors)
    }
1322
1323    pub async fn get_worker_actor_ids(
1324        &self,
1325        job_ids: Vec<ObjectId>,
1326    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
1327        let inner = self.inner.read().await;
1328        let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
1329            .select_only()
1330            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1331            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1332            .filter(fragment::Column::JobId.is_in(job_ids))
1333            .into_tuple()
1334            .all(&inner.db)
1335            .await?;
1336
1337        let mut worker_actors = BTreeMap::new();
1338        for (actor_id, worker_id) in actor_workers {
1339            worker_actors
1340                .entry(worker_id)
1341                .or_insert_with(Vec::new)
1342                .push(actor_id);
1343        }
1344
1345        Ok(worker_actors)
1346    }
1347
    /// Persist the given split assignment: for each actor, overwrite its `splits`
    /// column with the newly assigned connector splits, all in one transaction.
    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        for assignments in split_assignment.values() {
            for (actor_id, splits) in assignments {
                let actor_splits = splits.iter().map(Into::into).collect_vec();
                Actor::update(actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_splits,
                    }))),
                    ..Default::default()
                })
                .exec(&txn)
                .await
                .map_err(|err| {
                    // An update that matched no row means the actor does not exist;
                    // surface that as a catalog-id-not-found error instead of a raw
                    // database error.
                    if err == DbErr::RecordNotUpdated {
                        MetaError::catalog_id_not_found("actor_id", actor_id)
                    } else {
                        err.into()
                    }
                })?;
            }
        }
        txn.commit().await?;

        Ok(())
    }
1376
    /// For each given fragment, fill snapshot-backfill epochs into its stream node
    /// and write the node back — but only if the plan was actually modified.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // Returns `true` iff the node was modified and needs to be persisted.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1410
1411    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1412    pub async fn get_running_actors_of_fragment(
1413        &self,
1414        fragment_id: FragmentId,
1415    ) -> MetaResult<Vec<ActorId>> {
1416        let inner = self.inner.read().await;
1417        let actors: Vec<ActorId> = Actor::find()
1418            .select_only()
1419            .column(actor::Column::ActorId)
1420            .filter(actor::Column::FragmentId.eq(fragment_id))
1421            .filter(actor::Column::Status.eq(ActorStatus::Running))
1422            .into_tuple()
1423            .all(&inner.db)
1424            .await?;
1425        Ok(actors)
1426    }
1427
    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
    /// (`backfill_actor_id`, `upstream_source_actor_id`)
    ///
    /// NOTE(review): the actor queries below do not filter by status — the "Running"
    /// claim above relies on callers; confirm.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // The source -> backfill relation must exist, and must be `NoShuffle`
        // (checked right below), so actors can be paired one-to-one.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Load the distribution type of a fragment, erroring if it doesn't exist.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Load `actor_id -> optional vnode bitmap` for all actors of a fragment.
        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        // Pair each backfill actor with its no-shuffle upstream source actor, then
        // flip the tuple order to `(backfill_actor, source_actor)`.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1514
1515    pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
1516        let inner = self.inner.read().await;
1517        let actors: Vec<ActorId> = Actor::find()
1518            .select_only()
1519            .column(actor::Column::ActorId)
1520            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1521            .filter(fragment::Column::JobId.is_in(job_ids))
1522            .into_tuple()
1523            .all(&inner.db)
1524            .await?;
1525        Ok(actors)
1526    }
1527
    /// Get and filter the "**root**" fragments of the specified jobs.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// Root fragment connects to downstream jobs.
    ///
    /// ## What can be the root fragment
    /// - For sink, it should have one `Sink` fragment.
    /// - For MV, it should have one `MView` fragment.
    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
    /// - For source, it should have one `Source` fragment.
    ///
    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
    ///
    /// Returns `(job_id -> root fragment, actor locations)`.
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
        let inner = self.inner.read().await;

        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(job_ids.iter().copied()),
        )
        .await?;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        // job_id -> fragment
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            if (fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32) != 0
                || (fragment.fragment_type_mask & PbFragmentTypeFlag::Sink as i32) != 0
            {
                // MView/Sink always wins as root: unconditional insert overwrites any
                // Source fragment recorded earlier.
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
                // look for Source fragment only if there's no MView fragment
                // (notice try_insert here vs insert above)
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();
        for (_, fragment) in root_fragments {
            // One actor query per root fragment (N+1 pattern over the job set).
            let actors = fragment.find_related(Actor).all(&inner.db).await?;

            let job_id = fragment.job_id;
            root_fragments_pb.insert(
                fragment.job_id,
                Self::compose_fragment(
                    fragment,
                    actors,
                    job_definitions.get(&(job_id as _)).cloned(),
                )?
                .0,
            );
        }

        // NOTE(review): this lists the locations of *all* actors in the cluster, not
        // only those of `job_ids` — confirm callers rely on that.
        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok((root_fragments_pb, actors))
    }
1595
1596    pub async fn get_root_fragment(
1597        &self,
1598        job_id: ObjectId,
1599    ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
1600        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1601        let root_fragment = root_fragments
1602            .remove(&job_id)
1603            .context(format!("root fragment for job {} not found", job_id))?;
1604        Ok((root_fragment, actors))
1605    }
1606
1607    /// Get the downstream fragments connected to the specified job.
1608    pub async fn get_downstream_fragments(
1609        &self,
1610        job_id: ObjectId,
1611    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
1612        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;
1613
1614        let inner = self.inner.read().await;
1615        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1616            .filter(
1617                fragment_relation::Column::SourceFragmentId
1618                    .eq(root_fragment.fragment_id as FragmentId),
1619            )
1620            .all(&inner.db)
1621            .await?;
1622        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
1623            .await?
1624            .remove(&job_id);
1625
1626        let mut downstream_fragments = vec![];
1627        for fragment_relation::Model {
1628            target_fragment_id: fragment_id,
1629            dispatcher_type,
1630            ..
1631        } in downstream_fragment_relations
1632        {
1633            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
1634                .find_with_related(Actor)
1635                .all(&inner.db)
1636                .await?;
1637            if fragment_actors.is_empty() {
1638                bail!("No fragment found for fragment id {}", fragment_id);
1639            }
1640            assert_eq!(fragment_actors.len(), 1);
1641            let (fragment, actors) = fragment_actors.pop().unwrap();
1642            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1643            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
1644            downstream_fragments.push((dispatch_type, fragment));
1645        }
1646
1647        Ok((downstream_fragments, actors))
1648    }
1649
1650    pub async fn load_source_fragment_ids(
1651        &self,
1652    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1653        let inner = self.inner.read().await;
1654        let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1655            .select_only()
1656            .columns([
1657                fragment::Column::FragmentId,
1658                fragment::Column::FragmentTypeMask,
1659                fragment::Column::StreamNode,
1660            ])
1661            .into_tuple()
1662            .all(&inner.db)
1663            .await?;
1664        fragments.retain(|(_, mask, _)| *mask & PbFragmentTypeFlag::Source as i32 != 0);
1665
1666        let mut source_fragment_ids = HashMap::new();
1667        for (fragment_id, _, stream_node) in fragments {
1668            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1669                source_fragment_ids
1670                    .entry(source_id as SourceId)
1671                    .or_insert_with(BTreeSet::new)
1672                    .insert(fragment_id);
1673            }
1674        }
1675        Ok(source_fragment_ids)
1676    }
1677
1678    pub async fn load_backfill_fragment_ids(
1679        &self,
1680    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1681        let inner = self.inner.read().await;
1682        let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1683            .select_only()
1684            .columns([
1685                fragment::Column::FragmentId,
1686                fragment::Column::FragmentTypeMask,
1687                fragment::Column::StreamNode,
1688            ])
1689            .into_tuple()
1690            .all(&inner.db)
1691            .await?;
1692        fragments.retain(|(_, mask, _)| *mask & PbFragmentTypeFlag::SourceScan as i32 != 0);
1693
1694        let mut source_fragment_ids = HashMap::new();
1695        for (fragment_id, _, stream_node) in fragments {
1696            if let Some((source_id, upstream_source_fragment_id)) =
1697                stream_node.to_protobuf().find_source_backfill()
1698            {
1699                source_fragment_ids
1700                    .entry(source_id as SourceId)
1701                    .or_insert_with(BTreeSet::new)
1702                    .insert((fragment_id, upstream_source_fragment_id));
1703            }
1704        }
1705        Ok(source_fragment_ids)
1706    }
1707
1708    pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
1709        let inner = self.inner.read().await;
1710        let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
1711            .select_only()
1712            .columns([actor::Column::ActorId, actor::Column::Splits])
1713            .filter(actor::Column::Splits.is_not_null())
1714            .into_tuple()
1715            .all(&inner.db)
1716            .await?;
1717        Ok(splits.into_iter().collect())
1718    }
1719
1720    /// Get the actor count of `Materialize` or `Sink` fragment of the specified table.
1721    pub async fn get_actual_job_fragment_parallelism(
1722        &self,
1723        job_id: ObjectId,
1724    ) -> MetaResult<Option<usize>> {
1725        let inner = self.inner.read().await;
1726        let mut fragments: Vec<(FragmentId, i32, i64)> = FragmentModel::find()
1727            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
1728            .select_only()
1729            .columns([
1730                fragment::Column::FragmentId,
1731                fragment::Column::FragmentTypeMask,
1732            ])
1733            .column_as(actor::Column::ActorId.count(), "count")
1734            .filter(fragment::Column::JobId.eq(job_id))
1735            .group_by(fragment::Column::FragmentId)
1736            .into_tuple()
1737            .all(&inner.db)
1738            .await?;
1739
1740        fragments.retain(|(_, mask, _)| {
1741            *mask & PbFragmentTypeFlag::Mview as i32 != 0
1742                || *mask & PbFragmentTypeFlag::Sink as i32 != 0
1743        });
1744
1745        Ok(fragments
1746            .into_iter()
1747            .at_most_one()
1748            .ok()
1749            .flatten()
1750            .map(|(_, _, count)| count as usize))
1751    }
1752}
1753
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node;
    use risingwave_meta_model::actor::ActorStatus;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::{
        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
        VnodeBitmap, actor, fragment,
    };
    use risingwave_pb::common::PbActorLocation;
    use risingwave_pb::meta::table_fragments::PbActorStatus;
    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbFragmentTypeFlag, PbStreamNode, PbUnionNode};

    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    // Upstream actors of a single actor, keyed by the upstream fragment id.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    // Upstreams of every actor in a fragment, keyed by actor id.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    const TEST_FRAGMENT_ID: FragmentId = 1;

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;

    const TEST_JOB_ID: ObjectId = 1;

    const TEST_STATE_TABLE_ID: TableId = 1000;

    /// Build a two-entry upstream map for `actor_id`: actor `actor_id + 100` in
    /// `TEST_UPSTREAM_FRAGMENT_ID`, and actor `actor_id + 200` in the next fragment id.
    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }

    /// Build a `Union` stream node whose inputs are one `Merge` node per upstream
    /// fragment in `actor_upstream_actor_ids`.
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id: *upstream_fragment_id as _,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    /// Round-trip check: build pb-side fragment/actors, run
    /// `extract_fragment_and_actors_for_new_job`, and verify the extracted models
    /// match the pb inputs.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // Uniform vnode distribution over the test actors.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // All actors share the same merger plan, so any actor's upstreams will do.
        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: PbFragmentTypeFlag::Source as _,
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        // Every actor is `Running` on worker 0.
        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }

    /// Round-trip check in the other direction: build model-side fragment/actors,
    /// run `compose_fragment`, and verify the pb output matches the models.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // Mutable so each actor below can take ownership of its own bitmap.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Each actor carries one dummy split so the splits map is non-empty.
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                // The model struct still has deprecated fields that must be filled in.
                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        // All actors share the same merger plan; derive it from the first actor.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        // The model struct still has deprecated fields that must be filled in.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    /// Assert that model actors and pb actors agree field by field (same order is
    /// assumed: the zip panics on length mismatch).
    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // Every `Merge` node in the plan must point at a fragment this actor
            // actually has upstreams for.
            visit_stream_node(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(status, ActorStatus::Running);

            assert_eq!(
                splits,
                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    /// Assert that a model fragment and a pb fragment agree field by field.
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask as u32);
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(
            pb_state_table_ids,
            fragment.state_table_ids.into_u32_array()
        );
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }
}