risingwave_meta/controller/
fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
26use risingwave_common::util::stream_graph_visitor::{
27    visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_connector::source::SplitImpl;
30use risingwave_meta_model::actor::ActorStatus;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Actor, Fragment as FragmentModel, FragmentRelation, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
40    streaming_job, table,
41};
42use risingwave_meta_model_migration::{Alias, SelectStatement};
43use risingwave_pb::common::PbActorLocation;
44use risingwave_pb::meta::subscribe_response::{
45    Info as NotificationInfo, Operation as NotificationOperation,
46};
47use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
48use risingwave_pb::meta::table_fragments::fragment::{
49    FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan::stream_node::NodeBody;
55use risingwave_pb::stream_plan::{
56    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
57    StreamScanType,
58};
59use sea_orm::ActiveValue::Set;
60use sea_orm::sea_query::Expr;
61use sea_orm::{
62    ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
63    QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
64};
65use serde::{Deserialize, Serialize};
66use tracing::debug;
67
68use crate::barrier::SnapshotBackfillInfo;
69use crate::controller::catalog::{CatalogController, CatalogControllerInner};
70use crate::controller::scale::resolve_streaming_job_definition;
71use crate::controller::utils::{
72    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
73    get_fragment_mappings, resolve_no_shuffle_actor_dispatcher,
74};
75use crate::manager::{LocalNotification, NotificationManager};
76use crate::model::{
77    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
78    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
79};
80use crate::stream::{SplitAssignment, build_actor_split_impls};
81use crate::{MetaError, MetaResult};
82
/// Some information of running (inflight) actors.
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    /// The worker node the actor is currently scheduled on.
    pub worker_id: WorkerId,
    /// The vnode bitmap owned by this actor. NOTE(review): `None` presumably
    /// means the actor belongs to a singleton fragment — TODO confirm.
    pub vnode_bitmap: Option<Bitmap>,
}
89
/// Some information of a running (inflight) fragment and its actors.
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    /// How data is distributed across the fragment's actors.
    pub distribution_type: DistributionType,
    /// The stream plan (node tree) of the fragment.
    pub nodes: PbStreamNode,
    /// All inflight actors of this fragment, keyed by actor id.
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    /// Ids of the state tables owned by this fragment.
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}
98
/// Parallelism-related information of a fragment, as computed by
/// [`CatalogController::running_fragment_parallelisms`].
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    /// Number of actors currently running for this fragment.
    pub actor_count: usize,
    /// Number of virtual nodes of this fragment.
    pub vnode_count: usize,
}
105
/// Summary of a streaming job, as produced by
/// [`CatalogController::list_streaming_job_infos`]. Serialized in camelCase
/// for consumption by the dashboard.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    /// Catalog object type of the job (e.g. table / source / sink).
    pub obj_type: ObjectType,
    /// Display name, resolved from the corresponding catalog entry.
    pub name: String,
    pub job_status: JobStatus,
    /// Assigned parallelism of the job.
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Effective resource group (job-specific one, or the database default).
    pub resource_group: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
119
120impl CatalogControllerInner {
121    /// List all fragment vnode mapping info for all CREATED streaming jobs.
122    pub async fn all_running_fragment_mappings(
123        &self,
124    ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
125        let txn = self.db.begin().await?;
126
127        let job_ids: Vec<ObjectId> = StreamingJob::find()
128            .select_only()
129            .column(streaming_job::Column::JobId)
130            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
131            .into_tuple()
132            .all(&txn)
133            .await?;
134
135        let mut result = vec![];
136        for job_id in job_ids {
137            let mappings = get_fragment_mappings(&txn, job_id).await?;
138
139            result.extend(mappings.into_iter());
140        }
141
142        Ok(result.into_iter())
143    }
144}
145
146impl NotificationManager {
147    pub(crate) fn notify_fragment_mapping(
148        &self,
149        operation: NotificationOperation,
150        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
151    ) {
152        let fragment_ids = fragment_mappings
153            .iter()
154            .map(|mapping| mapping.fragment_id)
155            .collect_vec();
156        if fragment_ids.is_empty() {
157            return;
158        }
159        // notify all fragment mappings to frontend.
160        for fragment_mapping in fragment_mappings {
161            self.notify_frontend_without_version(
162                operation,
163                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
164            );
165        }
166
167        // update serving vnode mappings.
168        match operation {
169            NotificationOperation::Add | NotificationOperation::Update => {
170                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
171                    fragment_ids,
172                ));
173            }
174            NotificationOperation::Delete => {
175                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
176                    fragment_ids,
177                ));
178            }
179            op => {
180                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
181            }
182        }
183    }
184}
185
186impl CatalogController {
187    pub fn extract_fragment_and_actors_from_fragments(
188        job_id: ObjectId,
189        fragments: impl Iterator<Item = &Fragment>,
190        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
191        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
192    ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
193        fragments
194            .map(|fragment| {
195                Self::extract_fragment_and_actors_for_new_job(
196                    job_id,
197                    fragment,
198                    actor_status,
199                    actor_splits,
200                )
201            })
202            .try_collect()
203    }
204
    /// Convert a single in-memory [`Fragment`] of a new streaming job into its
    /// database models: one `fragment::Model` plus one `actor::Model` per actor.
    ///
    /// `actor_status` must contain an entry for every actor of the fragment
    /// (missing entries are an error), and every actor must carry an expression
    /// context. Entries in `actor_splits` are optional per actor.
    pub fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        // A fragment without actors would be unschedulable; treat it as a bug.
        assert!(!pb_actors.is_empty());

        // Clear the deprecated per-actor `upstream_actor_id` field of Merge
        // nodes before persisting the plan.
        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            // Source split assignment is optional: only actors that read from a
            // connector source have splits.
            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            // Every actor must have been assigned a status; otherwise error out.
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                // Deprecated column; left at its default.
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            // Deprecated column; left at its default.
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }
304
    /// Assemble a [`StreamJobFragments`] for one streaming job from its database
    /// models: state, stream context, parallelism settings and all
    /// `(fragment, actors)` pairs.
    pub fn compose_table_fragments(
        table_id: u32,
        state: PbState,
        ctx: Option<PbStreamContext>,
        fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
        parallelism: StreamingParallelism,
        max_parallelism: usize,
        job_definition: Option<String>,
    ) -> MetaResult<StreamJobFragments> {
        let mut pb_fragments = BTreeMap::new();
        let mut pb_actor_splits = HashMap::new();
        let mut pb_actor_status = BTreeMap::new();

        for (fragment, actors) in fragments {
            // Convert each fragment and accumulate its per-actor status/splits.
            let (fragment, fragment_actor_status, fragment_actor_splits) =
                Self::compose_fragment(fragment, actors, job_definition.clone())?;

            pb_fragments.insert(fragment.fragment_id, fragment);

            pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
            pb_actor_status.extend(fragment_actor_status.into_iter());
        }

        let table_fragments = StreamJobFragments {
            stream_job_id: table_id.into(),
            state: state as _,
            fragments: pb_fragments,
            actor_status: pb_actor_status,
            actor_splits: pb_actor_splits,
            // A missing context falls back to the default stream context.
            ctx: ctx
                .as_ref()
                .map(StreamContext::from_protobuf)
                .unwrap_or_default(),
            assigned_parallelism: match parallelism {
                StreamingParallelism::Custom => TableParallelism::Custom,
                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
            },
            max_parallelism,
        };

        Ok(table_fragments)
    }
