risingwave_meta/controller/
fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::CdcScanOptions;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink,
40    source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::fragment::{
49    FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan;
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
58    StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63    ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, JoinType, PaginatorTrait,
64    QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
65};
66use serde::{Deserialize, Serialize};
67use tracing::debug;
68
69use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
70use crate::controller::catalog::CatalogController;
71use crate::controller::scale::{
72    FragmentRenderMap, NoShuffleEnsemble, find_fragment_no_shuffle_dags_detailed,
73    load_fragment_info, resolve_streaming_job_definition,
74};
75use crate::controller::utils::{
76    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
77    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
78    resolve_no_shuffle_actor_dispatcher,
79};
80use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
81use crate::model::{
82    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
83    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
84};
85use crate::rpc::ddl_controller::build_upstream_sink_info;
86use crate::stream::UpstreamSinkInfo;
87use crate::{MetaResult, model};
88
89/// Some information of running (inflight) actors.
90#[derive(Clone, Debug)]
91pub struct InflightActorInfo {
92    pub worker_id: WorkerId,
93    pub vnode_bitmap: Option<Bitmap>,
94    pub splits: Vec<SplitImpl>,
95}
96
97#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
98struct ActorInfo {
99    pub actor_id: ActorId,
100    pub fragment_id: FragmentId,
101    pub splits: ConnectorSplits,
102    pub worker_id: WorkerId,
103    pub vnode_bitmap: Option<VnodeBitmap>,
104    pub expr_context: ExprContext,
105}
106
107#[derive(Clone, Debug)]
108pub struct InflightFragmentInfo {
109    pub fragment_id: crate::model::FragmentId,
110    pub distribution_type: DistributionType,
111    pub fragment_type_mask: FragmentTypeMask,
112    pub vnode_count: usize,
113    pub nodes: PbStreamNode,
114    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
115    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
116}
117
118#[derive(Clone, Debug)]
119pub struct FragmentParallelismInfo {
120    pub distribution_type: FragmentDistributionType,
121    pub actor_count: usize,
122    pub vnode_count: usize,
123}
124
125#[easy_ext::ext(FragmentTypeMaskExt)]
126pub impl FragmentTypeMask {
127    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
128        Expr::col(fragment::Column::FragmentTypeMask)
129            .bit_and(Expr::value(flag as i32))
130            .ne(0)
131    }
132
133    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
134        Expr::col(fragment::Column::FragmentTypeMask)
135            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
136            .ne(0)
137    }
138
139    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
140        Expr::col(fragment::Column::FragmentTypeMask)
141            .bit_and(Expr::value(flag as i32))
142            .eq(0)
143    }
144}
145
146#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
147#[serde(rename_all = "camelCase")] // for dashboard
148pub struct StreamingJobInfo {
149    pub job_id: JobId,
150    pub obj_type: ObjectType,
151    pub name: String,
152    pub job_status: JobStatus,
153    pub parallelism: StreamingParallelism,
154    pub max_parallelism: i32,
155    pub resource_group: String,
156    pub database_id: DatabaseId,
157    pub schema_id: SchemaId,
158}
159
160impl NotificationManager {
161    pub(crate) fn notify_fragment_mapping(
162        &self,
163        operation: NotificationOperation,
164        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
165    ) {
166        let fragment_ids = fragment_mappings
167            .iter()
168            .map(|mapping| mapping.fragment_id)
169            .collect_vec();
170        if fragment_ids.is_empty() {
171            return;
172        }
173        // notify all fragment mappings to frontend.
174        for fragment_mapping in fragment_mappings {
175            self.notify_frontend_without_version(
176                operation,
177                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
178            );
179        }
180
181        // update serving vnode mappings.
182        match operation {
183            NotificationOperation::Add | NotificationOperation::Update => {
184                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
185                    fragment_ids,
186                ));
187            }
188            NotificationOperation::Delete => {
189                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
190                    fragment_ids,
191                ));
192            }
193            op => {
194                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
195            }
196        }
197    }
198}
199
200impl CatalogController {
201    pub fn prepare_fragment_models_from_fragments(
202        job_id: JobId,
203        fragments: impl Iterator<Item = &Fragment>,
204    ) -> MetaResult<Vec<fragment::Model>> {
205        fragments
206            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
207            .try_collect()
208    }
209
210    pub fn prepare_fragment_model_for_new_job(
211        job_id: JobId,
212        fragment: &Fragment,
213    ) -> MetaResult<fragment::Model> {
214        let vnode_count = fragment.vnode_count();
215        let Fragment {
216            fragment_id: pb_fragment_id,
217            fragment_type_mask: pb_fragment_type_mask,
218            distribution_type: pb_distribution_type,
219            actors: pb_actors,
220            state_table_ids: pb_state_table_ids,
221            nodes,
222            ..
223        } = fragment;
224
225        let state_table_ids = pb_state_table_ids.clone().into();
226
227        assert!(!pb_actors.is_empty());
228
229        let fragment_parallelism = nodes
230            .node_body
231            .as_ref()
232            .and_then(|body| match body {
233                NodeBody::StreamCdcScan(node) => Some(node),
234                _ => None,
235            })
236            .and_then(|node| node.options.as_ref())
237            .map(CdcScanOptions::from_proto)
238            .filter(|opts| opts.is_parallelized_backfill())
239            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));
240
241        let stream_node = StreamNode::from(nodes);
242
243        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
244            .unwrap()
245            .into();
246
247        #[expect(deprecated)]
248        let fragment = fragment::Model {
249            fragment_id: *pb_fragment_id as _,
250            job_id,
251            fragment_type_mask: (*pb_fragment_type_mask).into(),
252            distribution_type,
253            stream_node,
254            state_table_ids,
255            upstream_fragment_id: Default::default(),
256            vnode_count: vnode_count as _,
257            parallelism: fragment_parallelism,
258        };
259
260        Ok(fragment)
261    }
262
263    fn compose_table_fragments(
264        job_id: JobId,
265        state: PbState,
266        ctx: Option<PbStreamContext>,
267        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
268        parallelism: StreamingParallelism,
269        max_parallelism: usize,
270        job_definition: Option<String>,
271    ) -> MetaResult<StreamJobFragments> {
272        let mut pb_fragments = BTreeMap::new();
273        let mut pb_actor_status = BTreeMap::new();
274
275        for (fragment, actors) in fragments {
276            let (fragment, fragment_actor_status, _) =
277                Self::compose_fragment(fragment, actors, job_definition.clone())?;
278
279            pb_fragments.insert(fragment.fragment_id, fragment);
280            pb_actor_status.extend(fragment_actor_status.into_iter());
281        }
282
283        let table_fragments = StreamJobFragments {
284            stream_job_id: job_id,
285            state: state as _,
286            fragments: pb_fragments,
287            actor_status: pb_actor_status,
288            ctx: ctx
289                .as_ref()
290                .map(StreamContext::from_protobuf)
291                .unwrap_or_default(),
292            assigned_parallelism: match parallelism {
293                StreamingParallelism::Custom => TableParallelism::Custom,
294                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
295                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
296            },
297            max_parallelism,
298        };
299
300        Ok(table_fragments)
301    }
302
303    #[allow(clippy::type_complexity)]
304    fn compose_fragment(
305        fragment: fragment::Model,
306        actors: Vec<ActorInfo>,
307        job_definition: Option<String>,
308    ) -> MetaResult<(
309        Fragment,
310        HashMap<crate::model::ActorId, PbActorStatus>,
311        HashMap<crate::model::ActorId, PbConnectorSplits>,
312    )> {
313        let fragment::Model {
314            fragment_id,
315            fragment_type_mask,
316            distribution_type,
317            stream_node,
318            state_table_ids,
319            vnode_count,
320            ..
321        } = fragment;
322
323        let stream_node = stream_node.to_protobuf();
324        let mut upstream_fragments = HashSet::new();
325        visit_stream_node_body(&stream_node, |body| {
326            if let NodeBody::Merge(m) = body {
327                assert!(
328                    upstream_fragments.insert(m.upstream_fragment_id),
329                    "non-duplicate upstream fragment"
330                );
331            }
332        });
333
334        let mut pb_actors = vec![];
335
336        let mut pb_actor_status = HashMap::new();
337        let mut pb_actor_splits = HashMap::new();
338
339        for actor in actors {
340            if actor.fragment_id != fragment_id {
341                bail!(
342                    "fragment id {} from actor {} is different from fragment {}",
343                    actor.fragment_id,
344                    actor.actor_id,
345                    fragment_id
346                )
347            }
348
349            let ActorInfo {
350                actor_id,
351                fragment_id,
352                worker_id,
353                splits,
354                vnode_bitmap,
355                expr_context,
356                ..
357            } = actor;
358
359            let vnode_bitmap =
360                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
361            let pb_expr_context = Some(expr_context.to_protobuf());
362
363            pb_actor_status.insert(
364                actor_id as _,
365                PbActorStatus {
366                    location: PbActorLocation::from_worker(worker_id),
367                },
368            );
369
370            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
371
372            pb_actors.push(StreamActor {
373                actor_id: actor_id as _,
374                fragment_id: fragment_id as _,
375                vnode_bitmap,
376                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
377                expr_context: pb_expr_context,
378            })
379        }
380
381        let pb_state_table_ids = state_table_ids.0;
382        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
383        let pb_fragment = Fragment {
384            fragment_id: fragment_id as _,
385            fragment_type_mask: fragment_type_mask.into(),
386            distribution_type: pb_distribution_type,
387            actors: pb_actors,
388            state_table_ids: pb_state_table_ids,
389            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
390            nodes: stream_node,
391        };
392
393        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
394    }
395
396    pub fn running_fragment_parallelisms(
397        &self,
398        id_filter: Option<HashSet<FragmentId>>,
399    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
400        let info = self.env.shared_actor_infos().read_guard();
401
402        let mut result = HashMap::new();
403        for (fragment_id, fragment) in info.iter_over_fragments() {
404            if let Some(id_filter) = &id_filter
405                && !id_filter.contains(&(*fragment_id as _))
406            {
407                continue; // Skip fragments not in the filter
408            }
409
410            result.insert(
411                *fragment_id as _,
412                FragmentParallelismInfo {
413                    distribution_type: fragment.distribution_type.into(),
414                    actor_count: fragment.actors.len() as _,
415                    vnode_count: fragment.vnode_count,
416                },
417            );
418        }
419
420        Ok(result)
421    }
422
423    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
424        let inner = self.inner.read().await;
425        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
426            .select_only()
427            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
428            .into_tuple()
429            .all(&inner.db)
430            .await?;
431        Ok(fragment_jobs.into_iter().collect())
432    }
433
434    pub async fn get_fragment_job_id(
435        &self,
436        fragment_ids: Vec<FragmentId>,
437    ) -> MetaResult<Vec<ObjectId>> {
438        let inner = self.inner.read().await;
439
440        let object_ids: Vec<ObjectId> = FragmentModel::find()
441            .select_only()
442            .column(fragment::Column::JobId)
443            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
444            .into_tuple()
445            .all(&inner.db)
446            .await?;
447
448        Ok(object_ids)
449    }
450
451    pub async fn get_fragment_desc_by_id(
452        &self,
453        fragment_id: FragmentId,
454    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
455        let inner = self.inner.read().await;
456
457        let fragment_model = match FragmentModel::find_by_id(fragment_id)
458            .one(&inner.db)
459            .await?
460        {
461            Some(fragment) => fragment,
462            None => return Ok(None),
463        };
464
465        let job_parallelism: Option<StreamingParallelism> =
466            StreamingJob::find_by_id(fragment_model.job_id)
467                .select_only()
468                .column(streaming_job::Column::Parallelism)
469                .into_tuple()
470                .one(&inner.db)
471                .await?;
472
473        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
474            .select_only()
475            .columns([
476                fragment_relation::Column::SourceFragmentId,
477                fragment_relation::Column::DispatcherType,
478            ])
479            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
480            .into_tuple()
481            .all(&inner.db)
482            .await?;
483
484        let upstreams: Vec<_> = upstream_entries
485            .into_iter()
486            .map(|(source_id, _)| source_id)
487            .collect();
488
489        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
490            .await
491            .map(Self::collect_root_fragment_mapping)?;
492        let root_fragments = root_fragment_map
493            .get(&fragment_id)
494            .cloned()
495            .unwrap_or_default();
496
497        let info = self.env.shared_actor_infos().read_guard();
498        let SharedFragmentInfo { actors, .. } = info
499            .get_fragment(fragment_model.fragment_id as _)
500            .unwrap_or_else(|| {
501                panic!(
502                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
503                    fragment_model.fragment_id,
504                    fragment_model.job_id
505                )
506            });
507
508        let parallelism_policy = Self::format_fragment_parallelism_policy(
509            &fragment_model,
510            job_parallelism.as_ref(),
511            &root_fragments,
512        );
513
514        let fragment = FragmentDesc {
515            fragment_id: fragment_model.fragment_id,
516            job_id: fragment_model.job_id,
517            fragment_type_mask: fragment_model.fragment_type_mask,
518            distribution_type: fragment_model.distribution_type,
519            state_table_ids: fragment_model.state_table_ids.clone(),
520            parallelism: actors.len() as _,
521            vnode_count: fragment_model.vnode_count,
522            stream_node: fragment_model.stream_node.clone(),
523            parallelism_policy,
524        };
525
526        Ok(Some((fragment, upstreams)))
527    }
528
529    pub async fn list_fragment_database_ids(
530        &self,
531        select_fragment_ids: Option<Vec<FragmentId>>,
532    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
533        let inner = self.inner.read().await;
534        let select = FragmentModel::find()
535            .select_only()
536            .column(fragment::Column::FragmentId)
537            .column(object::Column::DatabaseId)
538            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
539        let select = if let Some(select_fragment_ids) = select_fragment_ids {
540            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
541        } else {
542            select
543        };
544        Ok(select.into_tuple().all(&inner.db).await?)
545    }
546
547    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
548        let inner = self.inner.read().await;
549
550        // Load fragments matching the job from the database
551        let fragments: Vec<_> = FragmentModel::find()
552            .filter(fragment::Column::JobId.eq(job_id))
553            .all(&inner.db)
554            .await?;
555
556        let job_info = StreamingJob::find_by_id(job_id)
557            .one(&inner.db)
558            .await?
559            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
560
561        let fragment_actors =
562            self.collect_fragment_actor_pairs(fragments, job_info.timezone.clone())?;
563
564        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
565            .await?
566            .remove(&job_id);
567
568        Self::compose_table_fragments(
569            job_id,
570            job_info.job_status.into(),
571            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
572            fragment_actors,
573            job_info.parallelism.clone(),
574            job_info.max_parallelism as _,
575            job_definition,
576        )
577    }
578
579    pub async fn get_fragment_actor_dispatchers(
580        &self,
581        fragment_ids: Vec<FragmentId>,
582    ) -> MetaResult<FragmentActorDispatchers> {
583        let inner = self.inner.read().await;
584
585        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
586            .await
587    }
588
589    pub async fn get_fragment_actor_dispatchers_txn(
590        &self,
591        c: &impl ConnectionTrait,
592        fragment_ids: Vec<FragmentId>,
593    ) -> MetaResult<FragmentActorDispatchers> {
594        let fragment_relations = FragmentRelation::find()
595            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
596            .all(c)
597            .await?;
598
599        type FragmentActorInfo = (
600            DistributionType,
601            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
602        );
603
604        let shared_info = self.env.shared_actor_infos();
605        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
606        let get_fragment_actors = |fragment_id: FragmentId| async move {
607            let result: MetaResult<FragmentActorInfo> = try {
608                let read_guard = shared_info.read_guard();
609
610                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();
611
612                (
613                    fragment.distribution_type,
614                    Arc::new(
615                        fragment
616                            .actors
617                            .iter()
618                            .map(|(actor_id, actor_info)| {
619                                (
620                                    *actor_id,
621                                    actor_info
622                                        .vnode_bitmap
623                                        .as_ref()
624                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
625                                )
626                            })
627                            .collect(),
628                    ),
629                )
630            };
631            result
632        };
633
634        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
635        for fragment_relation::Model {
636            source_fragment_id,
637            target_fragment_id,
638            dispatcher_type,
639            dist_key_indices,
640            output_indices,
641            output_type_mapping,
642        } in fragment_relations
643        {
644            let (source_fragment_distribution, source_fragment_actors) = {
645                let (distribution, actors) = {
646                    match fragment_actor_cache.entry(source_fragment_id) {
647                        Entry::Occupied(entry) => entry.into_mut(),
648                        Entry::Vacant(entry) => {
649                            entry.insert(get_fragment_actors(source_fragment_id).await?)
650                        }
651                    }
652                };
653                (*distribution, actors.clone())
654            };
655            let (target_fragment_distribution, target_fragment_actors) = {
656                let (distribution, actors) = {
657                    match fragment_actor_cache.entry(target_fragment_id) {
658                        Entry::Occupied(entry) => entry.into_mut(),
659                        Entry::Vacant(entry) => {
660                            entry.insert(get_fragment_actors(target_fragment_id).await?)
661                        }
662                    }
663                };
664                (*distribution, actors.clone())
665            };
666            let output_mapping = PbDispatchOutputMapping {
667                indices: output_indices.into_u32_array(),
668                types: output_type_mapping.unwrap_or_default().to_protobuf(),
669            };
670            let dispatchers = compose_dispatchers(
671                source_fragment_distribution,
672                &source_fragment_actors,
673                target_fragment_id as _,
674                target_fragment_distribution,
675                &target_fragment_actors,
676                dispatcher_type,
677                dist_key_indices.into_u32_array(),
678                output_mapping,
679            );
680            let actor_dispatchers_map = actor_dispatchers_map
681                .entry(source_fragment_id as _)
682                .or_default();
683            for (actor_id, dispatchers) in dispatchers {
684                actor_dispatchers_map
685                    .entry(actor_id as _)
686                    .or_default()
687                    .push(dispatchers);
688            }
689        }
690        Ok(actor_dispatchers_map)
691    }
692
693    pub async fn get_fragment_downstream_relations(
694        &self,
695        fragment_ids: Vec<FragmentId>,
696    ) -> MetaResult<FragmentDownstreamRelation> {
697        let inner = self.inner.read().await;
698        let mut stream = FragmentRelation::find()
699            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
700            .stream(&inner.db)
701            .await?;
702        let mut relations = FragmentDownstreamRelation::new();
703        while let Some(relation) = stream.try_next().await? {
704            relations
705                .entry(relation.source_fragment_id as _)
706                .or_default()
707                .push(DownstreamFragmentRelation {
708                    downstream_fragment_id: relation.target_fragment_id as _,
709                    dispatcher_type: relation.dispatcher_type,
710                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
711                    output_mapping: PbDispatchOutputMapping {
712                        indices: relation.output_indices.into_u32_array(),
713                        types: relation
714                            .output_type_mapping
715                            .unwrap_or_default()
716                            .to_protobuf(),
717                    },
718                });
719        }
720        Ok(relations)
721    }
722
723    pub async fn get_job_fragment_backfill_scan_type(
724        &self,
725        job_id: ObjectId,
726    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
727        let inner = self.inner.read().await;
728        let fragments: Vec<_> = FragmentModel::find()
729            .filter(fragment::Column::JobId.eq(job_id))
730            .all(&inner.db)
731            .await?;
732
733        let mut result = HashMap::new();
734
735        for fragment::Model {
736            fragment_id,
737            stream_node,
738            ..
739        } in fragments
740        {
741            let stream_node = stream_node.to_protobuf();
742            visit_stream_node_body(&stream_node, |body| {
743                if let NodeBody::StreamScan(node) = body {
744                    match node.stream_scan_type() {
745                        StreamScanType::Unspecified => {}
746                        scan_type => {
747                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
748                        }
749                    }
750                }
751            });
752        }
753
754        Ok(result)
755    }
756
757    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
758        let inner = self.inner.read().await;
759        let job_states = StreamingJob::find()
760            .select_only()
761            .column(streaming_job::Column::JobId)
762            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
763            .join(JoinType::InnerJoin, object::Relation::Database2.def())
764            .column(object::Column::ObjType)
765            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
766            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
767            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
768            .column_as(
769                Expr::if_null(
770                    Expr::col((table::Entity, table::Column::Name)),
771                    Expr::if_null(
772                        Expr::col((source::Entity, source::Column::Name)),
773                        Expr::if_null(
774                            Expr::col((sink::Entity, sink::Column::Name)),
775                            Expr::val("<unknown>"),
776                        ),
777                    ),
778                ),
779                "name",
780            )
781            .columns([
782                streaming_job::Column::JobStatus,
783                streaming_job::Column::Parallelism,
784                streaming_job::Column::MaxParallelism,
785            ])
786            .column_as(
787                Expr::if_null(
788                    Expr::col((
789                        streaming_job::Entity,
790                        streaming_job::Column::SpecificResourceGroup,
791                    )),
792                    Expr::col((database::Entity, database::Column::ResourceGroup)),
793                ),
794                "resource_group",
795            )
796            .column(object::Column::DatabaseId)
797            .column(object::Column::SchemaId)
798            .into_model()
799            .all(&inner.db)
800            .await?;
801        Ok(job_states)
802    }
803
804    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
805        let inner = self.inner.read().await;
806        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
807            .select_only()
808            .column(streaming_job::Column::MaxParallelism)
809            .into_tuple()
810            .one(&inner.db)
811            .await?
812            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
813        Ok(max_parallelism as usize)
814    }
815
816    /// Try to get internal table ids of each streaming job, used by metrics collection.
817    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
818        if let Ok(inner) = self.inner.try_read()
819            && let Ok(job_state_tables) = FragmentModel::find()
820                .select_only()
821                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
822                .into_tuple::<(JobId, I32Array)>()
823                .all(&inner.db)
824                .await
825        {
826            let mut job_internal_table_ids = HashMap::new();
827            for (job_id, state_table_ids) in job_state_tables {
828                job_internal_table_ids
829                    .entry(job_id)
830                    .or_insert_with(Vec::new)
831                    .extend(
832                        state_table_ids
833                            .into_inner()
834                            .into_iter()
835                            .map(|table_id| TableId::new(table_id as _)),
836                    );
837            }
838            return Some(job_internal_table_ids.into_iter().collect());
839        }
840        None
841    }
842
843    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
844        let inner = self.inner.read().await;
845        let count = FragmentModel::find().count(&inner.db).await?;
846        Ok(count > 0)
847    }
848
849    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
850        let read_guard = self.env.shared_actor_infos().read_guard();
851        let actor_cnt: HashMap<WorkerId, _> = read_guard
852            .iter_over_fragments()
853            .flat_map(|(_, fragment)| {
854                fragment
855                    .actors
856                    .iter()
857                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
858            })
859            .into_group_map()
860            .into_iter()
861            .map(|(k, v)| (k, v.len()))
862            .collect();
863
864        Ok(actor_cnt)
865    }
866
867    fn collect_fragment_actor_map(
868        &self,
869        fragment_ids: &[FragmentId],
870        timezone: Option<String>,
871    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
872        let guard = self.env.shared_actor_infos().read_guard();
873        let stream_context = StreamContext { timezone };
874        let pb_expr_context = stream_context.to_expr_context();
875        let expr_context: ExprContext = (&pb_expr_context).into();
876
877        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
878        for fragment_id in fragment_ids {
879            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
880                anyhow!("fragment {} not found in shared actor info", fragment_id)
881            })?;
882
883            let actors = fragment_info
884                .actors
885                .iter()
886                .map(|(actor_id, actor_info)| ActorInfo {
887                    actor_id: *actor_id as _,
888                    fragment_id: *fragment_id,
889                    splits: ConnectorSplits::from(&PbConnectorSplits {
890                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
891                    }),
892                    worker_id: actor_info.worker_id as _,
893                    vnode_bitmap: actor_info
894                        .vnode_bitmap
895                        .as_ref()
896                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
897                    expr_context: expr_context.clone(),
898                })
899                .collect();
900
901            actor_map.insert(*fragment_id, actors);
902        }
903
904        Ok(actor_map)
905    }
906
907    fn collect_fragment_actor_pairs(
908        &self,
909        fragments: Vec<fragment::Model>,
910        timezone: Option<String>,
911    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
912        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
913        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, timezone)?;
914        fragments
915            .into_iter()
916            .map(|fragment| {
917                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
918                    anyhow!(
919                        "fragment {} missing in shared actor info map",
920                        fragment.fragment_id
921                    )
922                })?;
923                Ok((fragment, actors))
924            })
925            .collect()
926    }
927
928    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
929    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
930        let inner = self.inner.read().await;
931        let jobs = StreamingJob::find().all(&inner.db).await?;
932
933        let mut job_definition = resolve_streaming_job_definition(
934            &inner.db,
935            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
936        )
937        .await?;
938
939        let mut table_fragments = BTreeMap::new();
940        for job in jobs {
941            let fragments = FragmentModel::find()
942                .filter(fragment::Column::JobId.eq(job.job_id))
943                .all(&inner.db)
944                .await?;
945
946            let fragment_actors =
947                self.collect_fragment_actor_pairs(fragments, job.timezone.clone())?;
948
949            table_fragments.insert(
950                job.job_id,
951                Self::compose_table_fragments(
952                    job.job_id,
953                    job.job_status.into(),
954                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
955                    fragment_actors,
956                    job.parallelism.clone(),
957                    job.max_parallelism as _,
958                    job_definition.remove(&job.job_id),
959                )?,
960            );
961        }
962
963        Ok(table_fragments)
964    }
965
966    pub async fn upstream_fragments(
967        &self,
968        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
969    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
970        let inner = self.inner.read().await;
971        let mut stream = FragmentRelation::find()
972            .select_only()
973            .columns([
974                fragment_relation::Column::SourceFragmentId,
975                fragment_relation::Column::TargetFragmentId,
976            ])
977            .filter(
978                fragment_relation::Column::TargetFragmentId
979                    .is_in(fragment_ids.map(|id| id as FragmentId)),
980            )
981            .into_tuple::<(FragmentId, FragmentId)>()
982            .stream(&inner.db)
983            .await?;
984        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
985        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
986            upstream_fragments
987                .entry(downstream_fragment_id as crate::model::FragmentId)
988                .or_default()
989                .insert(upstream_fragment_id as crate::model::FragmentId);
990        }
991        Ok(upstream_fragments)
992    }
993
994    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
995        let info = self.env.shared_actor_infos().read_guard();
996
997        let actor_locations = info
998            .iter_over_fragments()
999            .flat_map(|(fragment_id, fragment)| {
1000                fragment
1001                    .actors
1002                    .iter()
1003                    .map(|(actor_id, actor)| PartialActorLocation {
1004                        actor_id: *actor_id as _,
1005                        fragment_id: *fragment_id as _,
1006                        worker_id: actor.worker_id,
1007                    })
1008            })
1009            .collect_vec();
1010
1011        Ok(actor_locations)
1012    }
1013
1014    pub async fn list_actor_info(
1015        &self,
1016    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
1017        let inner = self.inner.read().await;
1018
1019        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
1020            FragmentModel::find()
1021                .select_only()
1022                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1023                .column(fragment::Column::FragmentId)
1024                .column_as(object::Column::Oid, "job_id")
1025                .column_as(object::Column::SchemaId, "schema_id")
1026                .column_as(object::Column::ObjType, "type")
1027                .into_tuple()
1028                .all(&inner.db)
1029                .await?;
1030
1031        let actor_infos = {
1032            let info = self.env.shared_actor_infos().read_guard();
1033
1034            fragment_objects
1035                .into_iter()
1036                .flat_map(|(fragment_id, object_id, schema_id, object_type)| {
1037                    let SharedFragmentInfo {
1038                        fragment_id,
1039                        actors,
1040                        ..
1041                    } = info.get_fragment(fragment_id as _).unwrap();
1042                    actors.keys().map(move |actor_id| {
1043                        (
1044                            *actor_id as _,
1045                            *fragment_id as _,
1046                            object_id,
1047                            schema_id,
1048                            object_type,
1049                        )
1050                    })
1051                })
1052                .collect_vec()
1053        };
1054
1055        Ok(actor_infos)
1056    }
1057
1058    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1059        let guard = self.env.shared_actor_info.read_guard();
1060        guard
1061            .iter_over_fragments()
1062            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1063            .collect_vec()
1064    }
1065
    /// List fragment distribution descriptors, each paired with the fragment's direct upstream
    /// fragment ids.
    ///
    /// When `is_creating` is true, only fragments of streaming jobs whose status is `Initial` or
    /// `Creating` are returned; otherwise every fragment is returned. Catalog data is read within
    /// one transaction, while the reported `parallelism` is the actor count taken from the shared
    /// actor info cache (0 when the fragment is absent from the cache).
    pub async fn list_fragment_descs(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        // Used for reads only; the transaction is never committed.
        let txn = inner.db.begin().await?;

        let fragments: Vec<_> = if is_creating {
            FragmentModel::find()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
                .filter(
                    streaming_job::Column::JobStatus
                        .eq(JobStatus::Initial)
                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
                )
                .all(&txn)
                .await?
        } else {
            FragmentModel::find().all(&txn).await?
        };

        let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();

        // Job-level parallelism, used as the `inherit(..)` fallback of the rendered policy.
        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragments.is_empty() {
            HashMap::new()
        } else {
            let job_ids = fragments.iter().map(|f| f.job_id).unique().collect_vec();
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(&txn)
                .await?
                .into_iter()
                .collect()
        };

        // Direct upstream edges of the selected fragments. The dispatcher type is fetched but
        // not used below. Skipped entirely when there are no fragments.
        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        // target fragment -> its direct upstream (source) fragments
        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        // fragment -> root fragments of its no-shuffle ensemble (empty for the roots themselves)
        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();

        let mut result = Vec::new();

        for fragment_desc in fragments {
            // Note: the fragment is sometimes missing from the cache; fall back to 0 actors.
            let parallelism = guard
                .get_fragment(fragment_desc.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&fragment_desc.fragment_id)
                .cloned()
                .unwrap_or_default();

            // Each target appears once among `fragments`, so `remove` hands over the list.
            let upstreams = all_upstreams
                .remove(&fragment_desc.fragment_id)
                .unwrap_or_default();

            // Computed before the fields of `fragment_desc` are moved out below.
            let parallelism_policy = Self::format_fragment_parallelism_policy(
                &fragment_desc,
                job_parallelisms.get(&fragment_desc.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: fragment_desc.fragment_id,
                table_id: fragment_desc.job_id,
                distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type)
                    as _,
                state_table_ids: fragment_desc.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: fragment_desc.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: fragment_desc.vnode_count as _,
                node: Some(fragment_desc.stream_node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }
        Ok(result)
    }
1182
1183    /// Build a fragment-to-root lookup for all reported root fragment ensembles.
1184    fn collect_root_fragment_mapping(
1185        ensembles: Vec<NoShuffleEnsemble>,
1186    ) -> HashMap<FragmentId, Vec<FragmentId>> {
1187        let mut mapping = HashMap::new();
1188
1189        for ensemble in ensembles {
1190            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1191            roots.sort_unstable();
1192            roots.dedup();
1193
1194            if roots.is_empty() {
1195                continue;
1196            }
1197
1198            let root_set: HashSet<_> = roots.iter().copied().collect();
1199
1200            for fragment_id in ensemble.component_fragments() {
1201                if root_set.contains(&fragment_id) {
1202                    mapping.insert(fragment_id, Vec::new());
1203                } else {
1204                    mapping.insert(fragment_id, roots.clone());
1205                }
1206            }
1207        }
1208
1209        mapping
1210    }
1211
1212    fn format_fragment_parallelism_policy(
1213        fragment: &fragment::Model,
1214        job_parallelism: Option<&StreamingParallelism>,
1215        root_fragments: &[FragmentId],
1216    ) -> String {
1217        if fragment.distribution_type == DistributionType::Single {
1218            return "single".to_owned();
1219        }
1220
1221        if let Some(parallelism) = fragment.parallelism.as_ref() {
1222            return format!(
1223                "override({})",
1224                Self::format_streaming_parallelism(parallelism)
1225            );
1226        }
1227
1228        if !root_fragments.is_empty() {
1229            let mut upstreams = root_fragments.to_vec();
1230            upstreams.sort_unstable();
1231            upstreams.dedup();
1232
1233            return format!("upstream_fragment({upstreams:?})");
1234        }
1235
1236        let inherited = job_parallelism
1237            .map(Self::format_streaming_parallelism)
1238            .unwrap_or_else(|| "unknown".to_owned());
1239        format!("inherit({inherited})")
1240    }
1241
1242    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1243        match parallelism {
1244            StreamingParallelism::Adaptive => "adaptive".to_owned(),
1245            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1246            StreamingParallelism::Custom => "custom".to_owned(),
1247        }
1248    }
1249
1250    pub async fn list_sink_actor_mapping(
1251        &self,
1252    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1253        let inner = self.inner.read().await;
1254        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1255            .select_only()
1256            .columns([sink::Column::SinkId, sink::Column::Name])
1257            .into_tuple()
1258            .all(&inner.db)
1259            .await?;
1260        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1261
1262        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1263
1264        let actor_with_type: Vec<(ActorId, SinkId)> = {
1265            let info = self.env.shared_actor_infos().read_guard();
1266
1267            info.iter_over_fragments()
1268                .filter(|(_, fragment)| {
1269                    sink_ids.contains(&fragment.job_id.as_sink_id())
1270                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1271                })
1272                .flat_map(|(_, fragment)| {
1273                    fragment
1274                        .actors
1275                        .keys()
1276                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1277                })
1278                .collect()
1279        };
1280
1281        let mut sink_actor_mapping = HashMap::new();
1282        for (actor_id, sink_id) in actor_with_type {
1283            sink_actor_mapping
1284                .entry(sink_id)
1285                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1286                .1
1287                .push(actor_id);
1288        }
1289
1290        Ok(sink_actor_mapping)
1291    }
1292
1293    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1294        let inner = self.inner.read().await;
1295        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1296            .select_only()
1297            .columns([
1298                fragment::Column::FragmentId,
1299                fragment::Column::JobId,
1300                fragment::Column::StateTableIds,
1301            ])
1302            .into_partial_model()
1303            .all(&inner.db)
1304            .await?;
1305        Ok(fragment_state_tables)
1306    }
1307
1308    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
1309    /// collected
1310    pub async fn load_all_actors_dynamic(
1311        &self,
1312        database_id: Option<DatabaseId>,
1313        worker_nodes: &ActiveStreamingWorkerNodes,
1314    ) -> MetaResult<FragmentRenderMap> {
1315        let adaptive_parallelism_strategy = {
1316            let system_params_reader = self.env.system_params_reader().await;
1317            system_params_reader.adaptive_parallelism_strategy()
1318        };
1319
1320        let inner = self.inner.read().await;
1321        let txn = inner.db.begin().await?;
1322
1323        let database_fragment_infos = load_fragment_info(
1324            &txn,
1325            self.env.actor_id_generator(),
1326            database_id,
1327            worker_nodes,
1328            adaptive_parallelism_strategy,
1329        )
1330        .await?;
1331
1332        debug!(?database_fragment_infos, "reload all actors");
1333
1334        Ok(database_fragment_infos)
1335    }
1336
1337    #[await_tree::instrument]
1338    pub async fn fill_snapshot_backfill_epoch(
1339        &self,
1340        fragment_ids: impl Iterator<Item = FragmentId>,
1341        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1342        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1343    ) -> MetaResult<()> {
1344        let inner = self.inner.write().await;
1345        let txn = inner.db.begin().await?;
1346        for fragment_id in fragment_ids {
1347            let fragment = FragmentModel::find_by_id(fragment_id)
1348                .one(&txn)
1349                .await?
1350                .context(format!("fragment {} not found", fragment_id))?;
1351            let mut node = fragment.stream_node.to_protobuf();
1352            if crate::stream::fill_snapshot_backfill_epoch(
1353                &mut node,
1354                snapshot_backfill_info,
1355                cross_db_snapshot_backfill_info,
1356            )? {
1357                let node = StreamNode::from(&node);
1358                FragmentModel::update(fragment::ActiveModel {
1359                    fragment_id: Set(fragment_id),
1360                    stream_node: Set(node),
1361                    ..Default::default()
1362                })
1363                .exec(&txn)
1364                .await?;
1365            }
1366        }
1367        txn.commit().await?;
1368        Ok(())
1369    }
1370
1371    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1372    pub fn get_running_actors_of_fragment(
1373        &self,
1374        fragment_id: FragmentId,
1375    ) -> MetaResult<HashSet<model::ActorId>> {
1376        let info = self.env.shared_actor_infos().read_guard();
1377
1378        let actors = info
1379            .get_fragment(fragment_id as _)
1380            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1381            .unwrap_or_default();
1382
1383        Ok(actors)
1384    }
1385
    /// Pair the running actors of the source backfill fragment `source_backfill_fragment_id`
    /// with the actors of its upstream source fragment `source_fragment_id`.
    ///
    /// Returns `(backfill_actor_id, upstream_source_actor_id)` pairs, as resolved by
    /// `resolve_no_shuffle_actor_dispatcher` over the actors currently recorded in the shared
    /// actor info cache.
    ///
    /// Returns an error if there is no relation from the source fragment to the backfill
    /// fragment, if that relation is not `NoShuffle`, or if either fragment is missing from the
    /// catalog.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        // Used for reads only; the transaction is never committed.
        let txn = inner.db.begin().await?;

        // The dispatcher type of the edge source -> backfill.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // Actor pairing below is only meaningful for a one-to-one (NoShuffle) edge.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Reads only the distribution type column of a single fragment from the catalog.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Actor id -> vnode bitmap of one fragment, taken from the shared actor info cache.
        // A fragment missing from the cache yields an empty map.
        // NOTE(review): each call takes its own read guard, so the two fragments below are not
        // read from one atomic snapshot of the cache.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // The resolver yields (source_actor, backfill_actor); swap to the documented
        // (backfill_actor, source_actor) order.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1481
