risingwave_meta/controller/
fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::CdcScanOptions;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink,
40    source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::fragment::{
49    FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan;
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
58    StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63    ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, JoinType, PaginatorTrait,
64    QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
65};
66use serde::{Deserialize, Serialize};
67use tracing::debug;
68
69use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
70use crate::controller::catalog::CatalogController;
71use crate::controller::scale::{
72    FragmentRenderMap, NoShuffleEnsemble, find_fragment_no_shuffle_dags_detailed,
73    load_fragment_info, resolve_streaming_job_definition,
74};
75use crate::controller::utils::{
76    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
77    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
78    resolve_no_shuffle_actor_dispatcher,
79};
80use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
81use crate::model::{
82    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
83    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
84};
85use crate::rpc::ddl_controller::build_upstream_sink_info;
86use crate::stream::UpstreamSinkInfo;
87use crate::{MetaResult, model};
88
89/// Some information of running (inflight) actors.
90#[derive(Clone, Debug)]
91pub struct InflightActorInfo {
92    pub worker_id: WorkerId,
93    pub vnode_bitmap: Option<Bitmap>,
94    pub splits: Vec<SplitImpl>,
95}
96
97#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
98struct ActorInfo {
99    pub actor_id: ActorId,
100    pub fragment_id: FragmentId,
101    pub splits: ConnectorSplits,
102    pub worker_id: WorkerId,
103    pub vnode_bitmap: Option<VnodeBitmap>,
104    pub expr_context: ExprContext,
105}
106
107#[derive(Clone, Debug)]
108pub struct InflightFragmentInfo {
109    pub fragment_id: crate::model::FragmentId,
110    pub distribution_type: DistributionType,
111    pub fragment_type_mask: FragmentTypeMask,
112    pub vnode_count: usize,
113    pub nodes: PbStreamNode,
114    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
115    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
116}
117
118#[derive(Clone, Debug)]
119pub struct FragmentParallelismInfo {
120    pub distribution_type: FragmentDistributionType,
121    pub actor_count: usize,
122    pub vnode_count: usize,
123}
124
125#[easy_ext::ext(FragmentTypeMaskExt)]
126pub impl FragmentTypeMask {
127    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
128        Expr::col(fragment::Column::FragmentTypeMask)
129            .bit_and(Expr::value(flag as i32))
130            .ne(0)
131    }
132
133    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
134        Expr::col(fragment::Column::FragmentTypeMask)
135            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
136            .ne(0)
137    }
138
139    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
140        Expr::col(fragment::Column::FragmentTypeMask)
141            .bit_and(Expr::value(flag as i32))
142            .eq(0)
143    }
144}
145
146#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
147#[serde(rename_all = "camelCase")] // for dashboard
148pub struct StreamingJobInfo {
149    pub job_id: ObjectId,
150    pub obj_type: ObjectType,
151    pub name: String,
152    pub job_status: JobStatus,
153    pub parallelism: StreamingParallelism,
154    pub max_parallelism: i32,
155    pub resource_group: String,
156    pub database_id: DatabaseId,
157    pub schema_id: SchemaId,
158}
159
160impl NotificationManager {
161    pub(crate) fn notify_fragment_mapping(
162        &self,
163        operation: NotificationOperation,
164        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
165    ) {
166        let fragment_ids = fragment_mappings
167            .iter()
168            .map(|mapping| mapping.fragment_id)
169            .collect_vec();
170        if fragment_ids.is_empty() {
171            return;
172        }
173        // notify all fragment mappings to frontend.
174        for fragment_mapping in fragment_mappings {
175            self.notify_frontend_without_version(
176                operation,
177                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
178            );
179        }
180
181        // update serving vnode mappings.
182        match operation {
183            NotificationOperation::Add | NotificationOperation::Update => {
184                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
185                    fragment_ids,
186                ));
187            }
188            NotificationOperation::Delete => {
189                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
190                    fragment_ids,
191                ));
192            }
193            op => {
194                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
195            }
196        }
197    }
198}
199
200impl CatalogController {
201    pub fn prepare_fragment_models_from_fragments(
202        job_id: JobId,
203        fragments: impl Iterator<Item = &Fragment>,
204    ) -> MetaResult<Vec<fragment::Model>> {
205        fragments
206            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
207            .try_collect()
208    }
209
210    pub fn prepare_fragment_model_for_new_job(
211        job_id: JobId,
212        fragment: &Fragment,
213    ) -> MetaResult<fragment::Model> {
214        let vnode_count = fragment.vnode_count();
215        let Fragment {
216            fragment_id: pb_fragment_id,
217            fragment_type_mask: pb_fragment_type_mask,
218            distribution_type: pb_distribution_type,
219            actors: pb_actors,
220            state_table_ids: pb_state_table_ids,
221            nodes,
222            ..
223        } = fragment;
224
225        let state_table_ids = pb_state_table_ids
226            .iter()
227            .map(|table_id| table_id.as_raw_id() as i32)
228            .collect_vec()
229            .into();
230
231        assert!(!pb_actors.is_empty());
232
233        let fragment_parallelism = nodes
234            .node_body
235            .as_ref()
236            .and_then(|body| match body {
237                NodeBody::StreamCdcScan(node) => Some(node),
238                _ => None,
239            })
240            .and_then(|node| node.options.as_ref())
241            .map(CdcScanOptions::from_proto)
242            .filter(|opts| opts.is_parallelized_backfill())
243            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));
244
245        let stream_node = StreamNode::from(nodes);
246
247        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
248            .unwrap()
249            .into();
250
251        #[expect(deprecated)]
252        let fragment = fragment::Model {
253            fragment_id: *pb_fragment_id as _,
254            job_id,
255            fragment_type_mask: (*pb_fragment_type_mask).into(),
256            distribution_type,
257            stream_node,
258            state_table_ids,
259            upstream_fragment_id: Default::default(),
260            vnode_count: vnode_count as _,
261            parallelism: fragment_parallelism,
262        };
263
264        Ok(fragment)
265    }
266
267    fn compose_table_fragments(
268        job_id: JobId,
269        state: PbState,
270        ctx: Option<PbStreamContext>,
271        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
272        parallelism: StreamingParallelism,
273        max_parallelism: usize,
274        job_definition: Option<String>,
275    ) -> MetaResult<StreamJobFragments> {
276        let mut pb_fragments = BTreeMap::new();
277        let mut pb_actor_status = BTreeMap::new();
278
279        for (fragment, actors) in fragments {
280            let (fragment, fragment_actor_status, _) =
281                Self::compose_fragment(fragment, actors, job_definition.clone())?;
282
283            pb_fragments.insert(fragment.fragment_id, fragment);
284            pb_actor_status.extend(fragment_actor_status.into_iter());
285        }
286
287        let table_fragments = StreamJobFragments {
288            stream_job_id: job_id,
289            state: state as _,
290            fragments: pb_fragments,
291            actor_status: pb_actor_status,
292            ctx: ctx
293                .as_ref()
294                .map(StreamContext::from_protobuf)
295                .unwrap_or_default(),
296            assigned_parallelism: match parallelism {
297                StreamingParallelism::Custom => TableParallelism::Custom,
298                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
299                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
300            },
301            max_parallelism,
302        };
303
304        Ok(table_fragments)
305    }
306
307    #[allow(clippy::type_complexity)]
308    fn compose_fragment(
309        fragment: fragment::Model,
310        actors: Vec<ActorInfo>,
311        job_definition: Option<String>,
312    ) -> MetaResult<(
313        Fragment,
314        HashMap<crate::model::ActorId, PbActorStatus>,
315        HashMap<crate::model::ActorId, PbConnectorSplits>,
316    )> {
317        let fragment::Model {
318            fragment_id,
319            fragment_type_mask,
320            distribution_type,
321            stream_node,
322            state_table_ids,
323            vnode_count,
324            ..
325        } = fragment;
326
327        let stream_node = stream_node.to_protobuf();
328        let mut upstream_fragments = HashSet::new();
329        visit_stream_node_body(&stream_node, |body| {
330            if let NodeBody::Merge(m) = body {
331                assert!(
332                    upstream_fragments.insert(m.upstream_fragment_id),
333                    "non-duplicate upstream fragment"
334                );
335            }
336        });
337
338        let mut pb_actors = vec![];
339
340        let mut pb_actor_status = HashMap::new();
341        let mut pb_actor_splits = HashMap::new();
342
343        for actor in actors {
344            if actor.fragment_id != fragment_id {
345                bail!(
346                    "fragment id {} from actor {} is different from fragment {}",
347                    actor.fragment_id,
348                    actor.actor_id,
349                    fragment_id
350                )
351            }
352
353            let ActorInfo {
354                actor_id,
355                fragment_id,
356                worker_id,
357                splits,
358                vnode_bitmap,
359                expr_context,
360                ..
361            } = actor;
362
363            let vnode_bitmap =
364                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
365            let pb_expr_context = Some(expr_context.to_protobuf());
366
367            pb_actor_status.insert(
368                actor_id as _,
369                PbActorStatus {
370                    location: PbActorLocation::from_worker(worker_id as u32),
371                },
372            );
373
374            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
375
376            pb_actors.push(StreamActor {
377                actor_id: actor_id as _,
378                fragment_id: fragment_id as _,
379                vnode_bitmap,
380                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
381                expr_context: pb_expr_context,
382            })
383        }
384
385        let pb_state_table_ids = state_table_ids.into_u32_array();
386        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
387        let pb_fragment = Fragment {
388            fragment_id: fragment_id as _,
389            fragment_type_mask: fragment_type_mask.into(),
390            distribution_type: pb_distribution_type,
391            actors: pb_actors,
392            state_table_ids: pb_state_table_ids.into_iter().map_into().collect(),
393            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
394            nodes: stream_node,
395        };
396
397        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
398    }
399
400    pub fn running_fragment_parallelisms(
401        &self,
402        id_filter: Option<HashSet<FragmentId>>,
403    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
404        let info = self.env.shared_actor_infos().read_guard();
405
406        let mut result = HashMap::new();
407        for (fragment_id, fragment) in info.iter_over_fragments() {
408            if let Some(id_filter) = &id_filter
409                && !id_filter.contains(&(*fragment_id as _))
410            {
411                continue; // Skip fragments not in the filter
412            }
413
414            result.insert(
415                *fragment_id as _,
416                FragmentParallelismInfo {
417                    distribution_type: fragment.distribution_type.into(),
418                    actor_count: fragment.actors.len() as _,
419                    vnode_count: fragment.vnode_count,
420                },
421            );
422        }
423
424        Ok(result)
425    }
426
427    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
428        let inner = self.inner.read().await;
429        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
430            .select_only()
431            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
432            .into_tuple()
433            .all(&inner.db)
434            .await?;
435        Ok(fragment_jobs.into_iter().collect())
436    }
437
438    pub async fn get_fragment_job_id(
439        &self,
440        fragment_ids: Vec<FragmentId>,
441    ) -> MetaResult<Vec<ObjectId>> {
442        let inner = self.inner.read().await;
443
444        let object_ids: Vec<ObjectId> = FragmentModel::find()
445            .select_only()
446            .column(fragment::Column::JobId)
447            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
448            .into_tuple()
449            .all(&inner.db)
450            .await?;
451
452        Ok(object_ids)
453    }
454
455    pub async fn get_fragment_desc_by_id(
456        &self,
457        fragment_id: FragmentId,
458    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
459        let inner = self.inner.read().await;
460
461        let fragment_model = match FragmentModel::find_by_id(fragment_id)
462            .one(&inner.db)
463            .await?
464        {
465            Some(fragment) => fragment,
466            None => return Ok(None),
467        };
468
469        let job_parallelism: Option<StreamingParallelism> =
470            StreamingJob::find_by_id(fragment_model.job_id)
471                .select_only()
472                .column(streaming_job::Column::Parallelism)
473                .into_tuple()
474                .one(&inner.db)
475                .await?;
476
477        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
478            .select_only()
479            .columns([
480                fragment_relation::Column::SourceFragmentId,
481                fragment_relation::Column::DispatcherType,
482            ])
483            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
484            .into_tuple()
485            .all(&inner.db)
486            .await?;
487
488        let upstreams: Vec<_> = upstream_entries
489            .into_iter()
490            .map(|(source_id, _)| source_id)
491            .collect();
492
493        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
494            .await
495            .map(Self::collect_root_fragment_mapping)?;
496        let root_fragments = root_fragment_map
497            .get(&fragment_id)
498            .cloned()
499            .unwrap_or_default();
500
501        let info = self.env.shared_actor_infos().read_guard();
502        let SharedFragmentInfo { actors, .. } = info
503            .get_fragment(fragment_model.fragment_id as _)
504            .unwrap_or_else(|| {
505                panic!(
506                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
507                    fragment_model.fragment_id,
508                    fragment_model.job_id
509                )
510            });
511
512        let parallelism_policy = Self::format_fragment_parallelism_policy(
513            &fragment_model,
514            job_parallelism.as_ref(),
515            &root_fragments,
516        );
517
518        let fragment = FragmentDesc {
519            fragment_id: fragment_model.fragment_id,
520            job_id: fragment_model.job_id,
521            fragment_type_mask: fragment_model.fragment_type_mask,
522            distribution_type: fragment_model.distribution_type,
523            state_table_ids: fragment_model.state_table_ids.clone(),
524            parallelism: actors.len() as _,
525            vnode_count: fragment_model.vnode_count,
526            stream_node: fragment_model.stream_node.clone(),
527            parallelism_policy,
528        };
529
530        Ok(Some((fragment, upstreams)))
531    }
532
533    pub async fn list_fragment_database_ids(
534        &self,
535        select_fragment_ids: Option<Vec<FragmentId>>,
536    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
537        let inner = self.inner.read().await;
538        let select = FragmentModel::find()
539            .select_only()
540            .column(fragment::Column::FragmentId)
541            .column(object::Column::DatabaseId)
542            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
543        let select = if let Some(select_fragment_ids) = select_fragment_ids {
544            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
545        } else {
546            select
547        };
548        Ok(select.into_tuple().all(&inner.db).await?)
549    }
550
551    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
552        let inner = self.inner.read().await;
553
554        // Load fragments matching the job from the database
555        let fragments: Vec<_> = FragmentModel::find()
556            .filter(fragment::Column::JobId.eq(job_id))
557            .all(&inner.db)
558            .await?;
559
560        let job_info = StreamingJob::find_by_id(job_id)
561            .one(&inner.db)
562            .await?
563            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
564
565        let fragment_actors =
566            self.collect_fragment_actor_pairs(fragments, job_info.timezone.clone())?;
567
568        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
569            .await?
570            .remove(&job_id);
571
572        Self::compose_table_fragments(
573            job_id,
574            job_info.job_status.into(),
575            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
576            fragment_actors,
577            job_info.parallelism.clone(),
578            job_info.max_parallelism as _,
579            job_definition,
580        )
581    }
582
583    pub async fn get_fragment_actor_dispatchers(
584        &self,
585        fragment_ids: Vec<FragmentId>,
586    ) -> MetaResult<FragmentActorDispatchers> {
587        let inner = self.inner.read().await;
588
589        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
590            .await
591    }
592
593    pub async fn get_fragment_actor_dispatchers_txn(
594        &self,
595        c: &impl ConnectionTrait,
596        fragment_ids: Vec<FragmentId>,
597    ) -> MetaResult<FragmentActorDispatchers> {
598        let fragment_relations = FragmentRelation::find()
599            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
600            .all(c)
601            .await?;
602
603        type FragmentActorInfo = (
604            DistributionType,
605            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
606        );
607
608        let shared_info = self.env.shared_actor_infos();
609        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
610        let get_fragment_actors = |fragment_id: FragmentId| async move {
611            let result: MetaResult<FragmentActorInfo> = try {
612                let read_guard = shared_info.read_guard();
613
614                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();
615
616                (
617                    fragment.distribution_type,
618                    Arc::new(
619                        fragment
620                            .actors
621                            .iter()
622                            .map(|(actor_id, actor_info)| {
623                                (
624                                    *actor_id,
625                                    actor_info
626                                        .vnode_bitmap
627                                        .as_ref()
628                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
629                                )
630                            })
631                            .collect(),
632                    ),
633                )
634            };
635            result
636        };
637
638        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
639        for fragment_relation::Model {
640            source_fragment_id,
641            target_fragment_id,
642            dispatcher_type,
643            dist_key_indices,
644            output_indices,
645            output_type_mapping,
646        } in fragment_relations
647        {
648            let (source_fragment_distribution, source_fragment_actors) = {
649                let (distribution, actors) = {
650                    match fragment_actor_cache.entry(source_fragment_id) {
651                        Entry::Occupied(entry) => entry.into_mut(),
652                        Entry::Vacant(entry) => {
653                            entry.insert(get_fragment_actors(source_fragment_id).await?)
654                        }
655                    }
656                };
657                (*distribution, actors.clone())
658            };
659            let (target_fragment_distribution, target_fragment_actors) = {
660                let (distribution, actors) = {
661                    match fragment_actor_cache.entry(target_fragment_id) {
662                        Entry::Occupied(entry) => entry.into_mut(),
663                        Entry::Vacant(entry) => {
664                            entry.insert(get_fragment_actors(target_fragment_id).await?)
665                        }
666                    }
667                };
668                (*distribution, actors.clone())
669            };
670            let output_mapping = PbDispatchOutputMapping {
671                indices: output_indices.into_u32_array(),
672                types: output_type_mapping.unwrap_or_default().to_protobuf(),
673            };
674            let dispatchers = compose_dispatchers(
675                source_fragment_distribution,
676                &source_fragment_actors,
677                target_fragment_id as _,
678                target_fragment_distribution,
679                &target_fragment_actors,
680                dispatcher_type,
681                dist_key_indices.into_u32_array(),
682                output_mapping,
683            );
684            let actor_dispatchers_map = actor_dispatchers_map
685                .entry(source_fragment_id as _)
686                .or_default();
687            for (actor_id, dispatchers) in dispatchers {
688                actor_dispatchers_map
689                    .entry(actor_id as _)
690                    .or_default()
691                    .push(dispatchers);
692            }
693        }
694        Ok(actor_dispatchers_map)
695    }
696
697    pub async fn get_fragment_downstream_relations(
698        &self,
699        fragment_ids: Vec<FragmentId>,
700    ) -> MetaResult<FragmentDownstreamRelation> {
701        let inner = self.inner.read().await;
702        let mut stream = FragmentRelation::find()
703            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
704            .stream(&inner.db)
705            .await?;
706        let mut relations = FragmentDownstreamRelation::new();
707        while let Some(relation) = stream.try_next().await? {
708            relations
709                .entry(relation.source_fragment_id as _)
710                .or_default()
711                .push(DownstreamFragmentRelation {
712                    downstream_fragment_id: relation.target_fragment_id as _,
713                    dispatcher_type: relation.dispatcher_type,
714                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
715                    output_mapping: PbDispatchOutputMapping {
716                        indices: relation.output_indices.into_u32_array(),
717                        types: relation
718                            .output_type_mapping
719                            .unwrap_or_default()
720                            .to_protobuf(),
721                    },
722                });
723        }
724        Ok(relations)
725    }
726
727    pub async fn get_job_fragment_backfill_scan_type(
728        &self,
729        job_id: ObjectId,
730    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
731        let inner = self.inner.read().await;
732        let fragments: Vec<_> = FragmentModel::find()
733            .filter(fragment::Column::JobId.eq(job_id))
734            .all(&inner.db)
735            .await?;
736
737        let mut result = HashMap::new();
738
739        for fragment::Model {
740            fragment_id,
741            stream_node,
742            ..
743        } in fragments
744        {
745            let stream_node = stream_node.to_protobuf();
746            visit_stream_node_body(&stream_node, |body| {
747                if let NodeBody::StreamScan(node) = body {
748                    match node.stream_scan_type() {
749                        StreamScanType::Unspecified => {}
750                        scan_type => {
751                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
752                        }
753                    }
754                }
755            });
756        }
757
758        Ok(result)
759    }
760
761    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
762        let inner = self.inner.read().await;
763        let job_states = StreamingJob::find()
764            .select_only()
765            .column(streaming_job::Column::JobId)
766            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
767            .join(JoinType::InnerJoin, object::Relation::Database2.def())
768            .column(object::Column::ObjType)
769            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
770            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
771            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
772            .column_as(
773                Expr::if_null(
774                    Expr::col((table::Entity, table::Column::Name)),
775                    Expr::if_null(
776                        Expr::col((source::Entity, source::Column::Name)),
777                        Expr::if_null(
778                            Expr::col((sink::Entity, sink::Column::Name)),
779                            Expr::val("<unknown>"),
780                        ),
781                    ),
782                ),
783                "name",
784            )
785            .columns([
786                streaming_job::Column::JobStatus,
787                streaming_job::Column::Parallelism,
788                streaming_job::Column::MaxParallelism,
789            ])
790            .column_as(
791                Expr::if_null(
792                    Expr::col((
793                        streaming_job::Entity,
794                        streaming_job::Column::SpecificResourceGroup,
795                    )),
796                    Expr::col((database::Entity, database::Column::ResourceGroup)),
797                ),
798                "resource_group",
799            )
800            .column(object::Column::DatabaseId)
801            .column(object::Column::SchemaId)
802            .into_model()
803            .all(&inner.db)
804            .await?;
805        Ok(job_states)
806    }
807
808    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
809        let inner = self.inner.read().await;
810        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
811            .select_only()
812            .column(streaming_job::Column::MaxParallelism)
813            .into_tuple()
814            .one(&inner.db)
815            .await?
816            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
817        Ok(max_parallelism as usize)
818    }
819
820    /// Try to get internal table ids of each streaming job, used by metrics collection.
821    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
822        if let Ok(inner) = self.inner.try_read()
823            && let Ok(job_state_tables) = FragmentModel::find()
824                .select_only()
825                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
826                .into_tuple::<(JobId, I32Array)>()
827                .all(&inner.db)
828                .await
829        {
830            let mut job_internal_table_ids = HashMap::new();
831            for (job_id, state_table_ids) in job_state_tables {
832                job_internal_table_ids
833                    .entry(job_id)
834                    .or_insert_with(Vec::new)
835                    .extend(
836                        state_table_ids
837                            .into_inner()
838                            .into_iter()
839                            .map(|table_id| TableId::new(table_id as _)),
840                    );
841            }
842            return Some(job_internal_table_ids.into_iter().collect());
843        }
844        None
845    }
846
847    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
848        let inner = self.inner.read().await;
849        let count = FragmentModel::find().count(&inner.db).await?;
850        Ok(count > 0)
851    }
852
853    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
854        let read_guard = self.env.shared_actor_infos().read_guard();
855        let actor_cnt: HashMap<WorkerId, _> = read_guard
856            .iter_over_fragments()
857            .flat_map(|(_, fragment)| {
858                fragment
859                    .actors
860                    .iter()
861                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
862            })
863            .into_group_map()
864            .into_iter()
865            .map(|(k, v)| (k, v.len()))
866            .collect();
867
868        Ok(actor_cnt)
869    }
870
871    fn collect_fragment_actor_map(
872        &self,
873        fragment_ids: &[FragmentId],
874        timezone: Option<String>,
875    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
876        let guard = self.env.shared_actor_infos().read_guard();
877        let stream_context = StreamContext { timezone };
878        let pb_expr_context = stream_context.to_expr_context();
879        let expr_context: ExprContext = (&pb_expr_context).into();
880
881        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
882        for fragment_id in fragment_ids {
883            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
884                anyhow!("fragment {} not found in shared actor info", fragment_id)
885            })?;
886
887            let actors = fragment_info
888                .actors
889                .iter()
890                .map(|(actor_id, actor_info)| ActorInfo {
891                    actor_id: *actor_id as _,
892                    fragment_id: *fragment_id,
893                    splits: ConnectorSplits::from(&PbConnectorSplits {
894                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
895                    }),
896                    worker_id: actor_info.worker_id as _,
897                    vnode_bitmap: actor_info
898                        .vnode_bitmap
899                        .as_ref()
900                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
901                    expr_context: expr_context.clone(),
902                })
903                .collect();
904
905            actor_map.insert(*fragment_id, actors);
906        }
907
908        Ok(actor_map)
909    }
910
911    fn collect_fragment_actor_pairs(
912        &self,
913        fragments: Vec<fragment::Model>,
914        timezone: Option<String>,
915    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
916        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
917        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, timezone)?;
918        fragments
919            .into_iter()
920            .map(|fragment| {
921                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
922                    anyhow!(
923                        "fragment {} missing in shared actor info map",
924                        fragment.fragment_id
925                    )
926                })?;
927                Ok((fragment, actors))
928            })
929            .collect()
930    }
931
932    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
933    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
934        let inner = self.inner.read().await;
935        let jobs = StreamingJob::find().all(&inner.db).await?;
936
937        let mut job_definition = resolve_streaming_job_definition(
938            &inner.db,
939            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
940        )
941        .await?;
942
943        let mut table_fragments = BTreeMap::new();
944        for job in jobs {
945            let fragments = FragmentModel::find()
946                .filter(fragment::Column::JobId.eq(job.job_id))
947                .all(&inner.db)
948                .await?;
949
950            let fragment_actors =
951                self.collect_fragment_actor_pairs(fragments, job.timezone.clone())?;
952
953            table_fragments.insert(
954                job.job_id,
955                Self::compose_table_fragments(
956                    job.job_id,
957                    job.job_status.into(),
958                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
959                    fragment_actors,
960                    job.parallelism.clone(),
961                    job.max_parallelism as _,
962                    job_definition.remove(&job.job_id),
963                )?,
964            );
965        }
966
967        Ok(table_fragments)
968    }
969
970    pub async fn upstream_fragments(
971        &self,
972        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
973    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
974        let inner = self.inner.read().await;
975        let mut stream = FragmentRelation::find()
976            .select_only()
977            .columns([
978                fragment_relation::Column::SourceFragmentId,
979                fragment_relation::Column::TargetFragmentId,
980            ])
981            .filter(
982                fragment_relation::Column::TargetFragmentId
983                    .is_in(fragment_ids.map(|id| id as FragmentId)),
984            )
985            .into_tuple::<(FragmentId, FragmentId)>()
986            .stream(&inner.db)
987            .await?;
988        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
989        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
990            upstream_fragments
991                .entry(downstream_fragment_id as crate::model::FragmentId)
992                .or_default()
993                .insert(upstream_fragment_id as crate::model::FragmentId);
994        }
995        Ok(upstream_fragments)
996    }
997
998    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
999        let info = self.env.shared_actor_infos().read_guard();
1000
1001        let actor_locations = info
1002            .iter_over_fragments()
1003            .flat_map(|(fragment_id, fragment)| {
1004                fragment
1005                    .actors
1006                    .iter()
1007                    .map(|(actor_id, actor)| PartialActorLocation {
1008                        actor_id: *actor_id as _,
1009                        fragment_id: *fragment_id as _,
1010                        worker_id: actor.worker_id,
1011                    })
1012            })
1013            .collect_vec();
1014
1015        Ok(actor_locations)
1016    }
1017
1018    pub async fn list_actor_info(
1019        &self,
1020    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
1021        let inner = self.inner.read().await;
1022
1023        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
1024            FragmentModel::find()
1025                .select_only()
1026                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1027                .column(fragment::Column::FragmentId)
1028                .column_as(object::Column::Oid, "job_id")
1029                .column_as(object::Column::SchemaId, "schema_id")
1030                .column_as(object::Column::ObjType, "type")
1031                .into_tuple()
1032                .all(&inner.db)
1033                .await?;
1034
1035        let actor_infos = {
1036            let info = self.env.shared_actor_infos().read_guard();
1037
1038            fragment_objects
1039                .into_iter()
1040                .flat_map(|(fragment_id, object_id, schema_id, object_type)| {
1041                    let SharedFragmentInfo {
1042                        fragment_id,
1043                        actors,
1044                        ..
1045                    } = info.get_fragment(fragment_id as _).unwrap();
1046                    actors.keys().map(move |actor_id| {
1047                        (
1048                            *actor_id as _,
1049                            *fragment_id as _,
1050                            object_id,
1051                            schema_id,
1052                            object_type,
1053                        )
1054                    })
1055                })
1056                .collect_vec()
1057        };
1058
1059        Ok(actor_infos)
1060    }
1061
1062    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1063        let guard = self.env.shared_actor_info.read_guard();
1064        guard
1065            .iter_over_fragments()
1066            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1067            .collect_vec()
1068    }
1069
1070    pub async fn list_fragment_descs(
1071        &self,
1072        is_creating: bool,
1073    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1074        let inner = self.inner.read().await;
1075
1076        let txn = inner.db.begin().await?;
1077
1078        let fragments: Vec<_> = if is_creating {
1079            FragmentModel::find()
1080                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1081                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1082                .filter(
1083                    streaming_job::Column::JobStatus
1084                        .eq(JobStatus::Initial)
1085                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1086                )
1087                .all(&txn)
1088                .await?
1089        } else {
1090            FragmentModel::find().all(&txn).await?
1091        };
1092
1093        let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();
1094
1095        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragments.is_empty() {
1096            HashMap::new()
1097        } else {
1098            let job_ids = fragments.iter().map(|f| f.job_id).unique().collect_vec();
1099            StreamingJob::find()
1100                .select_only()
1101                .columns([
1102                    streaming_job::Column::JobId,
1103                    streaming_job::Column::Parallelism,
1104                ])
1105                .filter(streaming_job::Column::JobId.is_in(job_ids))
1106                .into_tuple()
1107                .all(&txn)
1108                .await?
1109                .into_iter()
1110                .collect()
1111        };
1112
1113        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
1114            if fragment_ids.is_empty() {
1115                Vec::new()
1116            } else {
1117                FragmentRelation::find()
1118                    .select_only()
1119                    .columns([
1120                        fragment_relation::Column::TargetFragmentId,
1121                        fragment_relation::Column::SourceFragmentId,
1122                        fragment_relation::Column::DispatcherType,
1123                    ])
1124                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
1125                    .into_tuple()
1126                    .all(&txn)
1127                    .await?
1128            };
1129
1130        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1131        for (target_id, source_id, _) in upstream_entries {
1132            all_upstreams.entry(target_id).or_default().push(source_id);
1133        }
1134
1135        let root_fragment_map = if fragment_ids.is_empty() {
1136            HashMap::new()
1137        } else {
1138            let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
1139            Self::collect_root_fragment_mapping(ensembles)
1140        };
1141
1142        let guard = self.env.shared_actor_info.read_guard();
1143
1144        let mut result = Vec::new();
1145
1146        for fragment_desc in fragments {
1147            // note: here sometimes fragment is not found in the cache, fallback to 0
1148            let parallelism = guard
1149                .get_fragment(fragment_desc.fragment_id as _)
1150                .map(|fragment| fragment.actors.len())
1151                .unwrap_or_default();
1152
1153            let root_fragments = root_fragment_map
1154                .get(&fragment_desc.fragment_id)
1155                .cloned()
1156                .unwrap_or_default();
1157
1158            let upstreams = all_upstreams
1159                .remove(&fragment_desc.fragment_id)
1160                .unwrap_or_default();
1161
1162            let parallelism_policy = Self::format_fragment_parallelism_policy(
1163                &fragment_desc,
1164                job_parallelisms.get(&fragment_desc.job_id),
1165                &root_fragments,
1166            );
1167
1168            let fragment = FragmentDistribution {
1169                fragment_id: fragment_desc.fragment_id as _,
1170                table_id: fragment_desc.job_id.as_raw_id(),
1171                distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type)
1172                    as _,
1173                state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
1174                upstream_fragment_ids: upstreams.iter().map(|id| *id as _).collect(),
1175                fragment_type_mask: fragment_desc.fragment_type_mask as _,
1176                parallelism: parallelism as _,
1177                vnode_count: fragment_desc.vnode_count as _,
1178                node: Some(fragment_desc.stream_node.to_protobuf()),
1179                parallelism_policy,
1180            };
1181
1182            result.push((fragment, upstreams));
1183        }
1184        Ok(result)
1185    }
1186
1187    /// Build a fragment-to-root lookup for all reported root fragment ensembles.
1188    fn collect_root_fragment_mapping(
1189        ensembles: Vec<NoShuffleEnsemble>,
1190    ) -> HashMap<FragmentId, Vec<FragmentId>> {
1191        let mut mapping = HashMap::new();
1192
1193        for ensemble in ensembles {
1194            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1195            roots.sort_unstable();
1196            roots.dedup();
1197
1198            if roots.is_empty() {
1199                continue;
1200            }
1201
1202            let root_set: HashSet<_> = roots.iter().copied().collect();
1203
1204            for fragment_id in ensemble.component_fragments() {
1205                if root_set.contains(&fragment_id) {
1206                    mapping.insert(fragment_id, Vec::new());
1207                } else {
1208                    mapping.insert(fragment_id, roots.clone());
1209                }
1210            }
1211        }
1212
1213        mapping
1214    }
1215
1216    fn format_fragment_parallelism_policy(
1217        fragment: &fragment::Model,
1218        job_parallelism: Option<&StreamingParallelism>,
1219        root_fragments: &[FragmentId],
1220    ) -> String {
1221        if fragment.distribution_type == DistributionType::Single {
1222            return "single".to_owned();
1223        }
1224
1225        if let Some(parallelism) = fragment.parallelism.as_ref() {
1226            return format!(
1227                "override({})",
1228                Self::format_streaming_parallelism(parallelism)
1229            );
1230        }
1231
1232        if !root_fragments.is_empty() {
1233            let mut upstreams = root_fragments.to_vec();
1234            upstreams.sort_unstable();
1235            upstreams.dedup();
1236
1237            return format!("upstream_fragment({upstreams:?})");
1238        }
1239
1240        let inherited = job_parallelism
1241            .map(Self::format_streaming_parallelism)
1242            .unwrap_or_else(|| "unknown".to_owned());
1243        format!("inherit({inherited})")
1244    }
1245
1246    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1247        match parallelism {
1248            StreamingParallelism::Adaptive => "adaptive".to_owned(),
1249            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1250            StreamingParallelism::Custom => "custom".to_owned(),
1251        }
1252    }
1253
1254    pub async fn list_sink_actor_mapping(
1255        &self,
1256    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1257        let inner = self.inner.read().await;
1258        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1259            .select_only()
1260            .columns([sink::Column::SinkId, sink::Column::Name])
1261            .into_tuple()
1262            .all(&inner.db)
1263            .await?;
1264        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1265
1266        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1267
1268        let actor_with_type: Vec<(ActorId, ObjectId)> = {
1269            let info = self.env.shared_actor_infos().read_guard();
1270
1271            info.iter_over_fragments()
1272                .filter(|(_, fragment)| {
1273                    sink_ids.contains(&(fragment.job_id.as_raw_id() as _))
1274                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1275                })
1276                .flat_map(|(_, fragment)| {
1277                    fragment
1278                        .actors
1279                        .keys()
1280                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_raw_id() as _))
1281                })
1282                .collect()
1283        };
1284
1285        let mut sink_actor_mapping = HashMap::new();
1286        for (actor_id, sink_id) in actor_with_type {
1287            sink_actor_mapping
1288                .entry(sink_id)
1289                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1290                .1
1291                .push(actor_id);
1292        }
1293
1294        Ok(sink_actor_mapping)
1295    }
1296
1297    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1298        let inner = self.inner.read().await;
1299        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1300            .select_only()
1301            .columns([
1302                fragment::Column::FragmentId,
1303                fragment::Column::JobId,
1304                fragment::Column::StateTableIds,
1305            ])
1306            .into_partial_model()
1307            .all(&inner.db)
1308            .await?;
1309        Ok(fragment_state_tables)
1310    }
1311
1312    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
1313    /// collected
1314    pub async fn load_all_actors_dynamic(
1315        &self,
1316        database_id: Option<DatabaseId>,
1317        worker_nodes: &ActiveStreamingWorkerNodes,
1318    ) -> MetaResult<FragmentRenderMap> {
1319        let adaptive_parallelism_strategy = {
1320            let system_params_reader = self.env.system_params_reader().await;
1321            system_params_reader.adaptive_parallelism_strategy()
1322        };
1323
1324        let inner = self.inner.read().await;
1325        let txn = inner.db.begin().await?;
1326
1327        let database_fragment_infos = load_fragment_info(
1328            &txn,
1329            self.env.actor_id_generator(),
1330            database_id,
1331            worker_nodes,
1332            adaptive_parallelism_strategy,
1333        )
1334        .await?;
1335
1336        debug!(?database_fragment_infos, "reload all actors");
1337
1338        Ok(database_fragment_infos)
1339    }
1340
1341    #[await_tree::instrument]
1342    pub async fn fill_snapshot_backfill_epoch(
1343        &self,
1344        fragment_ids: impl Iterator<Item = FragmentId>,
1345        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1346        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1347    ) -> MetaResult<()> {
1348        let inner = self.inner.write().await;
1349        let txn = inner.db.begin().await?;
1350        for fragment_id in fragment_ids {
1351            let fragment = FragmentModel::find_by_id(fragment_id)
1352                .one(&txn)
1353                .await?
1354                .context(format!("fragment {} not found", fragment_id))?;
1355            let mut node = fragment.stream_node.to_protobuf();
1356            if crate::stream::fill_snapshot_backfill_epoch(
1357                &mut node,
1358                snapshot_backfill_info,
1359                cross_db_snapshot_backfill_info,
1360            )? {
1361                let node = StreamNode::from(&node);
1362                FragmentModel::update(fragment::ActiveModel {
1363                    fragment_id: Set(fragment_id),
1364                    stream_node: Set(node),
1365                    ..Default::default()
1366                })
1367                .exec(&txn)
1368                .await?;
1369            }
1370        }
1371        txn.commit().await?;
1372        Ok(())
1373    }
1374
1375    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1376    pub fn get_running_actors_of_fragment(
1377        &self,
1378        fragment_id: FragmentId,
1379    ) -> MetaResult<HashSet<model::ActorId>> {
1380        let info = self.env.shared_actor_infos().read_guard();
1381
1382        let actors = info
1383            .get_fragment(fragment_id as _)
1384            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1385            .unwrap_or_default();
1386
1387        Ok(actors)
1388    }
1389
    /// Pairs every actor of a source backfill fragment with the upstream source actor that is
    /// connected to it.
    ///
    /// Each returned tuple is (`backfill_actor_id`, `upstream_source_actor_id`).
    ///
    /// The dispatcher type of the relation and the distribution type of both fragments are read
    /// from the database; the actors and their vnode bitmaps are taken from the shared
    /// in-memory actor info. A fragment that is missing from the shared actor info contributes
    /// an empty actor set.
    ///
    /// # Errors
    ///
    /// Fails if there is no relation from `source_fragment_id` to
    /// `source_backfill_fragment_id`, if that relation is not `NoShuffle`, or if either
    /// fragment row cannot be found.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Look up how the source fragment dispatches to the backfill fragment.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // The actor pairing below is resolved for no-shuffle dispatching only.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Reads only the distribution type column of one fragment row.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Collects actor id -> optional vnode bitmap for one fragment from the shared actor
        // info; yields an empty map when the fragment is not present there.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // The resolver yields (source, backfill) pairs; flip them to the documented
        // (backfill, source) order.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1485