348
349    #[allow(clippy::type_complexity)]
350    pub(crate) fn compose_fragment(
351        fragment: fragment::Model,
352        actors: Vec<actor::Model>,
353        job_definition: Option<String>,
354    ) -> MetaResult<(
355        Fragment,
356        HashMap<crate::model::ActorId, PbActorStatus>,
357        HashMap<crate::model::ActorId, PbConnectorSplits>,
358    )> {
359        let fragment::Model {
360            fragment_id,
361            fragment_type_mask,
362            distribution_type,
363            stream_node,
364            state_table_ids,
365            vnode_count,
366            ..
367        } = fragment;
368
369        let stream_node = stream_node.to_protobuf();
370        let mut upstream_fragments = HashSet::new();
371        visit_stream_node_body(&stream_node, |body| {
372            if let NodeBody::Merge(m) = body {
373                assert!(
374                    upstream_fragments.insert(m.upstream_fragment_id),
375                    "non-duplicate upstream fragment"
376                );
377            }
378        });
379
380        let mut pb_actors = vec![];
381
382        let mut pb_actor_status = HashMap::new();
383        let mut pb_actor_splits = HashMap::new();
384
385        for actor in actors {
386            if actor.fragment_id != fragment_id {
387                bail!(
388                    "fragment id {} from actor {} is different from fragment {}",
389                    actor.fragment_id,
390                    actor.actor_id,
391                    fragment_id
392                )
393            }
394
395            let actor::Model {
396                actor_id,
397                fragment_id,
398                status,
399                worker_id,
400                splits,
401                vnode_bitmap,
402                expr_context,
403                ..
404            } = actor;
405
406            let vnode_bitmap =
407                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
408            let pb_expr_context = Some(expr_context.to_protobuf());
409
410            pb_actor_status.insert(
411                actor_id as _,
412                PbActorStatus {
413                    location: PbActorLocation::from_worker(worker_id as u32),
414                    state: PbActorState::from(status) as _,
415                },
416            );
417
418            if let Some(splits) = splits {
419                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
420            }
421
422            pb_actors.push(StreamActor {
423                actor_id: actor_id as _,
424                fragment_id: fragment_id as _,
425                vnode_bitmap,
426                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
427                expr_context: pb_expr_context,
428            })
429        }
430
431        let pb_state_table_ids = state_table_ids.into_u32_array();
432        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
433        let pb_fragment = Fragment {
434            fragment_id: fragment_id as _,
435            fragment_type_mask: fragment_type_mask.into(),
436            distribution_type: pb_distribution_type,
437            actors: pb_actors,
438            state_table_ids: pb_state_table_ids,
439            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
440            nodes: stream_node,
441        };
442
443        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
444    }
445
    /// Compute, for each fragment (optionally restricted by `id_filter`), its
    /// distribution type, current actor count and vnode count.
    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let inner = self.inner.read().await;

        let query_alias = Alias::new("fragment_actor_count");
        let count_alias = Alias::new("count");

        // Inner query: count actors per fragment id.
        let mut query = SelectStatement::new()
            .column(actor::Column::FragmentId)
            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
            .from(Actor)
            .group_by_col(actor::Column::FragmentId)
            .to_owned();

        if let Some(id_filter) = id_filter {
            // Filter on the grouping key via HAVING, after aggregation.
            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
        }

        // Outer query: join the per-fragment counts back to the fragment table
        // to pick up distribution type and vnode count.
        let outer = SelectStatement::new()
            .column((FragmentModel, fragment::Column::FragmentId))
            .column(count_alias)
            .column(fragment::Column::DistributionType)
            .column(fragment::Column::VnodeCount)
            .from_subquery(query.to_owned(), query_alias.clone())
            .inner_join(
                FragmentModel,
                Expr::col((query_alias, actor::Column::FragmentId))
                    .equals((FragmentModel, fragment::Column::FragmentId)),
            )
            .to_owned();

        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
                outer.to_owned(),
            )
            .all(&inner.db)
            .await?;

        Ok(fragment_parallelisms
            .into_iter()
            .map(|(fragment_id, count, distribution_type, vnode_count)| {
                (
                    fragment_id,
                    FragmentParallelismInfo {
                        distribution_type: distribution_type.into(),
                        actor_count: count as usize,
                        vnode_count: vnode_count as usize,
                    },
                )
            })
            .collect())
    }