1482    /// Get and filter the "**root**" fragments of the specified jobs.
1483    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1484    ///
1485    /// Root fragment connects to downstream jobs.
1486    ///
1487    /// ## What can be the root fragment
1488    /// - For sink, it should have one `Sink` fragment.
1489    /// - For MV, it should have one `MView` fragment.
1490    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1491    /// - For source, it should have one `Source` fragment.
1492    ///
1493    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1494    pub async fn get_root_fragments(
1495        &self,
1496        job_ids: Vec<JobId>,
1497    ) -> MetaResult<(
1498        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
1499        HashMap<ActorId, WorkerId>,
1500    )> {
1501        let inner = self.inner.read().await;
1502
1503        let all_fragments = FragmentModel::find()
1504            .filter(fragment::Column::JobId.is_in(job_ids))
1505            .all(&inner.db)
1506            .await?;
1507        // job_id -> fragment
1508        let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
1509        for fragment in all_fragments {
1510            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1511            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1512                _ = root_fragments.insert(fragment.job_id, fragment);
1513            } else if mask.contains(FragmentTypeFlag::Source) {
1514                // look for Source fragment only if there's no MView fragment
1515                // (notice try_insert here vs insert above)
1516                _ = root_fragments.try_insert(fragment.job_id, fragment);
1517            }
1518        }
1519
1520        let mut root_fragments_pb = HashMap::new();
1521
1522        let info = self.env.shared_actor_infos().read_guard();
1523
1524        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
1525            .iter()
1526            .map(|(job_id, fragment)| (fragment.fragment_id, *job_id))
1527            .collect();
1528
1529        for fragment in root_fragment_to_jobs.keys() {
1530            let fragment_info = info.get_fragment(*fragment).context(format!(
1531                "fragment {} not found in shared actor info",
1532                fragment
1533            ))?;
1534
1535            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
1536            let fragment = root_fragments
1537                .get(&job_id)
1538                .context(format!("root fragment for job {} not found", job_id))?;
1539
1540            root_fragments_pb.insert(
1541                job_id,
1542                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
1543            );
1544        }
1545
1546        let mut all_actor_locations = HashMap::new();
1547
1548        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
1549            for (actor_id, actor_info) in actors {
1550                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
1551            }
1552        }
1553
1554        Ok((root_fragments_pb, all_actor_locations))
1555    }
1556
1557    pub async fn get_root_fragment(
1558        &self,
1559        job_id: JobId,
1560    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
1561        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1562        let (root_fragment, _) = root_fragments
1563            .remove(&job_id)
1564            .context(format!("root fragment for job {} not found", job_id))?;
1565
1566        Ok((root_fragment, actors))
1567    }
1568
1569    /// Get the downstream fragments connected to the specified job.
1570    pub async fn get_downstream_fragments(
1571        &self,
1572        job_id: JobId,
1573    ) -> MetaResult<(
1574        Vec<(
1575            stream_plan::DispatcherType,
1576            SharedFragmentInfo,
1577            PbStreamNode,
1578        )>,
1579        HashMap<ActorId, WorkerId>,
1580    )> {
1581        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;
1582
1583        let inner = self.inner.read().await;
1584        let txn = inner.db.begin().await?;
1585        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1586            .filter(
1587                fragment_relation::Column::SourceFragmentId
1588                    .eq(root_fragment.fragment_id as FragmentId),
1589            )
1590            .all(&txn)
1591            .await?;
1592
1593        let downstream_fragment_ids = downstream_fragment_relations
1594            .iter()
1595            .map(|model| model.target_fragment_id as FragmentId)
1596            .collect::<HashSet<_>>();
1597
1598        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1599            .select_only()
1600            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1601            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1602            .into_tuple()
1603            .all(&txn)
1604            .await?;
1605
1606        let downstream_fragment_nodes: HashMap<_, _> =
1607            downstream_fragment_nodes.into_iter().collect();
1608
1609        let mut downstream_fragments = vec![];
1610
1611        let info = self.env.shared_actor_infos().read_guard();
1612
1613        let fragment_map: HashMap<_, _> = downstream_fragment_relations
1614            .iter()
1615            .map(|model| (model.target_fragment_id, model.dispatcher_type))
1616            .collect();
1617
1618        for fragment_id in fragment_map.keys() {
1619            let fragment_info @ SharedFragmentInfo { actors, .. } =
1620                info.get_fragment(*fragment_id).unwrap();
1621
1622            let dispatcher_type = fragment_map[fragment_id];
1623
1624            if actors.is_empty() {
1625                bail!("No fragment found for fragment id {}", fragment_id);
1626            }
1627
1628            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1629
1630            let nodes = downstream_fragment_nodes
1631                .get(fragment_id)
1632                .context(format!(
1633                    "downstream fragment node for id {} not found",
1634                    fragment_id
1635                ))?
1636                .to_protobuf();
1637
1638            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
1639        }
1640        Ok((downstream_fragments, actor_locations))
1641    }
1642
1643    pub async fn load_source_fragment_ids(
1644        &self,
1645    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1646        let inner = self.inner.read().await;
1647        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1648            .select_only()
1649            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1650            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1651            .into_tuple()
1652            .all(&inner.db)
1653            .await?;
1654
1655        let mut source_fragment_ids = HashMap::new();
1656        for (fragment_id, stream_node) in fragments {
1657            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1658                source_fragment_ids
1659                    .entry(source_id)
1660                    .or_insert_with(BTreeSet::new)
1661                    .insert(fragment_id);
1662            }
1663        }
1664        Ok(source_fragment_ids)
1665    }
1666
1667    pub async fn load_backfill_fragment_ids(
1668        &self,
1669    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1670        let inner = self.inner.read().await;
1671        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1672            .select_only()
1673            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1674            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1675            .into_tuple()
1676            .all(&inner.db)
1677            .await?;
1678
1679        let mut source_fragment_ids = HashMap::new();
1680        for (fragment_id, stream_node) in fragments {
1681            if let Some((source_id, upstream_source_fragment_id)) =
1682                stream_node.to_protobuf().find_source_backfill()
1683            {
1684                source_fragment_ids
1685                    .entry(source_id)
1686                    .or_insert_with(BTreeSet::new)
1687                    .insert((fragment_id, upstream_source_fragment_id));
1688            }
1689        }
1690        Ok(source_fragment_ids)
1691    }
1692
1693    pub async fn get_all_upstream_sink_infos(
1694        &self,
1695        target_table: &PbTable,
1696        target_fragment_id: FragmentId,
1697    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1698        let incoming_sinks = self.get_table_incoming_sinks(target_table.id).await?;
1699
1700        let inner = self.inner.read().await;
1701        let txn = inner.db.begin().await?;
1702
1703        let sink_ids = incoming_sinks.iter().map(|s| s.id).collect_vec();
1704        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;
1705
1706        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1707        for pb_sink in &incoming_sinks {
1708            let sink_fragment_id =
1709                sink_fragment_ids
1710                    .get(&pb_sink.id)
1711                    .cloned()
1712                    .ok_or(anyhow::anyhow!(
1713                        "sink fragment not found for sink id {}",
1714                        pb_sink.id
1715                    ))?;
1716            let upstream_info = build_upstream_sink_info(
1717                pb_sink,
1718                sink_fragment_id,
1719                target_table,
1720                target_fragment_id,
1721            )?;
1722            upstream_sink_infos.push(upstream_info);
1723        }
1724
1725        Ok(upstream_sink_infos)
1726    }
1727
1728    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1729        let inner = self.inner.read().await;
1730        let txn = inner.db.begin().await?;
1731
1732        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1733            .select_only()
1734            .column(fragment::Column::FragmentId)
1735            .filter(
1736                fragment::Column::JobId
1737                    .eq(job_id)
1738                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1739            )
1740            .into_tuple()
1741            .all(&txn)
1742            .await?;
1743
1744        if mview_fragment.len() != 1 {
1745            return Err(anyhow::anyhow!(
1746                "expected exactly one mview fragment for job {}, found {}",
1747                job_id,
1748                mview_fragment.len()
1749            )
1750            .into());
1751        }
1752
1753        Ok(mview_fragment.into_iter().next().unwrap())
1754    }
1755
1756    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
1757        let inner = self.inner.read().await;
1758        let txn = inner.db.begin().await?;
1759        has_table_been_migrated(&txn, table_id).await
1760    }
1761
1762    pub async fn update_fragment_splits<C>(
1763        &self,
1764        txn: &C,
1765        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
1766    ) -> MetaResult<()>
1767    where
1768        C: ConnectionTrait,
1769    {
1770        if fragment_splits.is_empty() {
1771            return Ok(());
1772        }
1773
1774        let models: Vec<fragment_splits::ActiveModel> = fragment_splits
1775            .iter()
1776            .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
1777                fragment_id: Set(*fragment_id as _),
1778                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1779                    splits: splits.iter().map(Into::into).collect_vec(),
1780                }))),
1781            })
1782            .collect();
1783
1784        FragmentSplits::insert_many(models)
1785            .on_conflict(
1786                OnConflict::column(fragment_splits::Column::FragmentId)
1787                    .update_column(fragment_splits::Column::Splits)
1788                    .to_owned(),
1789            )
1790            .exec(txn)
1791            .await?;
1792
1793        Ok(())
1794    }
1795}
1796
// Unit tests for converting fragments between the in-memory `crate::model::Fragment` /
// `StreamActor` representation and the `fragment::Model` database row plus `ActorInfo`s, and for
// `CatalogController::format_fragment_parallelism_policy`.
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::id::JobId;
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::*;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};

    use super::ActorInfo;
    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    // Upstreams of a single actor: upstream fragment id -> upstream actor ids in that fragment.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    // Upstreams of every actor of the fragment under test, keyed by actor id.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    // Fixed ids shared by the tests below.
    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    const TEST_JOB_ID: JobId = JobId::new(1);

    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);

    /// Build deterministic upstreams for `actor_id`: two upstream fragments
    /// (`TEST_UPSTREAM_FRAGMENT_ID` and `TEST_UPSTREAM_FRAGMENT_ID + 1`), each contributing a
    /// single upstream actor (`actor_id + 100` and `actor_id + 200` respectively).
    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }

    /// Build a `Union` stream node with one `Merge` input per upstream fragment in
    /// `actor_upstream_actor_ids`, in the map's ascending key order.
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    /// `prepare_fragment_model_for_new_job` should carry the fragment-level fields of an
    /// in-memory `Fragment` over to the `fragment::Model` row (checked by `check_fragment`).
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // Spread the test vnodes uniformly over the actors; one bitmap per actor id.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // Every actor has the same upstream fragment ids, so any actor's entry yields the same
        // merger node.
        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id.into(),
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id.into()).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }

    /// `compose_fragment` should rebuild an in-memory `Fragment` from a `fragment::Model` plus
    /// its `ActorInfo`s, returning one actor status (with a location) and one split entry per
    /// actor, and actors whose fields match the inputs (checked by `check_actors`).
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Every actor gets the same single dummy split.
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    // Move this actor's bitmap out of the map and convert it to the model type.
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id.into())
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        // The fragment's stream node is derived from the first actor's upstreams.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        // NOTE(review): presumably `expect(deprecated)` covers the `upstream_fragment_id`
        // field — confirm.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    /// Compare each input `ActorInfo` with the composed `StreamActor` at the same position.
    ///
    /// The two lists are expected to have the same order and length (`zip_eq_debug` presumably
    /// asserts equal lengths in debug builds — confirm).
    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // Every Merge node in the shared stream node must read from one of this actor's
            // upstream fragments.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            // An actor missing from `pb_actor_splits` is compared against default splits.
            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    /// Assert that the fragment-level fields of `pb_fragment` agree with the `fragment::Model`
    /// row. The actor list and vnode count are intentionally not compared here.
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }

    /// With no upstream root fragments and a fixed job parallelism of 4, the fragment's policy
    /// is reported as inheriting the job's parallelism.
    #[test]
    fn test_parallelism_policy_with_root_fragments() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 3.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let job_parallelism = StreamingParallelism::Fixed(4);

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            Some(&job_parallelism),
            &[],
        );

        assert_eq!(policy, "inherit(fixed(4))");
    }

    /// With no job parallelism and upstream root fragments `[3, 1, 2, 1]`, the policy lists the
    /// upstream fragment ids sorted and de-duplicated.
    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            None,
            &[3.into(), 1.into(), 2.into(), 1.into()],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
}