1486    /// Get and filter the "**root**" fragments of the specified jobs.
1487    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1488    ///
1489    /// Root fragment connects to downstream jobs.
1490    ///
1491    /// ## What can be the root fragment
1492    /// - For sink, it should have one `Sink` fragment.
1493    /// - For MV, it should have one `MView` fragment.
1494    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1495    /// - For source, it should have one `Source` fragment.
1496    ///
1497    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1498    pub async fn get_root_fragments(
1499        &self,
1500        job_ids: Vec<JobId>,
1501    ) -> MetaResult<(
1502        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
1503        HashMap<ActorId, WorkerId>,
1504    )> {
1505        let inner = self.inner.read().await;
1506
1507        let all_fragments = FragmentModel::find()
1508            .filter(fragment::Column::JobId.is_in(job_ids))
1509            .all(&inner.db)
1510            .await?;
1511        // job_id -> fragment
1512        let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
1513        for fragment in all_fragments {
1514            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1515            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1516                _ = root_fragments.insert(fragment.job_id, fragment);
1517            } else if mask.contains(FragmentTypeFlag::Source) {
1518                // look for Source fragment only if there's no MView fragment
1519                // (notice try_insert here vs insert above)
1520                _ = root_fragments.try_insert(fragment.job_id, fragment);
1521            }
1522        }
1523
1524        let mut root_fragments_pb = HashMap::new();
1525
1526        let info = self.env.shared_actor_infos().read_guard();
1527
1528        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
1529            .iter()
1530            .map(|(job_id, fragment)| (fragment.fragment_id as u32, *job_id))
1531            .collect();
1532
1533        for fragment in root_fragment_to_jobs.keys() {
1534            let fragment_info = info.get_fragment(*fragment).context(format!(
1535                "fragment {} not found in shared actor info",
1536                fragment
1537            ))?;
1538
1539            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
1540            let fragment = root_fragments
1541                .get(&job_id)
1542                .context(format!("root fragment for job {} not found", job_id))?;
1543
1544            root_fragments_pb.insert(
1545                job_id,
1546                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
1547            );
1548        }
1549
1550        let mut all_actor_locations = HashMap::new();
1551
1552        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
1553            for (actor_id, actor_info) in actors {
1554                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
1555            }
1556        }
1557
1558        Ok((root_fragments_pb, all_actor_locations))
1559    }
1560
1561    pub async fn get_root_fragment(
1562        &self,
1563        job_id: JobId,
1564    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
1565        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1566        let (root_fragment, _) = root_fragments
1567            .remove(&job_id)
1568            .context(format!("root fragment for job {} not found", job_id))?;
1569
1570        Ok((root_fragment, actors))
1571    }
1572
1573    /// Get the downstream fragments connected to the specified job.
1574    pub async fn get_downstream_fragments(
1575        &self,
1576        job_id: JobId,
1577    ) -> MetaResult<(
1578        Vec<(
1579            stream_plan::DispatcherType,
1580            SharedFragmentInfo,
1581            PbStreamNode,
1582        )>,
1583        HashMap<ActorId, WorkerId>,
1584    )> {
1585        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;
1586
1587        let inner = self.inner.read().await;
1588        let txn = inner.db.begin().await?;
1589        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1590            .filter(
1591                fragment_relation::Column::SourceFragmentId
1592                    .eq(root_fragment.fragment_id as FragmentId),
1593            )
1594            .all(&txn)
1595            .await?;
1596
1597        let downstream_fragment_ids = downstream_fragment_relations
1598            .iter()
1599            .map(|model| model.target_fragment_id as FragmentId)
1600            .collect::<HashSet<_>>();
1601
1602        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1603            .select_only()
1604            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1605            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1606            .into_tuple()
1607            .all(&txn)
1608            .await?;
1609
1610        let downstream_fragment_nodes: HashMap<_, _> = downstream_fragment_nodes
1611            .into_iter()
1612            .map(|(id, node)| (id as u32, node))
1613            .collect();
1614
1615        let mut downstream_fragments = vec![];
1616
1617        let info = self.env.shared_actor_infos().read_guard();
1618
1619        let fragment_map: HashMap<_, _> = downstream_fragment_relations
1620            .iter()
1621            .map(|model| (model.target_fragment_id as u32, model.dispatcher_type))
1622            .collect();
1623
1624        for fragment_id in fragment_map.keys() {
1625            let fragment_info @ SharedFragmentInfo { actors, .. } =
1626                info.get_fragment(*fragment_id).unwrap();
1627
1628            let dispatcher_type = fragment_map[fragment_id];
1629
1630            if actors.is_empty() {
1631                bail!("No fragment found for fragment id {}", fragment_id);
1632            }
1633
1634            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1635
1636            let nodes = downstream_fragment_nodes
1637                .get(fragment_id)
1638                .context(format!(
1639                    "downstream fragment node for id {} not found",
1640                    fragment_id
1641                ))?
1642                .to_protobuf();
1643
1644            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
1645        }
1646        Ok((downstream_fragments, actor_locations))
1647    }
1648
1649    pub async fn load_source_fragment_ids(
1650        &self,
1651    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1652        let inner = self.inner.read().await;
1653        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1654            .select_only()
1655            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1656            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1657            .into_tuple()
1658            .all(&inner.db)
1659            .await?;
1660
1661        let mut source_fragment_ids = HashMap::new();
1662        for (fragment_id, stream_node) in fragments {
1663            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1664                source_fragment_ids
1665                    .entry(source_id as SourceId)
1666                    .or_insert_with(BTreeSet::new)
1667                    .insert(fragment_id);
1668            }
1669        }
1670        Ok(source_fragment_ids)
1671    }
1672
1673    pub async fn load_backfill_fragment_ids(
1674        &self,
1675    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1676        let inner = self.inner.read().await;
1677        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1678            .select_only()
1679            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1680            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1681            .into_tuple()
1682            .all(&inner.db)
1683            .await?;
1684
1685        let mut source_fragment_ids = HashMap::new();
1686        for (fragment_id, stream_node) in fragments {
1687            if let Some((source_id, upstream_source_fragment_id)) =
1688                stream_node.to_protobuf().find_source_backfill()
1689            {
1690                source_fragment_ids
1691                    .entry(source_id as SourceId)
1692                    .or_insert_with(BTreeSet::new)
1693                    .insert((fragment_id, upstream_source_fragment_id));
1694            }
1695        }
1696        Ok(source_fragment_ids)
1697    }
1698
1699    pub async fn get_all_upstream_sink_infos(
1700        &self,
1701        target_table: &PbTable,
1702        target_fragment_id: FragmentId,
1703    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1704        let incoming_sinks = self
1705            .get_table_incoming_sinks(TableId::new(target_table.id))
1706            .await?;
1707
1708        let inner = self.inner.read().await;
1709        let txn = inner.db.begin().await?;
1710
1711        let sink_ids = incoming_sinks.iter().map(|s| s.id as SinkId).collect_vec();
1712        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;
1713
1714        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1715        for pb_sink in &incoming_sinks {
1716            let sink_fragment_id =
1717                sink_fragment_ids
1718                    .get(&(pb_sink.id as _))
1719                    .cloned()
1720                    .ok_or(anyhow::anyhow!(
1721                        "sink fragment not found for sink id {}",
1722                        pb_sink.id
1723                    ))?;
1724            let upstream_info = build_upstream_sink_info(
1725                pb_sink,
1726                sink_fragment_id,
1727                target_table,
1728                target_fragment_id,
1729            )?;
1730            upstream_sink_infos.push(upstream_info);
1731        }
1732
1733        Ok(upstream_sink_infos)
1734    }
1735
1736    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1737        let inner = self.inner.read().await;
1738        let txn = inner.db.begin().await?;
1739
1740        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1741            .select_only()
1742            .column(fragment::Column::FragmentId)
1743            .filter(
1744                fragment::Column::JobId
1745                    .eq(job_id)
1746                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1747            )
1748            .into_tuple()
1749            .all(&txn)
1750            .await?;
1751
1752        if mview_fragment.len() != 1 {
1753            return Err(anyhow::anyhow!(
1754                "expected exactly one mview fragment for job {}, found {}",
1755                job_id,
1756                mview_fragment.len()
1757            )
1758            .into());
1759        }
1760
1761        Ok(mview_fragment.into_iter().next().unwrap())
1762    }
1763
1764    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
1765        let inner = self.inner.read().await;
1766        let txn = inner.db.begin().await?;
1767        has_table_been_migrated(&txn, table_id).await
1768    }
1769
1770    pub async fn update_fragment_splits<C>(
1771        &self,
1772        txn: &C,
1773        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
1774    ) -> MetaResult<()>
1775    where
1776        C: ConnectionTrait,
1777    {
1778        if fragment_splits.is_empty() {
1779            return Ok(());
1780        }
1781
1782        let models: Vec<fragment_splits::ActiveModel> = fragment_splits
1783            .iter()
1784            .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
1785                fragment_id: Set(*fragment_id as _),
1786                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1787                    splits: splits.iter().map(Into::into).collect_vec(),
1788                }))),
1789            })
1790            .collect();
1791
1792        FragmentSplits::insert_many(models)
1793            .on_conflict(
1794                OnConflict::column(fragment_splits::Column::FragmentId)
1795                    .update_column(fragment_splits::Column::Splits)
1796                    .to_owned(),
1797            )
1798            .exec(txn)
1799            .await?;
1800
1801        Ok(())
1802    }
1803}
1804
1805#[cfg(test)]
1806mod tests {
1807    use std::collections::{BTreeMap, HashMap, HashSet};
1808
1809    use itertools::Itertools;
1810    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1811    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1812    use risingwave_common::id::JobId;
1813    use risingwave_common::util::iter_util::ZipEqDebug;
1814    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1815    use risingwave_meta_model::fragment::DistributionType;
1816    use risingwave_meta_model::{
1817        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, StreamNode,
1818        StreamingParallelism, TableId, VnodeBitmap, fragment,
1819    };
1820    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1821    use risingwave_pb::plan_common::PbExprContext;
1822    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1823    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1824    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1825
1826    use super::ActorInfo;
1827    use crate::MetaResult;
1828    use crate::controller::catalog::CatalogController;
1829    use crate::model::{Fragment, StreamActor};
1830
1831    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
1832
1833    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
1834
1835    const TEST_FRAGMENT_ID: FragmentId = 1;
1836
1837    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;
1838
1839    const TEST_JOB_ID: JobId = JobId::new(1);
1840
1841    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
1842
1843    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1844        let mut upstream_actor_ids = BTreeMap::new();
1845        upstream_actor_ids.insert(
1846            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1847            HashSet::from_iter([(actor_id + 100)]),
1848        );
1849        upstream_actor_ids.insert(
1850            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1851            HashSet::from_iter([(actor_id + 200)]),
1852        );
1853        upstream_actor_ids
1854    }
1855
1856    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1857        let mut input = vec![];
1858        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1859            input.push(PbStreamNode {
1860                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1861                    upstream_fragment_id: *upstream_fragment_id as _,
1862                    ..Default::default()
1863                }))),
1864                ..Default::default()
1865            });
1866        }
1867
1868        PbStreamNode {
1869            input,
1870            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1871            ..Default::default()
1872        }
1873    }
1874
1875    #[tokio::test]
1876    async fn test_extract_fragment() -> MetaResult<()> {
1877        let actor_count = 3u32;
1878        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
1879            .map(|actor_id| {
1880                (
1881                    actor_id as _,
1882                    generate_upstream_actor_ids_for_actor(actor_id),
1883                )
1884            })
1885            .collect();
1886
1887        let actor_bitmaps = ActorMapping::new_uniform(
1888            (0..actor_count).map(|i| i as _),
1889            VirtualNode::COUNT_FOR_TEST,
1890        )
1891        .to_bitmaps();
1892
1893        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());
1894
1895        let pb_actors = (0..actor_count)
1896            .map(|actor_id| StreamActor {
1897                actor_id: actor_id as _,
1898                fragment_id: TEST_FRAGMENT_ID as _,
1899                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
1900                mview_definition: "".to_owned(),
1901                expr_context: Some(PbExprContext {
1902                    time_zone: String::from("America/New_York"),
1903                    strict_mode: false,
1904                }),
1905            })
1906            .collect_vec();
1907
1908        let pb_fragment = Fragment {
1909            fragment_id: TEST_FRAGMENT_ID as _,
1910            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
1911            distribution_type: PbFragmentDistributionType::Hash as _,
1912            actors: pb_actors,
1913            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
1914            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
1915            nodes: stream_node,
1916        };
1917
1918        let fragment =
1919            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;
1920
1921        check_fragment(fragment, pb_fragment);
1922
1923        Ok(())
1924    }
1925
1926    #[tokio::test]
1927    async fn test_compose_fragment() -> MetaResult<()> {
1928        let actor_count = 3u32;
1929
1930        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
1931            .map(|actor_id| {
1932                (
1933                    actor_id as _,
1934                    generate_upstream_actor_ids_for_actor(actor_id),
1935                )
1936            })
1937            .collect();
1938
1939        let mut actor_bitmaps = ActorMapping::new_uniform(
1940            (0..actor_count).map(|i| i as _),
1941            VirtualNode::COUNT_FOR_TEST,
1942        )
1943        .to_bitmaps();
1944
1945        let actors = (0..actor_count)
1946            .map(|actor_id| {
1947                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
1948                    splits: vec![PbConnectorSplit {
1949                        split_type: "dummy".to_owned(),
1950                        ..Default::default()
1951                    }],
1952                });
1953
1954                ActorInfo {
1955                    actor_id: actor_id as ActorId,
1956                    fragment_id: TEST_FRAGMENT_ID,
1957                    splits: actor_splits,
1958                    worker_id: 0,
1959                    vnode_bitmap: actor_bitmaps
1960                        .remove(&actor_id)
1961                        .map(|bitmap| bitmap.to_protobuf())
1962                        .as_ref()
1963                        .map(VnodeBitmap::from),
1964                    expr_context: ExprContext::from(&PbExprContext {
1965                        time_zone: String::from("America/New_York"),
1966                        strict_mode: false,
1967                    }),
1968                }
1969            })
1970            .collect_vec();
1971
1972        let stream_node = {
1973            let template_actor = actors.first().cloned().unwrap();
1974
1975            let template_upstream_actor_ids = upstream_actor_ids
1976                .get(&(template_actor.actor_id as _))
1977                .unwrap();
1978
1979            generate_merger_stream_node(template_upstream_actor_ids)
1980        };
1981
1982        #[expect(deprecated)]
1983        let fragment = fragment::Model {
1984            fragment_id: TEST_FRAGMENT_ID,
1985            job_id: TEST_JOB_ID,
1986            fragment_type_mask: 0,
1987            distribution_type: DistributionType::Hash,
1988            stream_node: StreamNode::from(&stream_node),
1989            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID.as_raw_id() as _]),
1990            upstream_fragment_id: Default::default(),
1991            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
1992            parallelism: None,
1993        };
1994
1995        let (pb_fragment, pb_actor_status, pb_actor_splits) =
1996            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();
1997
1998        assert_eq!(pb_actor_status.len(), actor_count as usize);
1999        assert!(
2000            pb_actor_status
2001                .values()
2002                .all(|actor_status| actor_status.location.is_some())
2003        );
2004        assert_eq!(pb_actor_splits.len(), actor_count as usize);
2005
2006        let pb_actors = pb_fragment.actors.clone();
2007
2008        check_fragment(fragment, pb_fragment);
2009        check_actors(
2010            actors,
2011            &upstream_actor_ids,
2012            pb_actors,
2013            pb_actor_splits,
2014            &stream_node,
2015        );
2016
2017        Ok(())
2018    }
2019
2020    fn check_actors(
2021        actors: Vec<ActorInfo>,
2022        actor_upstreams: &FragmentActorUpstreams,
2023        pb_actors: Vec<StreamActor>,
2024        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
2025        stream_node: &PbStreamNode,
2026    ) {
2027        for (
2028            ActorInfo {
2029                actor_id,
2030                fragment_id,
2031                splits,
2032                worker_id: _,
2033                vnode_bitmap,
2034                expr_context,
2035                ..
2036            },
2037            StreamActor {
2038                actor_id: pb_actor_id,
2039                fragment_id: pb_fragment_id,
2040                vnode_bitmap: pb_vnode_bitmap,
2041                mview_definition,
2042                expr_context: pb_expr_context,
2043                ..
2044            },
2045        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
2046        {
2047            assert_eq!(actor_id, pb_actor_id as ActorId);
2048            assert_eq!(fragment_id, pb_fragment_id as FragmentId);
2049
2050            assert_eq!(
2051                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
2052                pb_vnode_bitmap,
2053            );
2054
2055            assert_eq!(mview_definition, "");
2056
2057            visit_stream_node_body(stream_node, |body| {
2058                if let PbNodeBody::Merge(m) = body {
2059                    assert!(
2060                        actor_upstreams
2061                            .get(&(actor_id as _))
2062                            .unwrap()
2063                            .contains_key(&m.upstream_fragment_id)
2064                    );
2065                }
2066            });
2067
2068            assert_eq!(
2069                splits,
2070                pb_actor_splits
2071                    .get(&pb_actor_id)
2072                    .map(ConnectorSplits::from)
2073                    .unwrap_or_default()
2074            );
2075
2076            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
2077        }
2078    }
2079
2080    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2081        let Fragment {
2082            fragment_id,
2083            fragment_type_mask,
2084            distribution_type: pb_distribution_type,
2085            actors: _,
2086            state_table_ids: pb_state_table_ids,
2087            maybe_vnode_count: _,
2088            nodes,
2089        } = pb_fragment;
2090
2091        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
2092        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2093        assert_eq!(
2094            pb_distribution_type,
2095            PbFragmentDistributionType::from(fragment.distribution_type)
2096        );
2097
2098        assert_eq!(
2099            pb_state_table_ids,
2100            fragment
2101                .state_table_ids
2102                .into_u32_array()
2103                .into_iter()
2104                .map_into()
2105                .collect_vec()
2106        );
2107        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2108    }
2109
2110    #[test]
2111    fn test_parallelism_policy_with_root_fragments() {
2112        #[expect(deprecated)]
2113        let fragment = fragment::Model {
2114            fragment_id: 3,
2115            job_id: TEST_JOB_ID,
2116            fragment_type_mask: 0,
2117            distribution_type: DistributionType::Hash,
2118            stream_node: StreamNode::from(&PbStreamNode::default()),
2119            state_table_ids: I32Array::default(),
2120            upstream_fragment_id: Default::default(),
2121            vnode_count: 0,
2122            parallelism: None,
2123        };
2124
2125        let job_parallelism = StreamingParallelism::Fixed(4);
2126
2127        let policy = super::CatalogController::format_fragment_parallelism_policy(
2128            &fragment,
2129            Some(&job_parallelism),
2130            &[],
2131        );
2132
2133        assert_eq!(policy, "inherit(fixed(4))");
2134    }
2135
2136    #[test]
2137    fn test_parallelism_policy_with_upstream_roots() {
2138        #[expect(deprecated)]
2139        let fragment = fragment::Model {
2140            fragment_id: 5,
2141            job_id: TEST_JOB_ID,
2142            fragment_type_mask: 0,
2143            distribution_type: DistributionType::Hash,
2144            stream_node: StreamNode::from(&PbStreamNode::default()),
2145            state_table_ids: I32Array::default(),
2146            upstream_fragment_id: Default::default(),
2147            vnode_count: 0,
2148            parallelism: None,
2149        };
2150
2151        let policy = super::CatalogController::format_fragment_parallelism_policy(
2152            &fragment,
2153            None,
2154            &[3, 1, 2, 1],
2155        );
2156
2157        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
2158    }
2159}