500
501    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
502        let inner = self.inner.read().await;
503        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
504            .select_only()
505            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
506            .into_tuple()
507            .all(&inner.db)
508            .await?;
509        Ok(fragment_jobs.into_iter().collect())
510    }
511
512    pub async fn get_fragment_job_id(
513        &self,
514        fragment_ids: Vec<FragmentId>,
515    ) -> MetaResult<Vec<ObjectId>> {
516        let inner = self.inner.read().await;
517
518        let object_ids: Vec<ObjectId> = FragmentModel::find()
519            .select_only()
520            .column(fragment::Column::JobId)
521            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
522            .into_tuple()
523            .all(&inner.db)
524            .await?;
525
526        Ok(object_ids)
527    }
528
    /// Get the descriptor of a single fragment (including its actor count as
    /// `parallelism`) together with the ids of its upstream fragments.
    ///
    /// Returns `Ok(None)` if the fragment does not exist.
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        // Get the fragment description; actors are counted via a LEFT JOIN so
        // a fragment without actors still yields a row (with count 0).
        let fragment_opt = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .filter(fragment::Column::FragmentId.eq(fragment_id))
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .one(&inner.db)
            .await?;

        let Some(fragment) = fragment_opt else {
            return Ok(None); // Fragment not found
        };

        // Get upstream fragments, i.e. sources of relations targeting this one.
        let upstreams: Vec<FragmentId> = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(Some((fragment, upstreams)))
    }
570
571    pub async fn list_fragment_database_ids(
572        &self,
573        select_fragment_ids: Option<Vec<FragmentId>>,
574    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
575        let inner = self.inner.read().await;
576        let select = FragmentModel::find()
577            .select_only()
578            .column(fragment::Column::FragmentId)
579            .column(object::Column::DatabaseId)
580            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
581        let select = if let Some(select_fragment_ids) = select_fragment_ids {
582            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
583        } else {
584            select
585        };
586        Ok(select.into_tuple().all(&inner.db).await?)
587    }
588
    /// Load all fragments (with their actors) of a streaming job and compose
    /// them into a [`StreamJobFragments`].
    ///
    /// Errors if the job does not exist in the database.
    pub async fn get_job_fragments_by_id(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;
        let fragment_actors = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        // Resolve the job's SQL definition (used as the actors' `mview_definition`).
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id as _,
            job_info.job_status.into(),
            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }
619
620    pub async fn get_fragment_actor_dispatchers(
621        &self,
622        fragment_ids: Vec<FragmentId>,
623    ) -> MetaResult<FragmentActorDispatchers> {
624        let inner = self.inner.read().await;
625        get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
626    }
627
    /// Get the downstream dispatch relations of the given fragments: for each
    /// source fragment id, the downstream fragments it dispatches to together
    /// with the dispatcher type, distribution key and output mapping.
    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        // Stream the matching rows instead of materializing them all at once.
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }
657
    /// For each fragment of the given job that contains a `StreamScan` node
    /// with a concrete scan type, return that scan type. Fragments without a
    /// `StreamScan` node, or whose scan type is `Unspecified`, are omitted.
    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            // Walk the fragment's plan looking for StreamScan nodes.
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }
691
    /// List summary info of all streaming jobs (used e.g. by the dashboard).
    ///
    /// The job name is resolved from its `table`, `source` or `sink` catalog
    /// entry — whichever exists — falling back to `"<unknown>"`. The resource
    /// group falls back to the database-level default when the job has no
    /// specific one.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // LEFT JOIN each catalog table the job object might belong to.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // name := COALESCE(table.name, source.name, sink.name, '<unknown>')
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // resource_group := COALESCE(job.specific_resource_group, database.resource_group)
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
738
739    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
740        let inner = self.inner.read().await;
741        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
742            .select_only()
743            .column(streaming_job::Column::MaxParallelism)
744            .into_tuple()
745            .one(&inner.db)
746            .await?
747            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
748        Ok(max_parallelism as usize)
749    }
750
751    /// Get all actor ids in the target streaming jobs.
752    pub async fn get_job_actor_mapping(
753        &self,
754        job_ids: Vec<ObjectId>,
755    ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
756        let inner = self.inner.read().await;
757        let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
758            .select_only()
759            .column(fragment::Column::JobId)
760            .column(actor::Column::ActorId)
761            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
762            .filter(fragment::Column::JobId.is_in(job_ids))
763            .into_tuple()
764            .all(&inner.db)
765            .await?;
766        Ok(job_actors.into_iter().into_group_map())
767    }
768
769    /// Try to get internal table ids of each streaming job, used by metrics collection.
770    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
771        if let Ok(inner) = self.inner.try_read()
772            && let Ok(job_state_tables) = FragmentModel::find()
773                .select_only()
774                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
775                .into_tuple::<(ObjectId, I32Array)>()
776                .all(&inner.db)
777                .await
778        {
779            let mut job_internal_table_ids = HashMap::new();
780            for (job_id, state_table_ids) in job_state_tables {
781                job_internal_table_ids
782                    .entry(job_id)
783                    .or_insert_with(Vec::new)
784                    .extend(state_table_ids.into_inner());
785            }
786            return Some(job_internal_table_ids.into_iter().collect());
787        }
788        None
789    }
790
791    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
792        let inner = self.inner.read().await;
793        let count = FragmentModel::find().count(&inner.db).await?;
794        Ok(count > 0)
795    }
796
797    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
798        let inner = self.inner.read().await;
799        let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
800            .select_only()
801            .column(actor::Column::WorkerId)
802            .column_as(actor::Column::ActorId.count(), "count")
803            .group_by(actor::Column::WorkerId)
804            .into_tuple()
805            .all(&inner.db)
806            .await?;
807
808        Ok(actor_cnt
809            .into_iter()
810            .map(|(worker_id, count)| (worker_id, count as usize))
811            .collect())
812    }
813
    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
    /// Load the fragment graphs (with actors) of *all* streaming jobs.
    ///
    /// NOTE(review): this issues one fragment query per job on top of loading
    /// all jobs (N+1 pattern) — per the TODO above, avoid on hot paths.
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve the SQL definition of every job up front in a single pass.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            // Fetch this job's fragments together with their related actors.
            let fragment_actors = FragmentModel::find()
                .find_with_related(Actor)
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            table_fragments.insert(
                job.job_id as ObjectId,
                Self::compose_table_fragments(
                    job.job_id as _,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    // `remove` transfers ownership of the definition string.
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
849
    /// Returns pairs of (job id, actor ids), where actors belong to CDC table backfill fragment of the job.
    pub async fn cdc_table_backfill_actor_ids(&self) -> MetaResult<HashMap<u32, HashSet<u32>>> {
        let inner = self.inner.read().await;
        let mut job_id_actor_ids = HashMap::default();
        let stream_cdc_scan_flag = FragmentTypeFlag::StreamCdcScan as i32;
        // NOTE(review): the filter below uses exact *equality* on the type
        // mask, so it only matches fragments whose mask is exactly
        // `StreamCdcScan` with no other flags set — confirm this is intended
        // rather than a bitwise "contains the flag" check.
        let fragment_type_mask = stream_cdc_scan_flag;
        let fragment_actors: Vec<(fragment::Model, Vec<actor::Model>)> = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::FragmentTypeMask.eq(fragment_type_mask))
            .all(&inner.db)
            .await?;
        for (fragment, actors) in fragment_actors {
            // Group the actor ids of matching fragments by their owning job.
            let e: &mut HashSet<u32> = job_id_actor_ids.entry(fragment.job_id as _).or_default();
            e.extend(actors.iter().map(|a| a.actor_id as u32));
        }
        Ok(job_id_actor_ids)
    }
867
868    pub async fn upstream_fragments(
869        &self,
870        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
871    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
872        let inner = self.inner.read().await;
873        let mut stream = FragmentRelation::find()
874            .select_only()
875            .columns([
876                fragment_relation::Column::SourceFragmentId,
877                fragment_relation::Column::TargetFragmentId,
878            ])
879            .filter(
880                fragment_relation::Column::TargetFragmentId
881                    .is_in(fragment_ids.map(|id| id as FragmentId)),
882            )
883            .into_tuple::<(FragmentId, FragmentId)>()
884            .stream(&inner.db)
885            .await?;
886        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
887        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
888            upstream_fragments
889                .entry(downstream_fragment_id as crate::model::FragmentId)
890                .or_default()
891                .insert(upstream_fragment_id as crate::model::FragmentId);
892        }
893        Ok(upstream_fragments)
894    }
895
896    pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
897        let inner = self.inner.read().await;
898        let actor_locations: Vec<PartialActorLocation> =
899            Actor::find().into_partial_model().all(&inner.db).await?;
900        Ok(actor_locations)
901    }
902
903    pub async fn list_actor_info(
904        &self,
905    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
906        let inner = self.inner.read().await;
907        let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
908            Actor::find()
909                .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
910                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
911                .select_only()
912                .columns([actor::Column::ActorId, actor::Column::FragmentId])
913                .column_as(object::Column::Oid, "job_id")
914                .column_as(object::Column::SchemaId, "schema_id")
915                .column_as(object::Column::ObjType, "type")
916                .into_tuple()
917                .all(&inner.db)
918                .await?;
919        Ok(actor_locations)
920    }
921
922    pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
923        let inner = self.inner.read().await;
924
925        let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
926            .select_only()
927            .filter(actor::Column::Splits.is_not_null())
928            .columns([actor::Column::ActorId, actor::Column::FragmentId])
929            .into_tuple()
930            .all(&inner.db)
931            .await?;
932
933        Ok(source_actors)
934    }
935
936    pub async fn list_creating_fragment_descs(
937        &self,
938    ) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
939        let inner = self.inner.read().await;
940        let mut result = Vec::new();
941        let fragments = FragmentModel::find()
942            .select_only()
943            .columns([
944                fragment::Column::FragmentId,
945                fragment::Column::JobId,
946                fragment::Column::FragmentTypeMask,
947                fragment::Column::DistributionType,
948                fragment::Column::StateTableIds,
949                fragment::Column::VnodeCount,
950                fragment::Column::StreamNode,
951            ])
952            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
953            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
954            .join(JoinType::LeftJoin, fragment::Relation::Object.def())
955            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
956            .filter(
957                streaming_job::Column::JobStatus
958                    .eq(JobStatus::Initial)
959                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
960            )
961            .group_by(fragment::Column::FragmentId)
962            .into_model::<FragmentDesc>()
963            .all(&inner.db)
964            .await?;
965        for fragment in fragments {
966            let upstreams: Vec<FragmentId> = FragmentRelation::find()
967                .select_only()
968                .column(fragment_relation::Column::SourceFragmentId)
969                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
970                .into_tuple()
971                .all(&inner.db)
972                .await?;
973            result.push((fragment, upstreams));
974        }
975        Ok(result)
976    }
977
978    pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
979        let inner = self.inner.read().await;
980        let mut result = Vec::new();
981        let fragments = FragmentModel::find()
982            .select_only()
983            .columns([
984                fragment::Column::FragmentId,
985                fragment::Column::JobId,
986                fragment::Column::FragmentTypeMask,
987                fragment::Column::DistributionType,
988                fragment::Column::StateTableIds,
989                fragment::Column::VnodeCount,
990                fragment::Column::StreamNode,
991            ])
992            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
993            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
994            .group_by(fragment::Column::FragmentId)
995            .into_model::<FragmentDesc>()
996            .all(&inner.db)
997            .await?;
998        for fragment in fragments {
999            let upstreams: Vec<FragmentId> = FragmentRelation::find()
1000                .select_only()
1001                .column(fragment_relation::Column::SourceFragmentId)
1002                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
1003                .into_tuple()
1004                .all(&inner.db)
1005                .await?;
1006            result.push((fragment, upstreams));
1007        }
1008        Ok(result)
1009    }
1010
1011    pub async fn list_sink_actor_mapping(
1012        &self,
1013    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1014        let inner = self.inner.read().await;
1015        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1016            .select_only()
1017            .columns([sink::Column::SinkId, sink::Column::Name])
1018            .into_tuple()
1019            .all(&inner.db)
1020            .await?;
1021        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1022        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1023
1024        let actor_with_type: Vec<(ActorId, SinkId, i32)> = Actor::find()
1025            .select_only()
1026            .column(actor::Column::ActorId)
1027            .columns([fragment::Column::JobId, fragment::Column::FragmentTypeMask])
1028            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1029            .filter(fragment::Column::JobId.is_in(sink_ids))
1030            .into_tuple()
1031            .all(&inner.db)
1032            .await?;
1033
1034        let mut sink_actor_mapping = HashMap::new();
1035        for (actor_id, sink_id, type_mask) in actor_with_type {
1036            if FragmentTypeMask::from(type_mask).contains(FragmentTypeFlag::Sink) {
1037                sink_actor_mapping
1038                    .entry(sink_id)
1039                    .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1040                    .1
1041                    .push(actor_id);
1042            }
1043        }
1044
1045        Ok(sink_actor_mapping)
1046    }
1047
1048    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1049        let inner = self.inner.read().await;
1050        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1051            .select_only()
1052            .columns([
1053                fragment::Column::FragmentId,
1054                fragment::Column::JobId,
1055                fragment::Column::StateTableIds,
1056            ])
1057            .into_partial_model()
1058            .all(&inner.db)
1059            .await?;
1060        Ok(fragment_state_tables)
1061    }
1062
    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
    /// collected.
    ///
    /// Returns `database_id -> job_id -> fragment_id -> InflightFragmentInfo`.
    /// When `database_id` is `Some`, only actors of that database are loaded.
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        // Always restrict to running actors; optionally restrict to one database.
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            // Stream rows instead of materializing them all at once.
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();
            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
            };
            match fragment_infos.entry(fragment_id) {
                Entry::Occupied(mut entry) => {
                    // Fragment already seen via another actor: the per-fragment
                    // fields must agree; only the actor set grows.
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                Entry::Vacant(entry) => {
                    // First actor of this fragment: record fragment-level info.
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }
1163
1164    pub async fn migrate_actors(
1165        &self,
1166        plan: HashMap<WorkerSlotId, WorkerSlotId>,
1167    ) -> MetaResult<()> {
1168        let inner = self.inner.read().await;
1169        let txn = inner.db.begin().await?;
1170
1171        let actors: Vec<(
1172            FragmentId,
1173            DistributionType,
1174            ActorId,
1175            Option<VnodeBitmap>,
1176            WorkerId,
1177            ActorStatus,
1178        )> = Actor::find()
1179            .select_only()
1180            .columns([
1181                fragment::Column::FragmentId,
1182                fragment::Column::DistributionType,
1183            ])
1184            .columns([
1185                actor::Column::ActorId,
1186                actor::Column::VnodeBitmap,
1187                actor::Column::WorkerId,
1188                actor::Column::Status,
1189            ])
1190            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1191            .into_tuple()
1192            .all(&txn)
1193            .await?;
1194
1195        let mut actor_locations = HashMap::new();
1196
1197        for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
1198            if *status != ActorStatus::Running {
1199                tracing::warn!(
1200                    "skipping actor {} in fragment {} with status {:?}",
1201                    actor_id,
1202                    fragment_id,
1203                    status
1204                );
1205                continue;
1206            }
1207
1208            actor_locations
1209                .entry(*worker_id)
1210                .or_insert(HashMap::new())
1211                .entry(*fragment_id)
1212                .or_insert(BTreeSet::new())
1213                .insert(*actor_id);
1214        }
1215
1216        let expired_or_changed_workers: HashSet<_> =
1217            plan.keys().map(|k| k.worker_id() as WorkerId).collect();
1218
1219        let mut actor_migration_plan = HashMap::new();
1220        for (worker, fragment) in actor_locations {
1221            if expired_or_changed_workers.contains(&worker) {
1222                for (fragment_id, actors) in fragment {
1223                    debug!(
1224                        "worker {} expired or changed, migrating fragment {}",
1225                        worker, fragment_id
1226                    );
1227                    let worker_slot_to_actor: HashMap<_, _> = actors
1228                        .iter()
1229                        .enumerate()
1230                        .map(|(idx, actor_id)| {
1231                            (WorkerSlotId::new(worker as _, idx as _), *actor_id)
1232                        })
1233                        .collect();
1234
1235                    for (worker_slot, actor) in worker_slot_to_actor {
1236                        if let Some(target) = plan.get(&worker_slot) {
1237                            actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
1238                        }
1239                    }
1240                }
1241            }
1242        }
1243
1244        for (actor, worker) in actor_migration_plan {
1245            Actor::update_many()
1246                .col_expr(
1247                    actor::Column::WorkerId,
1248                    Expr::value(Value::Int(Some(worker))),
1249                )
1250                .filter(actor::Column::ActorId.eq(actor))
1251                .exec(&txn)
1252                .await?;
1253        }
1254
1255        txn.commit().await?;
1256
1257        Ok(())
1258    }
1259
1260    pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1261        let inner = self.inner.read().await;
1262
1263        let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1264            .select_only()
1265            .columns([fragment::Column::FragmentId])
1266            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1267            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1268            .into_tuple()
1269            .all(&inner.db)
1270            .await?;
1271
1272        let mut actor_locations = HashMap::new();
1273
1274        for (fragment_id, _, worker_id) in actors {
1275            *actor_locations
1276                .entry(worker_id)
1277                .or_insert(HashMap::new())
1278                .entry(fragment_id)
1279                .or_insert(0_usize) += 1;
1280        }
1281
1282        let mut result = HashSet::new();
1283        for (worker_id, mapping) in actor_locations {
1284            let max_fragment_len = mapping.values().max().unwrap();
1285
1286            result
1287                .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1288        }
1289
1290        Ok(result)
1291    }
1292
    /// Group the stream actors of every job by the worker node they run on.
    ///
    /// When `include_inactive` is false, only actors in `Running` status are
    /// returned.
    pub async fn all_node_actors(
        &self,
        include_inactive: bool,
    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
        let inner = self.inner.read().await;
        let fragment_actors = if include_inactive {
            FragmentModel::find()
                .find_with_related(Actor)
                .all(&inner.db)
                .await?
        } else {
            FragmentModel::find()
                .find_with_related(Actor)
                .filter(actor::Column::Status.eq(ActorStatus::Running))
                .all(&inner.db)
                .await?
        };

        // Resolve the SQL definitions of all involved jobs in one pass.
        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
        )
        .await?;

        let mut node_actors = HashMap::new();
        for (fragment, actors) in fragment_actors {
            let job_id = fragment.job_id;
            // Compose the protobuf fragment; `actor_status` maps actor id to
            // its location, which is used to bucket actors per worker below.
            let (table_fragments, actor_status, _) = Self::compose_fragment(
                fragment,
                actors,
                job_definitions.get(&(job_id as _)).cloned(),
            )?;
            for actor in table_fragments.actors {
                let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
                node_actors
                    .entry(node_id)
                    .or_insert_with(Vec::new)
                    .push(actor);
            }
        }

        Ok(node_actors)
    }
1336
1337    pub async fn get_worker_actor_ids(
1338        &self,
1339        job_ids: Vec<ObjectId>,
1340    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
1341        let inner = self.inner.read().await;
1342        let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
1343            .select_only()
1344            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1345            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1346            .filter(fragment::Column::JobId.is_in(job_ids))
1347            .into_tuple()
1348            .all(&inner.db)
1349            .await?;
1350
1351        let mut worker_actors = BTreeMap::new();
1352        for (actor_id, worker_id) in actor_workers {
1353            worker_actors
1354                .entry(worker_id)
1355                .or_insert_with(Vec::new)
1356                .push(actor_id);
1357        }
1358
1359        Ok(worker_actors)
1360    }
1361
    /// Persist the connector split assignment of each actor, all within a
    /// single transaction.
    ///
    /// Returns a "catalog id not found" error if any referenced actor does not
    /// exist.
    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        for assignments in split_assignment.values() {
            for (actor_id, splits) in assignments {
                let actor_splits = splits.iter().map(Into::into).collect_vec();
                Actor::update(actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_splits,
                    }))),
                    ..Default::default()
                })
                .exec(&txn)
                .await
                .map_err(|err| {
                    // `RecordNotUpdated` means the actor row doesn't exist;
                    // surface it as a catalog-id-not-found error instead of a
                    // generic DB error.
                    if err == DbErr::RecordNotUpdated {
                        MetaError::catalog_id_not_found("actor_id", actor_id)
                    } else {
                        err.into()
                    }
                })?;
            }
        }
        txn.commit().await?;

        Ok(())
    }
1390
    /// Fill the snapshot-backfill epoch into the stream node of each given
    /// fragment, persisting the updated plan only when it actually changed.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // The helper returns true iff it modified `node`; skip the DB
            // write otherwise.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1424
1425    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1426    pub async fn get_running_actors_of_fragment(
1427        &self,
1428        fragment_id: FragmentId,
1429    ) -> MetaResult<Vec<ActorId>> {
1430        let inner = self.inner.read().await;
1431        let actors: Vec<ActorId> = Actor::find()
1432            .select_only()
1433            .column(actor::Column::ActorId)
1434            .filter(actor::Column::FragmentId.eq(fragment_id))
1435            .filter(actor::Column::Status.eq(ActorStatus::Running))
1436            .into_tuple()
1437            .all(&inner.db)
1438            .await?;
1439        Ok(actors)
1440    }
1441
    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
    /// (`backfill_actor_id`, `upstream_source_actor_id`)
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // The two fragments must be directly connected; fetch the edge's
        // dispatcher type to validate it below.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // Source backfill is always connected to its source via no-shuffle.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: load a fragment's distribution type, erroring if missing.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: load `actor_id -> optional vnode bitmap` for a fragment.
        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        // Pair each backfill actor with its no-shuffle upstream source actor.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1528
1529    pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
1530        let inner = self.inner.read().await;
1531        let actors: Vec<ActorId> = Actor::find()
1532            .select_only()
1533            .column(actor::Column::ActorId)
1534            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1535            .filter(fragment::Column::JobId.is_in(job_ids))
1536            .into_tuple()
1537            .all(&inner.db)
1538            .await?;
1539        Ok(actors)
1540    }
1541
    /// Get and filter the "**root**" fragments of the specified jobs.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// Root fragment connects to downstream jobs.
    ///
    /// ## What can be the root fragment
    /// - For sink, it should have one `Sink` fragment.
    /// - For MV, it should have one `MView` fragment.
    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
    /// - For source, it should have one `Source` fragment.
    ///
    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
        let inner = self.inner.read().await;

        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(job_ids.iter().copied()),
        )
        .await?;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        // job_id -> fragment
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                // MView/Sink always wins: overwrite any previous candidate.
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                // look for Source fragment only if there's no MView fragment
                // (notice try_insert here vs insert above)
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();
        for (_, fragment) in root_fragments {
            // Load the fragment's actors and compose the protobuf form.
            let actors = fragment.find_related(Actor).all(&inner.db).await?;

            let job_id = fragment.job_id;
            root_fragments_pb.insert(
                fragment.job_id,
                Self::compose_fragment(
                    fragment,
                    actors,
                    job_definitions.get(&(job_id as _)).cloned(),
                )?
                .0,
            );
        }

        // NOTE(review): this loads the location of *all* actors, not just
        // those of the requested jobs — presumably intentional; confirm.
        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok((root_fragments_pb, actors))
    }
1608
1609    pub async fn get_root_fragment(
1610        &self,
1611        job_id: ObjectId,
1612    ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
1613        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1614        let root_fragment = root_fragments
1615            .remove(&job_id)
1616            .context(format!("root fragment for job {} not found", job_id))?;
1617        Ok((root_fragment, actors))
1618    }
1619
1620    /// Get the downstream fragments connected to the specified job.
1621    pub async fn get_downstream_fragments(
1622        &self,
1623        job_id: ObjectId,
1624    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
1625        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;
1626
1627        let inner = self.inner.read().await;
1628        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1629            .filter(
1630                fragment_relation::Column::SourceFragmentId
1631                    .eq(root_fragment.fragment_id as FragmentId),
1632            )
1633            .all(&inner.db)
1634            .await?;
1635        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
1636            .await?
1637            .remove(&job_id);
1638
1639        let mut downstream_fragments = vec![];
1640        for fragment_relation::Model {
1641            target_fragment_id: fragment_id,
1642            dispatcher_type,
1643            ..
1644        } in downstream_fragment_relations
1645        {
1646            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
1647                .find_with_related(Actor)
1648                .all(&inner.db)
1649                .await?;
1650            if fragment_actors.is_empty() {
1651                bail!("No fragment found for fragment id {}", fragment_id);
1652            }
1653            assert_eq!(fragment_actors.len(), 1);
1654            let (fragment, actors) = fragment_actors.pop().unwrap();
1655            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1656            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
1657            downstream_fragments.push((dispatch_type, fragment));
1658        }
1659
1660        Ok((downstream_fragments, actors))
1661    }
1662
1663    pub async fn load_source_fragment_ids(
1664        &self,
1665    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1666        let inner = self.inner.read().await;
1667        let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1668            .select_only()
1669            .columns([
1670                fragment::Column::FragmentId,
1671                fragment::Column::FragmentTypeMask,
1672                fragment::Column::StreamNode,
1673            ])
1674            .into_tuple()
1675            .all(&inner.db)
1676            .await?;
1677        fragments.retain(|(_, mask, _)| {
1678            FragmentTypeMask::from(*mask).contains(FragmentTypeFlag::Source)
1679        });
1680
1681        let mut source_fragment_ids = HashMap::new();
1682        for (fragment_id, _, stream_node) in fragments {
1683            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1684                source_fragment_ids
1685                    .entry(source_id as SourceId)
1686                    .or_insert_with(BTreeSet::new)
1687                    .insert(fragment_id);
1688            }
1689        }
1690        Ok(source_fragment_ids)
1691    }
1692
1693    pub async fn load_backfill_fragment_ids(
1694        &self,
1695    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1696        let inner = self.inner.read().await;
1697        let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1698            .select_only()
1699            .columns([
1700                fragment::Column::FragmentId,
1701                fragment::Column::FragmentTypeMask,
1702                fragment::Column::StreamNode,
1703            ])
1704            .into_tuple()
1705            .all(&inner.db)
1706            .await?;
1707        fragments.retain(|(_, mask, _)| {
1708            FragmentTypeMask::from(*mask).contains(FragmentTypeFlag::SourceScan)
1709        });
1710
1711        let mut source_fragment_ids = HashMap::new();
1712        for (fragment_id, _, stream_node) in fragments {
1713            if let Some((source_id, upstream_source_fragment_id)) =
1714                stream_node.to_protobuf().find_source_backfill()
1715            {
1716                source_fragment_ids
1717                    .entry(source_id as SourceId)
1718                    .or_insert_with(BTreeSet::new)
1719                    .insert((fragment_id, upstream_source_fragment_id));
1720            }
1721        }
1722        Ok(source_fragment_ids)
1723    }
1724
1725    pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
1726        let inner = self.inner.read().await;
1727        let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
1728            .select_only()
1729            .columns([actor::Column::ActorId, actor::Column::Splits])
1730            .filter(actor::Column::Splits.is_not_null())
1731            .into_tuple()
1732            .all(&inner.db)
1733            .await?;
1734        Ok(splits.into_iter().collect())
1735    }
1736
1737    /// Get the actor count of `Materialize` or `Sink` fragment of the specified table.
1738    pub async fn get_actual_job_fragment_parallelism(
1739        &self,
1740        job_id: ObjectId,
1741    ) -> MetaResult<Option<usize>> {
1742        let inner = self.inner.read().await;
1743        let mut fragments: Vec<(FragmentId, i32, i64)> = FragmentModel::find()
1744            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
1745            .select_only()
1746            .columns([
1747                fragment::Column::FragmentId,
1748                fragment::Column::FragmentTypeMask,
1749            ])
1750            .column_as(actor::Column::ActorId.count(), "count")
1751            .filter(fragment::Column::JobId.eq(job_id))
1752            .group_by(fragment::Column::FragmentId)
1753            .into_tuple()
1754            .all(&inner.db)
1755            .await?;
1756
1757        fragments.retain(|(_, mask, _)| {
1758            FragmentTypeMask::from(*mask)
1759                .contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink])
1760        });
1761
1762        Ok(fragments
1763            .into_iter()
1764            .at_most_one()
1765            .ok()
1766            .flatten()
1767            .map(|(_, _, count)| count as usize))
1768    }
1769}
1770
1771#[cfg(test)]
1772mod tests {
1773    use std::collections::{BTreeMap, HashMap, HashSet};
1774
1775    use itertools::Itertools;
1776    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1777    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1778    use risingwave_common::util::iter_util::ZipEqDebug;
1779    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1780    use risingwave_meta_model::actor::ActorStatus;
1781    use risingwave_meta_model::fragment::DistributionType;
1782    use risingwave_meta_model::{
1783        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
1784        VnodeBitmap, actor, fragment,
1785    };
1786    use risingwave_pb::common::PbActorLocation;
1787    use risingwave_pb::meta::table_fragments::PbActorStatus;
1788    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
1789    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1790    use risingwave_pb::plan_common::PbExprContext;
1791    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1792    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1793    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1794
1795    use crate::MetaResult;
1796    use crate::controller::catalog::CatalogController;
1797    use crate::model::{Fragment, StreamActor};
1798
    /// Upstream fragment id -> the set of upstream actor ids an actor consumes from.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    /// Actor id -> that actor's upstream mapping, for a whole fragment.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    // Fixture ids shared by the tests below.
    const TEST_FRAGMENT_ID: FragmentId = 1;

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;

    const TEST_JOB_ID: ObjectId = 1;

    const TEST_STATE_TABLE_ID: TableId = 1000;
1810
1811    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1812        let mut upstream_actor_ids = BTreeMap::new();
1813        upstream_actor_ids.insert(
1814            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1815            HashSet::from_iter([(actor_id + 100)]),
1816        );
1817        upstream_actor_ids.insert(
1818            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1819            HashSet::from_iter([(actor_id + 200)]),
1820        );
1821        upstream_actor_ids
1822    }
1823
1824    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1825        let mut input = vec![];
1826        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1827            input.push(PbStreamNode {
1828                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1829                    upstream_fragment_id: *upstream_fragment_id as _,
1830                    ..Default::default()
1831                }))),
1832                ..Default::default()
1833            });
1834        }
1835
1836        PbStreamNode {
1837            input,
1838            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1839            ..Default::default()
1840        }
1841    }
1842
    /// Round-trip test for `extract_fragment_and_actors_for_new_job`: build a pb
    /// `Fragment` fixture with 3 actors, extract the model-side fragment/actors,
    /// and verify every field survives the conversion.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // Uniform vnode distribution across the 3 actors.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // All actors share the same upstream fragments, so any actor's mapping
        // works as the template for the merger node.
        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        // Mark every actor as running on worker 0.
        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }
1918
    /// Round-trip test for `compose_fragment`: build model-side `fragment::Model`
    /// and `actor::Model` fixtures, compose the pb fragment, and verify actor
    /// statuses, splits, and every composed field.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // `mut` because each actor takes (removes) its own bitmap below.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Give every actor one dummy connector split so the composed
                // split map is non-empty and can be asserted on.
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        // Derive the fragment's stream node from the first actor's upstreams.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }
2009
    /// Compare model-side actors with their pb counterparts field by field.
    ///
    /// `actors` and `pb_actors` must be in the same order (checked by `zip_eq_debug`).
    /// `pb_actor_splits` may be empty when the fixture assigned no splits.
    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // Every `Merge` node in the fragment's stream node must refer to an
            // upstream fragment known to this actor's upstream mapping.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(status, ActorStatus::Running);

            assert_eq!(
                splits,
                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }
2069
    /// Compare a model-side `fragment::Model` with its pb counterpart field by field.
    /// Actors and vnode count are checked elsewhere and deliberately ignored here.
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(
            pb_state_table_ids,
            fragment.state_table_ids.into_u32_array()
        );
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }
2094}