risingwave_meta/controller/
fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::CdcScanOptions;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink,
40    source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::fragment::{
49    FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan;
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57    PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, StreamScanType,
58};
59use sea_orm::ActiveValue::Set;
60use sea_orm::sea_query::Expr;
61use sea_orm::{
62    ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, JoinType, PaginatorTrait,
63    QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
64};
65use serde::{Deserialize, Serialize};
66
67use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
68use crate::controller::catalog::CatalogController;
69use crate::controller::scale::{
70    FragmentRenderMap, NoShuffleEnsemble, find_fragment_no_shuffle_dags_detailed,
71    load_fragment_info, resolve_streaming_job_definition,
72};
73use crate::controller::utils::{
74    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
75    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
76    resolve_no_shuffle_actor_dispatcher,
77};
78use crate::error::MetaError;
79use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
80use crate::model::{
81    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
82    StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
83    TableParallelism,
84};
85use crate::rpc::ddl_controller::build_upstream_sink_info;
86use crate::stream::UpstreamSinkInfo;
87use crate::{MetaResult, model};
88
/// Some information of running (inflight) actors.
#[derive(Debug)]
pub struct InflightActorInfo {
    /// Id of the worker associated with this actor (presumably the worker it runs on).
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor, if it has one.
    pub vnode_bitmap: Option<Bitmap>,
    /// Connector splits attached to the actor.
    pub splits: Vec<SplitImpl>,
}
96
/// Per-actor data consumed by `CatalogController::compose_fragment`, which turns it into a
/// `StreamActor`, a `PbActorStatus` and a `PbConnectorSplits` entry.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    /// Id of the actor.
    pub actor_id: ActorId,
    /// Id of the fragment the actor belongs to; `compose_fragment` rejects a mismatch.
    pub fragment_id: FragmentId,
    /// Connector splits of the actor, converted to protobuf by `compose_fragment`.
    pub splits: ConnectorSplits,
    /// Worker used to build the actor's `PbActorLocation`.
    pub worker_id: WorkerId,
    /// Optional vnode bitmap of the actor.
    pub vnode_bitmap: Option<VnodeBitmap>,
    /// Expression context forwarded to the composed `StreamActor`.
    pub expr_context: ExprContext,
    /// Config override forwarded as-is to the composed `StreamActor`.
    pub config_override: Arc<str>,
}
107
/// Some information of a running (inflight) fragment together with its actors.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Distribution type of the fragment.
    pub distribution_type: DistributionType,
    /// Bit mask of the fragment's type flags.
    pub fragment_type_mask: FragmentTypeMask,
    /// Vnode count of the fragment.
    pub vnode_count: usize,
    /// Stream plan node of the fragment in protobuf form (presumably the plan root — confirm).
    pub nodes: PbStreamNode,
    /// Inflight actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    /// Ids of the state tables associated with the fragment.
    pub state_table_ids: HashSet<TableId>,
}
118
/// Parallelism summary of a fragment, as produced by
/// `CatalogController::running_fragment_parallelisms`.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    /// Distribution type of the fragment.
    pub distribution_type: FragmentDistributionType,
    /// Number of actors of the fragment (filled from the length of its actor collection).
    pub actor_count: usize,
    /// Vnode count of the fragment.
    pub vnode_count: usize,
}
125
126#[easy_ext::ext(FragmentTypeMaskExt)]
127pub impl FragmentTypeMask {
128    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
129        Expr::col(fragment::Column::FragmentTypeMask)
130            .bit_and(Expr::value(flag as i32))
131            .ne(0)
132    }
133
134    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
135        Expr::col(fragment::Column::FragmentTypeMask)
136            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
137            .ne(0)
138    }
139
140    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
141        Expr::col(fragment::Column::FragmentTypeMask)
142            .bit_and(Expr::value(flag as i32))
143            .eq(0)
144    }
145}
146
/// One row of the streaming job listing produced by
/// `CatalogController::list_streaming_job_infos`.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    /// Id of the streaming job.
    pub job_id: JobId,
    /// Object type of the job's catalog object.
    pub obj_type: ObjectType,
    /// Name of the job: the table, source or sink name (first non-null, in that order), or
    /// `"<unknown>"` when none is found.
    pub name: String,
    /// Status of the job.
    pub job_status: JobStatus,
    /// Parallelism setting of the job.
    pub parallelism: StreamingParallelism,
    /// Max parallelism of the job.
    pub max_parallelism: i32,
    /// The job's specific resource group, or the database's resource group when unset.
    pub resource_group: String,
    /// Config override of the job; an empty string when unset.
    pub config_override: String,
    /// Id of the database owning the job's object.
    pub database_id: DatabaseId,
    /// Id of the schema owning the job's object.
    pub schema_id: SchemaId,
}
161
162impl NotificationManager {
163    pub(crate) fn notify_fragment_mapping(
164        &self,
165        operation: NotificationOperation,
166        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
167    ) {
168        let fragment_ids = fragment_mappings
169            .iter()
170            .map(|mapping| mapping.fragment_id)
171            .collect_vec();
172        if fragment_ids.is_empty() {
173            return;
174        }
175        // notify all fragment mappings to frontend.
176        for fragment_mapping in fragment_mappings {
177            self.notify_frontend_without_version(
178                operation,
179                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
180            );
181        }
182
183        // update serving vnode mappings.
184        match operation {
185            NotificationOperation::Add | NotificationOperation::Update => {
186                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
187                    fragment_ids,
188                ));
189            }
190            NotificationOperation::Delete => {
191                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
192                    fragment_ids,
193                ));
194            }
195            op => {
196                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
197            }
198        }
199    }
200}
201
202impl CatalogController {
203    pub fn prepare_fragment_models_from_fragments(
204        job_id: JobId,
205        fragments: impl Iterator<Item = &Fragment>,
206    ) -> MetaResult<Vec<fragment::Model>> {
207        fragments
208            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
209            .try_collect()
210    }
211
212    pub fn prepare_fragment_model_for_new_job(
213        job_id: JobId,
214        fragment: &Fragment,
215    ) -> MetaResult<fragment::Model> {
216        let vnode_count = fragment.vnode_count();
217        let Fragment {
218            fragment_id: pb_fragment_id,
219            fragment_type_mask: pb_fragment_type_mask,
220            distribution_type: pb_distribution_type,
221            actors: pb_actors,
222            state_table_ids: pb_state_table_ids,
223            nodes,
224            ..
225        } = fragment;
226
227        let state_table_ids = pb_state_table_ids.clone().into();
228
229        assert!(!pb_actors.is_empty());
230
231        let fragment_parallelism = nodes
232            .node_body
233            .as_ref()
234            .and_then(|body| match body {
235                NodeBody::StreamCdcScan(node) => Some(node),
236                _ => None,
237            })
238            .and_then(|node| node.options.as_ref())
239            .map(CdcScanOptions::from_proto)
240            .filter(|opts| opts.is_parallelized_backfill())
241            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));
242
243        let stream_node = StreamNode::from(nodes);
244
245        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
246            .unwrap()
247            .into();
248
249        #[expect(deprecated)]
250        let fragment = fragment::Model {
251            fragment_id: *pb_fragment_id as _,
252            job_id,
253            fragment_type_mask: (*pb_fragment_type_mask).into(),
254            distribution_type,
255            stream_node,
256            state_table_ids,
257            upstream_fragment_id: Default::default(),
258            vnode_count: vnode_count as _,
259            parallelism: fragment_parallelism,
260        };
261
262        Ok(fragment)
263    }
264
265    fn compose_table_fragments(
266        job_id: JobId,
267        state: PbState,
268        ctx: StreamContext,
269        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
270        parallelism: StreamingParallelism,
271        max_parallelism: usize,
272        job_definition: Option<String>,
273    ) -> MetaResult<StreamJobFragments> {
274        let mut pb_fragments = BTreeMap::new();
275        let mut pb_actor_status = BTreeMap::new();
276
277        for (fragment, actors) in fragments {
278            let (fragment, fragment_actor_status, _) =
279                Self::compose_fragment(fragment, actors, job_definition.clone())?;
280
281            pb_fragments.insert(fragment.fragment_id, fragment);
282            pb_actor_status.extend(fragment_actor_status.into_iter());
283        }
284
285        let table_fragments = StreamJobFragments {
286            stream_job_id: job_id,
287            state: state as _,
288            fragments: pb_fragments,
289            actor_status: pb_actor_status,
290            ctx,
291            assigned_parallelism: match parallelism {
292                StreamingParallelism::Custom => TableParallelism::Custom,
293                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
294                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
295            },
296            max_parallelism,
297        };
298
299        Ok(table_fragments)
300    }
301
302    #[allow(clippy::type_complexity)]
303    fn compose_fragment(
304        fragment: fragment::Model,
305        actors: Vec<ActorInfo>,
306        job_definition: Option<String>,
307    ) -> MetaResult<(
308        Fragment,
309        HashMap<crate::model::ActorId, PbActorStatus>,
310        HashMap<crate::model::ActorId, PbConnectorSplits>,
311    )> {
312        let fragment::Model {
313            fragment_id,
314            fragment_type_mask,
315            distribution_type,
316            stream_node,
317            state_table_ids,
318            vnode_count,
319            ..
320        } = fragment;
321
322        let stream_node = stream_node.to_protobuf();
323        let mut upstream_fragments = HashSet::new();
324        visit_stream_node_body(&stream_node, |body| {
325            if let NodeBody::Merge(m) = body {
326                assert!(
327                    upstream_fragments.insert(m.upstream_fragment_id),
328                    "non-duplicate upstream fragment"
329                );
330            }
331        });
332
333        let mut pb_actors = vec![];
334
335        let mut pb_actor_status = HashMap::new();
336        let mut pb_actor_splits = HashMap::new();
337
338        for actor in actors {
339            if actor.fragment_id != fragment_id {
340                bail!(
341                    "fragment id {} from actor {} is different from fragment {}",
342                    actor.fragment_id,
343                    actor.actor_id,
344                    fragment_id
345                )
346            }
347
348            let ActorInfo {
349                actor_id,
350                fragment_id,
351                worker_id,
352                splits,
353                vnode_bitmap,
354                expr_context,
355                config_override,
356                ..
357            } = actor;
358
359            let vnode_bitmap =
360                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
361            let pb_expr_context = Some(expr_context.to_protobuf());
362
363            pb_actor_status.insert(
364                actor_id as _,
365                PbActorStatus {
366                    location: PbActorLocation::from_worker(worker_id),
367                },
368            );
369
370            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
371
372            pb_actors.push(StreamActor {
373                actor_id: actor_id as _,
374                fragment_id: fragment_id as _,
375                vnode_bitmap,
376                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
377                expr_context: pb_expr_context,
378                config_override,
379            })
380        }
381
382        let pb_state_table_ids = state_table_ids.0;
383        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
384        let pb_fragment = Fragment {
385            fragment_id: fragment_id as _,
386            fragment_type_mask: fragment_type_mask.into(),
387            distribution_type: pb_distribution_type,
388            actors: pb_actors,
389            state_table_ids: pb_state_table_ids,
390            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
391            nodes: stream_node,
392        };
393
394        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
395    }
396
397    pub fn running_fragment_parallelisms(
398        &self,
399        id_filter: Option<HashSet<FragmentId>>,
400    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
401        let info = self.env.shared_actor_infos().read_guard();
402
403        let mut result = HashMap::new();
404        for (fragment_id, fragment) in info.iter_over_fragments() {
405            if let Some(id_filter) = &id_filter
406                && !id_filter.contains(&(*fragment_id as _))
407            {
408                continue; // Skip fragments not in the filter
409            }
410
411            result.insert(
412                *fragment_id as _,
413                FragmentParallelismInfo {
414                    distribution_type: fragment.distribution_type.into(),
415                    actor_count: fragment.actors.len() as _,
416                    vnode_count: fragment.vnode_count,
417                },
418            );
419        }
420
421        Ok(result)
422    }
423
424    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
425        let inner = self.inner.read().await;
426        let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
427            .select_only()
428            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
429            .into_tuple()
430            .all(&inner.db)
431            .await?;
432        Ok(fragment_jobs.into_iter().collect())
433    }
434
435    pub async fn get_fragment_job_id(
436        &self,
437        fragment_ids: Vec<FragmentId>,
438    ) -> MetaResult<Vec<ObjectId>> {
439        let inner = self.inner.read().await;
440
441        let object_ids: Vec<ObjectId> = FragmentModel::find()
442            .select_only()
443            .column(fragment::Column::JobId)
444            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
445            .into_tuple()
446            .all(&inner.db)
447            .await?;
448
449        Ok(object_ids)
450    }
451
452    pub async fn get_fragment_desc_by_id(
453        &self,
454        fragment_id: FragmentId,
455    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
456        let inner = self.inner.read().await;
457
458        let fragment_model = match FragmentModel::find_by_id(fragment_id)
459            .one(&inner.db)
460            .await?
461        {
462            Some(fragment) => fragment,
463            None => return Ok(None),
464        };
465
466        let job_parallelism: Option<StreamingParallelism> =
467            StreamingJob::find_by_id(fragment_model.job_id)
468                .select_only()
469                .column(streaming_job::Column::Parallelism)
470                .into_tuple()
471                .one(&inner.db)
472                .await?;
473
474        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
475            .select_only()
476            .columns([
477                fragment_relation::Column::SourceFragmentId,
478                fragment_relation::Column::DispatcherType,
479            ])
480            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
481            .into_tuple()
482            .all(&inner.db)
483            .await?;
484
485        let upstreams: Vec<_> = upstream_entries
486            .into_iter()
487            .map(|(source_id, _)| source_id)
488            .collect();
489
490        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
491            .await
492            .map(Self::collect_root_fragment_mapping)?;
493        let root_fragments = root_fragment_map
494            .get(&fragment_id)
495            .cloned()
496            .unwrap_or_default();
497
498        let info = self.env.shared_actor_infos().read_guard();
499        let SharedFragmentInfo { actors, .. } = info
500            .get_fragment(fragment_model.fragment_id as _)
501            .unwrap_or_else(|| {
502                panic!(
503                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
504                    fragment_model.fragment_id,
505                    fragment_model.job_id
506                )
507            });
508
509        let parallelism_policy = Self::format_fragment_parallelism_policy(
510            &fragment_model,
511            job_parallelism.as_ref(),
512            &root_fragments,
513        );
514
515        let fragment = FragmentDesc {
516            fragment_id: fragment_model.fragment_id,
517            job_id: fragment_model.job_id,
518            fragment_type_mask: fragment_model.fragment_type_mask,
519            distribution_type: fragment_model.distribution_type,
520            state_table_ids: fragment_model.state_table_ids.clone(),
521            parallelism: actors.len() as _,
522            vnode_count: fragment_model.vnode_count,
523            stream_node: fragment_model.stream_node.clone(),
524            parallelism_policy,
525        };
526
527        Ok(Some((fragment, upstreams)))
528    }
529
530    pub async fn list_fragment_database_ids(
531        &self,
532        select_fragment_ids: Option<Vec<FragmentId>>,
533    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
534        let inner = self.inner.read().await;
535        let select = FragmentModel::find()
536            .select_only()
537            .column(fragment::Column::FragmentId)
538            .column(object::Column::DatabaseId)
539            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
540        let select = if let Some(select_fragment_ids) = select_fragment_ids {
541            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
542        } else {
543            select
544        };
545        Ok(select.into_tuple().all(&inner.db).await?)
546    }
547
548    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
549        let inner = self.inner.read().await;
550
551        // Load fragments matching the job from the database
552        let fragments: Vec<_> = FragmentModel::find()
553            .filter(fragment::Column::JobId.eq(job_id))
554            .all(&inner.db)
555            .await?;
556
557        let job_info = StreamingJob::find_by_id(job_id)
558            .one(&inner.db)
559            .await?
560            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
561
562        let fragment_actors =
563            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;
564
565        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
566            .await?
567            .remove(&job_id);
568
569        Self::compose_table_fragments(
570            job_id,
571            job_info.job_status.into(),
572            job_info.stream_context(),
573            fragment_actors,
574            job_info.parallelism.clone(),
575            job_info.max_parallelism as _,
576            job_definition,
577        )
578    }
579
580    pub async fn get_fragment_actor_dispatchers(
581        &self,
582        fragment_ids: Vec<FragmentId>,
583    ) -> MetaResult<FragmentActorDispatchers> {
584        let inner = self.inner.read().await;
585
586        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
587            .await
588    }
589
    /// Builds the dispatchers of every actor in the given source fragments.
    ///
    /// All fragment relations whose source fragment is in `fragment_ids` are loaded through
    /// `c`. For each relation the actors of both the source and the target fragment are read
    /// from the shared actor infos and handed to `compose_dispatchers`. The result is grouped
    /// by source fragment id, then by actor id; an actor collects one dispatcher per relation.
    ///
    /// # Panics
    ///
    /// Panics if a source or target fragment is missing from the shared actor infos.
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        // Distribution type of a fragment plus the vnode bitmap of each of its actors.
        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        // Avoids rebuilding the actor map when a fragment appears in several relations.
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        // Reads one fragment's distribution and actor bitmaps from the shared actor infos.
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                // NOTE(review): a missing fragment panics here rather than yielding an `Err`.
                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        // source fragment id -> actor id -> dispatchers
        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            // Look up (or load and cache) the source side of the relation.
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            // Same for the target side.
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let dispatchers = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }
693
694    pub async fn get_fragment_downstream_relations(
695        &self,
696        fragment_ids: Vec<FragmentId>,
697    ) -> MetaResult<FragmentDownstreamRelation> {
698        let inner = self.inner.read().await;
699        let mut stream = FragmentRelation::find()
700            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
701            .stream(&inner.db)
702            .await?;
703        let mut relations = FragmentDownstreamRelation::new();
704        while let Some(relation) = stream.try_next().await? {
705            relations
706                .entry(relation.source_fragment_id as _)
707                .or_default()
708                .push(DownstreamFragmentRelation {
709                    downstream_fragment_id: relation.target_fragment_id as _,
710                    dispatcher_type: relation.dispatcher_type,
711                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
712                    output_mapping: PbDispatchOutputMapping {
713                        indices: relation.output_indices.into_u32_array(),
714                        types: relation
715                            .output_type_mapping
716                            .unwrap_or_default()
717                            .to_protobuf(),
718                    },
719                });
720        }
721        Ok(relations)
722    }
723
724    pub async fn get_job_fragment_backfill_scan_type(
725        &self,
726        job_id: JobId,
727    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
728        let inner = self.inner.read().await;
729        let fragments: Vec<_> = FragmentModel::find()
730            .filter(fragment::Column::JobId.eq(job_id))
731            .all(&inner.db)
732            .await?;
733
734        let mut result = HashMap::new();
735
736        for fragment::Model {
737            fragment_id,
738            stream_node,
739            ..
740        } in fragments
741        {
742            let stream_node = stream_node.to_protobuf();
743            visit_stream_node_body(&stream_node, |body| {
744                if let NodeBody::StreamScan(node) = body {
745                    match node.stream_scan_type() {
746                        StreamScanType::Unspecified => {}
747                        scan_type => {
748                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
749                        }
750                    }
751                }
752            });
753        }
754
755        Ok(result)
756    }
757
758    pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
759        let inner = self.inner.read().await;
760        let count = StreamingJob::find().count(&inner.db).await?;
761        Ok(usize::try_from(count).context("streaming job count overflow")?)
762    }
763
764    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
765        let inner = self.inner.read().await;
766        let job_states = StreamingJob::find()
767            .select_only()
768            .column(streaming_job::Column::JobId)
769            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
770            .join(JoinType::InnerJoin, object::Relation::Database2.def())
771            .column(object::Column::ObjType)
772            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
773            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
774            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
775            .column_as(
776                Expr::if_null(
777                    Expr::col((table::Entity, table::Column::Name)),
778                    Expr::if_null(
779                        Expr::col((source::Entity, source::Column::Name)),
780                        Expr::if_null(
781                            Expr::col((sink::Entity, sink::Column::Name)),
782                            Expr::val("<unknown>"),
783                        ),
784                    ),
785                ),
786                "name",
787            )
788            .columns([
789                streaming_job::Column::JobStatus,
790                streaming_job::Column::Parallelism,
791                streaming_job::Column::MaxParallelism,
792            ])
793            .column_as(
794                Expr::if_null(
795                    Expr::col((
796                        streaming_job::Entity,
797                        streaming_job::Column::SpecificResourceGroup,
798                    )),
799                    Expr::col((database::Entity, database::Column::ResourceGroup)),
800                ),
801                "resource_group",
802            )
803            .column_as(
804                Expr::if_null(
805                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
806                    Expr::val(""),
807                ),
808                "config_override",
809            )
810            .column(object::Column::DatabaseId)
811            .column(object::Column::SchemaId)
812            .into_model()
813            .all(&inner.db)
814            .await?;
815        Ok(job_states)
816    }
817
818    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
819        let inner = self.inner.read().await;
820        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
821            .select_only()
822            .column(streaming_job::Column::MaxParallelism)
823            .into_tuple()
824            .one(&inner.db)
825            .await?
826            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
827        Ok(max_parallelism as usize)
828    }
829
830    /// Try to get internal table ids of each streaming job, used by metrics collection.
831    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
832        if let Ok(inner) = self.inner.try_read()
833            && let Ok(job_state_tables) = FragmentModel::find()
834                .select_only()
835                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
836                .into_tuple::<(JobId, I32Array)>()
837                .all(&inner.db)
838                .await
839        {
840            let mut job_internal_table_ids = HashMap::new();
841            for (job_id, state_table_ids) in job_state_tables {
842                job_internal_table_ids
843                    .entry(job_id)
844                    .or_insert_with(Vec::new)
845                    .extend(
846                        state_table_ids
847                            .into_inner()
848                            .into_iter()
849                            .map(|table_id| TableId::new(table_id as _)),
850                    );
851            }
852            return Some(job_internal_table_ids.into_iter().collect());
853        }
854        None
855    }
856
857    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
858        let inner = self.inner.read().await;
859        let count = FragmentModel::find().count(&inner.db).await?;
860        Ok(count > 0)
861    }
862
863    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
864        let read_guard = self.env.shared_actor_infos().read_guard();
865        let actor_cnt: HashMap<WorkerId, _> = read_guard
866            .iter_over_fragments()
867            .flat_map(|(_, fragment)| {
868                fragment
869                    .actors
870                    .iter()
871                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
872            })
873            .into_group_map()
874            .into_iter()
875            .map(|(k, v)| (k, v.len()))
876            .collect();
877
878        Ok(actor_cnt)
879    }
880
881    fn collect_fragment_actor_map(
882        &self,
883        fragment_ids: &[FragmentId],
884        stream_context: StreamContext,
885    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
886        let guard = self.env.shared_actor_infos().read_guard();
887        let pb_expr_context = stream_context.to_expr_context();
888        let expr_context: ExprContext = (&pb_expr_context).into();
889
890        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
891        for fragment_id in fragment_ids {
892            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
893                anyhow!("fragment {} not found in shared actor info", fragment_id)
894            })?;
895
896            let actors = fragment_info
897                .actors
898                .iter()
899                .map(|(actor_id, actor_info)| ActorInfo {
900                    actor_id: *actor_id as _,
901                    fragment_id: *fragment_id,
902                    splits: ConnectorSplits::from(&PbConnectorSplits {
903                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
904                    }),
905                    worker_id: actor_info.worker_id as _,
906                    vnode_bitmap: actor_info
907                        .vnode_bitmap
908                        .as_ref()
909                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
910                    expr_context: expr_context.clone(),
911                    config_override: stream_context.config_override.clone(),
912                })
913                .collect();
914
915            actor_map.insert(*fragment_id, actors);
916        }
917
918        Ok(actor_map)
919    }
920
921    fn collect_fragment_actor_pairs(
922        &self,
923        fragments: Vec<fragment::Model>,
924        stream_context: StreamContext,
925    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
926        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
927        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
928        fragments
929            .into_iter()
930            .map(|fragment| {
931                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
932                    anyhow!(
933                        "fragment {} missing in shared actor info map",
934                        fragment.fragment_id
935                    )
936                })?;
937                Ok((fragment, actors))
938            })
939            .collect()
940    }
941
942    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
943    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
944        let inner = self.inner.read().await;
945        let jobs = StreamingJob::find().all(&inner.db).await?;
946
947        let mut job_definition = resolve_streaming_job_definition(
948            &inner.db,
949            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
950        )
951        .await?;
952
953        let mut table_fragments = BTreeMap::new();
954        for job in jobs {
955            let fragments = FragmentModel::find()
956                .filter(fragment::Column::JobId.eq(job.job_id))
957                .all(&inner.db)
958                .await?;
959
960            let fragment_actors =
961                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;
962
963            table_fragments.insert(
964                job.job_id,
965                Self::compose_table_fragments(
966                    job.job_id,
967                    job.job_status.into(),
968                    job.stream_context(),
969                    fragment_actors,
970                    job.parallelism.clone(),
971                    job.max_parallelism as _,
972                    job_definition.remove(&job.job_id),
973                )?,
974            );
975        }
976
977        Ok(table_fragments)
978    }
979
980    pub async fn upstream_fragments(
981        &self,
982        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
983    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
984        let inner = self.inner.read().await;
985        let mut stream = FragmentRelation::find()
986            .select_only()
987            .columns([
988                fragment_relation::Column::SourceFragmentId,
989                fragment_relation::Column::TargetFragmentId,
990            ])
991            .filter(
992                fragment_relation::Column::TargetFragmentId
993                    .is_in(fragment_ids.map(|id| id as FragmentId)),
994            )
995            .into_tuple::<(FragmentId, FragmentId)>()
996            .stream(&inner.db)
997            .await?;
998        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
999        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
1000            upstream_fragments
1001                .entry(downstream_fragment_id as crate::model::FragmentId)
1002                .or_default()
1003                .insert(upstream_fragment_id as crate::model::FragmentId);
1004        }
1005        Ok(upstream_fragments)
1006    }
1007
1008    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1009        let info = self.env.shared_actor_infos().read_guard();
1010
1011        let actor_locations = info
1012            .iter_over_fragments()
1013            .flat_map(|(fragment_id, fragment)| {
1014                fragment
1015                    .actors
1016                    .iter()
1017                    .map(|(actor_id, actor)| PartialActorLocation {
1018                        actor_id: *actor_id as _,
1019                        fragment_id: *fragment_id as _,
1020                        worker_id: actor.worker_id,
1021                    })
1022            })
1023            .collect_vec();
1024
1025        Ok(actor_locations)
1026    }
1027
1028    pub async fn list_actor_info(
1029        &self,
1030    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
1031        let inner = self.inner.read().await;
1032
1033        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
1034            FragmentModel::find()
1035                .select_only()
1036                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1037                .column(fragment::Column::FragmentId)
1038                .column_as(object::Column::Oid, "job_id")
1039                .column_as(object::Column::SchemaId, "schema_id")
1040                .column_as(object::Column::ObjType, "type")
1041                .into_tuple()
1042                .all(&inner.db)
1043                .await?;
1044
1045        let actor_infos = {
1046            let info = self.env.shared_actor_infos().read_guard();
1047
1048            let mut result = Vec::new();
1049
1050            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
1051                let Some(fragment) = info.get_fragment(fragment_id as _) else {
1052                    return Err(MetaError::unavailable(format!(
1053                        "shared actor info missing for fragment {fragment_id} while listing actors"
1054                    )));
1055                };
1056
1057                for actor_id in fragment.actors.keys() {
1058                    result.push((
1059                        *actor_id as _,
1060                        fragment.fragment_id as _,
1061                        object_id,
1062                        schema_id,
1063                        object_type,
1064                    ));
1065                }
1066            }
1067
1068            result
1069        };
1070
1071        Ok(actor_infos)
1072    }
1073
1074    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1075        let guard = self.env.shared_actor_info.read_guard();
1076        guard
1077            .iter_over_fragments()
1078            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1079            .collect_vec()
1080    }
1081
1082    pub async fn list_fragment_descs(
1083        &self,
1084        is_creating: bool,
1085    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1086        let inner = self.inner.read().await;
1087
1088        let txn = inner.db.begin().await?;
1089
1090        let fragments: Vec<_> = if is_creating {
1091            FragmentModel::find()
1092                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1093                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1094                .filter(
1095                    streaming_job::Column::JobStatus
1096                        .eq(JobStatus::Initial)
1097                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1098                )
1099                .all(&txn)
1100                .await?
1101        } else {
1102            FragmentModel::find().all(&txn).await?
1103        };
1104
1105        let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();
1106
1107        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragments.is_empty() {
1108            HashMap::new()
1109        } else {
1110            let job_ids = fragments.iter().map(|f| f.job_id).unique().collect_vec();
1111            StreamingJob::find()
1112                .select_only()
1113                .columns([
1114                    streaming_job::Column::JobId,
1115                    streaming_job::Column::Parallelism,
1116                ])
1117                .filter(streaming_job::Column::JobId.is_in(job_ids))
1118                .into_tuple()
1119                .all(&txn)
1120                .await?
1121                .into_iter()
1122                .collect()
1123        };
1124
1125        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
1126            if fragment_ids.is_empty() {
1127                Vec::new()
1128            } else {
1129                FragmentRelation::find()
1130                    .select_only()
1131                    .columns([
1132                        fragment_relation::Column::TargetFragmentId,
1133                        fragment_relation::Column::SourceFragmentId,
1134                        fragment_relation::Column::DispatcherType,
1135                    ])
1136                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
1137                    .into_tuple()
1138                    .all(&txn)
1139                    .await?
1140            };
1141
1142        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1143        for (target_id, source_id, _) in upstream_entries {
1144            all_upstreams.entry(target_id).or_default().push(source_id);
1145        }
1146
1147        let root_fragment_map = if fragment_ids.is_empty() {
1148            HashMap::new()
1149        } else {
1150            let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
1151            Self::collect_root_fragment_mapping(ensembles)
1152        };
1153
1154        let guard = self.env.shared_actor_info.read_guard();
1155
1156        let mut result = Vec::new();
1157
1158        for fragment_desc in fragments {
1159            // note: here sometimes fragment is not found in the cache, fallback to 0
1160            let parallelism = guard
1161                .get_fragment(fragment_desc.fragment_id as _)
1162                .map(|fragment| fragment.actors.len())
1163                .unwrap_or_default();
1164
1165            let root_fragments = root_fragment_map
1166                .get(&fragment_desc.fragment_id)
1167                .cloned()
1168                .unwrap_or_default();
1169
1170            let upstreams = all_upstreams
1171                .remove(&fragment_desc.fragment_id)
1172                .unwrap_or_default();
1173
1174            let parallelism_policy = Self::format_fragment_parallelism_policy(
1175                &fragment_desc,
1176                job_parallelisms.get(&fragment_desc.job_id),
1177                &root_fragments,
1178            );
1179
1180            let fragment = FragmentDistribution {
1181                fragment_id: fragment_desc.fragment_id,
1182                table_id: fragment_desc.job_id,
1183                distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type)
1184                    as _,
1185                state_table_ids: fragment_desc.state_table_ids.0,
1186                upstream_fragment_ids: upstreams.clone(),
1187                fragment_type_mask: fragment_desc.fragment_type_mask as _,
1188                parallelism: parallelism as _,
1189                vnode_count: fragment_desc.vnode_count as _,
1190                node: Some(fragment_desc.stream_node.to_protobuf()),
1191                parallelism_policy,
1192            };
1193
1194            result.push((fragment, upstreams));
1195        }
1196        Ok(result)
1197    }
1198
1199    /// Build a fragment-to-root lookup for all reported root fragment ensembles.
1200    fn collect_root_fragment_mapping(
1201        ensembles: Vec<NoShuffleEnsemble>,
1202    ) -> HashMap<FragmentId, Vec<FragmentId>> {
1203        let mut mapping = HashMap::new();
1204
1205        for ensemble in ensembles {
1206            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1207            roots.sort_unstable();
1208            roots.dedup();
1209
1210            if roots.is_empty() {
1211                continue;
1212            }
1213
1214            let root_set: HashSet<_> = roots.iter().copied().collect();
1215
1216            for fragment_id in ensemble.component_fragments() {
1217                if root_set.contains(&fragment_id) {
1218                    mapping.insert(fragment_id, Vec::new());
1219                } else {
1220                    mapping.insert(fragment_id, roots.clone());
1221                }
1222            }
1223        }
1224
1225        mapping
1226    }
1227
1228    fn format_fragment_parallelism_policy(
1229        fragment: &fragment::Model,
1230        job_parallelism: Option<&StreamingParallelism>,
1231        root_fragments: &[FragmentId],
1232    ) -> String {
1233        if fragment.distribution_type == DistributionType::Single {
1234            return "single".to_owned();
1235        }
1236
1237        if let Some(parallelism) = fragment.parallelism.as_ref() {
1238            return format!(
1239                "override({})",
1240                Self::format_streaming_parallelism(parallelism)
1241            );
1242        }
1243
1244        if !root_fragments.is_empty() {
1245            let mut upstreams = root_fragments.to_vec();
1246            upstreams.sort_unstable();
1247            upstreams.dedup();
1248
1249            return format!("upstream_fragment({upstreams:?})");
1250        }
1251
1252        let inherited = job_parallelism
1253            .map(Self::format_streaming_parallelism)
1254            .unwrap_or_else(|| "unknown".to_owned());
1255        format!("inherit({inherited})")
1256    }
1257
1258    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1259        match parallelism {
1260            StreamingParallelism::Adaptive => "adaptive".to_owned(),
1261            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1262            StreamingParallelism::Custom => "custom".to_owned(),
1263        }
1264    }
1265
1266    pub async fn list_sink_actor_mapping(
1267        &self,
1268    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1269        let inner = self.inner.read().await;
1270        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1271            .select_only()
1272            .columns([sink::Column::SinkId, sink::Column::Name])
1273            .into_tuple()
1274            .all(&inner.db)
1275            .await?;
1276        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1277
1278        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1279
1280        let actor_with_type: Vec<(ActorId, SinkId)> = {
1281            let info = self.env.shared_actor_infos().read_guard();
1282
1283            info.iter_over_fragments()
1284                .filter(|(_, fragment)| {
1285                    sink_ids.contains(&fragment.job_id.as_sink_id())
1286                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1287                })
1288                .flat_map(|(_, fragment)| {
1289                    fragment
1290                        .actors
1291                        .keys()
1292                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1293                })
1294                .collect()
1295        };
1296
1297        let mut sink_actor_mapping = HashMap::new();
1298        for (actor_id, sink_id) in actor_with_type {
1299            sink_actor_mapping
1300                .entry(sink_id)
1301                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1302                .1
1303                .push(actor_id);
1304        }
1305
1306        Ok(sink_actor_mapping)
1307    }
1308
1309    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1310        let inner = self.inner.read().await;
1311        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1312            .select_only()
1313            .columns([
1314                fragment::Column::FragmentId,
1315                fragment::Column::JobId,
1316                fragment::Column::StateTableIds,
1317            ])
1318            .into_partial_model()
1319            .all(&inner.db)
1320            .await?;
1321        Ok(fragment_state_tables)
1322    }
1323
1324    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
1325    /// collected
1326    pub async fn load_all_actors_dynamic(
1327        &self,
1328        database_id: Option<DatabaseId>,
1329        worker_nodes: &ActiveStreamingWorkerNodes,
1330    ) -> MetaResult<FragmentRenderMap> {
1331        let adaptive_parallelism_strategy = {
1332            let system_params_reader = self.env.system_params_reader().await;
1333            system_params_reader.adaptive_parallelism_strategy()
1334        };
1335
1336        let inner = self.inner.read().await;
1337        let txn = inner.db.begin().await?;
1338
1339        let database_fragment_infos = load_fragment_info(
1340            &txn,
1341            self.env.actor_id_generator(),
1342            database_id,
1343            worker_nodes,
1344            adaptive_parallelism_strategy,
1345        )
1346        .await?;
1347
1348        tracing::trace!(?database_fragment_infos, "reload all actors");
1349
1350        Ok(database_fragment_infos)
1351    }
1352
1353    #[await_tree::instrument]
1354    pub async fn fill_snapshot_backfill_epoch(
1355        &self,
1356        fragment_ids: impl Iterator<Item = FragmentId>,
1357        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1358        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1359    ) -> MetaResult<()> {
1360        let inner = self.inner.write().await;
1361        let txn = inner.db.begin().await?;
1362        for fragment_id in fragment_ids {
1363            let fragment = FragmentModel::find_by_id(fragment_id)
1364                .one(&txn)
1365                .await?
1366                .context(format!("fragment {} not found", fragment_id))?;
1367            let mut node = fragment.stream_node.to_protobuf();
1368            if crate::stream::fill_snapshot_backfill_epoch(
1369                &mut node,
1370                snapshot_backfill_info,
1371                cross_db_snapshot_backfill_info,
1372            )? {
1373                let node = StreamNode::from(&node);
1374                FragmentModel::update(fragment::ActiveModel {
1375                    fragment_id: Set(fragment_id),
1376                    stream_node: Set(node),
1377                    ..Default::default()
1378                })
1379                .exec(&txn)
1380                .await?;
1381            }
1382        }
1383        txn.commit().await?;
1384        Ok(())
1385    }
1386
1387    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1388    pub fn get_running_actors_of_fragment(
1389        &self,
1390        fragment_id: FragmentId,
1391    ) -> MetaResult<HashSet<model::ActorId>> {
1392        let info = self.env.shared_actor_infos().read_guard();
1393
1394        let actors = info
1395            .get_fragment(fragment_id as _)
1396            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1397            .unwrap_or_default();
1398
1399        Ok(actors)
1400    }
1401
1402    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
1403    /// (`backfill_actor_id`, `upstream_source_actor_id`)
1404    pub async fn get_running_actors_for_source_backfill(
1405        &self,
1406        source_backfill_fragment_id: FragmentId,
1407        source_fragment_id: FragmentId,
1408    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
1409        let inner = self.inner.read().await;
1410        let txn = inner.db.begin().await?;
1411
1412        let fragment_relation: DispatcherType = FragmentRelation::find()
1413            .select_only()
1414            .column(fragment_relation::Column::DispatcherType)
1415            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
1416            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
1417            .into_tuple()
1418            .one(&txn)
1419            .await?
1420            .ok_or_else(|| {
1421                anyhow!(
1422                    "no fragment connection from source fragment {} to source backfill fragment {}",
1423                    source_fragment_id,
1424                    source_backfill_fragment_id
1425                )
1426            })?;
1427
1428        if fragment_relation != DispatcherType::NoShuffle {
1429            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
1430        }
1431
1432        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
1433            let result: MetaResult<DistributionType> = try {
1434                FragmentModel::find_by_id(fragment_id)
1435                    .select_only()
1436                    .column(fragment::Column::DistributionType)
1437                    .into_tuple()
1438                    .one(txn)
1439                    .await?
1440                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
1441            };
1442            result
1443        };
1444
1445        let source_backfill_distribution_type =
1446            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
1447        let source_distribution_type =
1448            load_fragment_distribution_type(&txn, source_fragment_id).await?;
1449
1450        let load_fragment_actor_distribution =
1451            |actor_info: &SharedActorInfos,
1452             fragment_id: FragmentId|
1453             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
1454                let guard = actor_info.read_guard();
1455
1456                guard
1457                    .get_fragment(fragment_id as _)
1458                    .map(|fragment| {
1459                        fragment
1460                            .actors
1461                            .iter()
1462                            .map(|(actor_id, actor)| {
1463                                (
1464                                    *actor_id as _,
1465                                    actor
1466                                        .vnode_bitmap
1467                                        .as_ref()
1468                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
1469                                )
1470                            })
1471                            .collect()
1472                    })
1473                    .unwrap_or_default()
1474            };
1475
1476        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
1477            load_fragment_actor_distribution(
1478                self.env.shared_actor_infos(),
1479                source_backfill_fragment_id,
1480            );
1481
1482        let source_actors =
1483            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);
1484
1485        Ok(resolve_no_shuffle_actor_dispatcher(
1486            source_distribution_type,
1487            &source_actors,
1488            source_backfill_distribution_type,
1489            &source_backfill_actors,
1490        )
1491        .into_iter()
1492        .map(|(source_actor, source_backfill_actor)| {
1493            (source_backfill_actor as _, source_actor as _)
1494        })
1495        .collect())
1496    }
1497
1498    /// Get and filter the "**root**" fragments of the specified jobs.
1499    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1500    ///
1501    /// Root fragment connects to downstream jobs.
1502    ///
1503    /// ## What can be the root fragment
1504    /// - For sink, it should have one `Sink` fragment.
1505    /// - For MV, it should have one `MView` fragment.
1506    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1507    /// - For source, it should have one `Source` fragment.
1508    ///
1509    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1510    pub async fn get_root_fragments(
1511        &self,
1512        job_ids: Vec<JobId>,
1513    ) -> MetaResult<(
1514        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
1515        HashMap<ActorId, WorkerId>,
1516    )> {
1517        let inner = self.inner.read().await;
1518
1519        let all_fragments = FragmentModel::find()
1520            .filter(fragment::Column::JobId.is_in(job_ids))
1521            .all(&inner.db)
1522            .await?;
1523        // job_id -> fragment
1524        let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
1525        for fragment in all_fragments {
1526            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1527            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1528                _ = root_fragments.insert(fragment.job_id, fragment);
1529            } else if mask.contains(FragmentTypeFlag::Source) {
1530                // look for Source fragment only if there's no MView fragment
1531                // (notice try_insert here vs insert above)
1532                _ = root_fragments.try_insert(fragment.job_id, fragment);
1533            }
1534        }
1535
1536        let mut root_fragments_pb = HashMap::new();
1537
1538        let info = self.env.shared_actor_infos().read_guard();
1539
1540        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
1541            .iter()
1542            .map(|(job_id, fragment)| (fragment.fragment_id, *job_id))
1543            .collect();
1544
1545        for fragment in root_fragment_to_jobs.keys() {
1546            let fragment_info = info.get_fragment(*fragment).context(format!(
1547                "root fragment {} not found in shared actor info",
1548                fragment
1549            ))?;
1550
1551            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
1552            let fragment = root_fragments
1553                .get(&job_id)
1554                .context(format!("root fragment for job {} not found", job_id))?;
1555
1556            root_fragments_pb.insert(
1557                job_id,
1558                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
1559            );
1560        }
1561
1562        let mut all_actor_locations = HashMap::new();
1563
1564        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
1565            for (actor_id, actor_info) in actors {
1566                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
1567            }
1568        }
1569
1570        Ok((root_fragments_pb, all_actor_locations))
1571    }
1572
1573    pub async fn get_root_fragment(
1574        &self,
1575        job_id: JobId,
1576    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
1577        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1578        let (root_fragment, _) = root_fragments
1579            .remove(&job_id)
1580            .context(format!("root fragment for job {} not found", job_id))?;
1581
1582        Ok((root_fragment, actors))
1583    }
1584
1585    /// Get the downstream fragments connected to the specified job.
1586    pub async fn get_downstream_fragments(
1587        &self,
1588        job_id: JobId,
1589    ) -> MetaResult<(
1590        Vec<(
1591            stream_plan::DispatcherType,
1592            SharedFragmentInfo,
1593            PbStreamNode,
1594        )>,
1595        HashMap<ActorId, WorkerId>,
1596    )> {
1597        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;
1598
1599        let inner = self.inner.read().await;
1600        let txn = inner.db.begin().await?;
1601        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1602            .filter(
1603                fragment_relation::Column::SourceFragmentId
1604                    .eq(root_fragment.fragment_id as FragmentId),
1605            )
1606            .all(&txn)
1607            .await?;
1608
1609        let downstream_fragment_ids = downstream_fragment_relations
1610            .iter()
1611            .map(|model| model.target_fragment_id as FragmentId)
1612            .collect::<HashSet<_>>();
1613
1614        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1615            .select_only()
1616            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1617            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1618            .into_tuple()
1619            .all(&txn)
1620            .await?;
1621
1622        let downstream_fragment_nodes: HashMap<_, _> =
1623            downstream_fragment_nodes.into_iter().collect();
1624
1625        let mut downstream_fragments = vec![];
1626
1627        let info = self.env.shared_actor_infos().read_guard();
1628
1629        let fragment_map: HashMap<_, _> = downstream_fragment_relations
1630            .iter()
1631            .map(|model| (model.target_fragment_id, model.dispatcher_type))
1632            .collect();
1633
1634        for fragment_id in fragment_map.keys() {
1635            let fragment_info @ SharedFragmentInfo { actors, .. } =
1636                info.get_fragment(*fragment_id).unwrap();
1637
1638            let dispatcher_type = fragment_map[fragment_id];
1639
1640            if actors.is_empty() {
1641                bail!("No fragment found for fragment id {}", fragment_id);
1642            }
1643
1644            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1645
1646            let nodes = downstream_fragment_nodes
1647                .get(fragment_id)
1648                .context(format!(
1649                    "downstream fragment node for id {} not found",
1650                    fragment_id
1651                ))?
1652                .to_protobuf();
1653
1654            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
1655        }
1656        Ok((downstream_fragments, actor_locations))
1657    }
1658
1659    pub async fn load_source_fragment_ids(
1660        &self,
1661    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1662        let inner = self.inner.read().await;
1663        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1664            .select_only()
1665            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1666            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1667            .into_tuple()
1668            .all(&inner.db)
1669            .await?;
1670
1671        let mut source_fragment_ids = HashMap::new();
1672        for (fragment_id, stream_node) in fragments {
1673            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1674                source_fragment_ids
1675                    .entry(source_id)
1676                    .or_insert_with(BTreeSet::new)
1677                    .insert(fragment_id);
1678            }
1679        }
1680        Ok(source_fragment_ids)
1681    }
1682
1683    pub async fn load_backfill_fragment_ids(
1684        &self,
1685    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1686        let inner = self.inner.read().await;
1687        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1688            .select_only()
1689            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1690            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1691            .into_tuple()
1692            .all(&inner.db)
1693            .await?;
1694
1695        let mut source_fragment_ids = HashMap::new();
1696        for (fragment_id, stream_node) in fragments {
1697            if let Some((source_id, upstream_source_fragment_id)) =
1698                stream_node.to_protobuf().find_source_backfill()
1699            {
1700                source_fragment_ids
1701                    .entry(source_id)
1702                    .or_insert_with(BTreeSet::new)
1703                    .insert((fragment_id, upstream_source_fragment_id));
1704            }
1705        }
1706        Ok(source_fragment_ids)
1707    }
1708
1709    pub async fn get_all_upstream_sink_infos(
1710        &self,
1711        target_table: &PbTable,
1712        target_fragment_id: FragmentId,
1713    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1714        let incoming_sinks = self.get_table_incoming_sinks(target_table.id).await?;
1715
1716        let inner = self.inner.read().await;
1717        let txn = inner.db.begin().await?;
1718
1719        let sink_ids = incoming_sinks.iter().map(|s| s.id).collect_vec();
1720        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;
1721
1722        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1723        for pb_sink in &incoming_sinks {
1724            let sink_fragment_id =
1725                sink_fragment_ids
1726                    .get(&pb_sink.id)
1727                    .cloned()
1728                    .ok_or(anyhow::anyhow!(
1729                        "sink fragment not found for sink id {}",
1730                        pb_sink.id
1731                    ))?;
1732            let upstream_info = build_upstream_sink_info(
1733                pb_sink,
1734                sink_fragment_id,
1735                target_table,
1736                target_fragment_id,
1737            )?;
1738            upstream_sink_infos.push(upstream_info);
1739        }
1740
1741        Ok(upstream_sink_infos)
1742    }
1743
1744    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1745        let inner = self.inner.read().await;
1746        let txn = inner.db.begin().await?;
1747
1748        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1749            .select_only()
1750            .column(fragment::Column::FragmentId)
1751            .filter(
1752                fragment::Column::JobId
1753                    .eq(job_id)
1754                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1755            )
1756            .into_tuple()
1757            .all(&txn)
1758            .await?;
1759
1760        if mview_fragment.len() != 1 {
1761            return Err(anyhow::anyhow!(
1762                "expected exactly one mview fragment for job {}, found {}",
1763                job_id,
1764                mview_fragment.len()
1765            )
1766            .into());
1767        }
1768
1769        Ok(mview_fragment.into_iter().next().unwrap())
1770    }
1771
1772    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
1773        let inner = self.inner.read().await;
1774        let txn = inner.db.begin().await?;
1775        has_table_been_migrated(&txn, table_id).await
1776    }
1777
1778    pub async fn update_fragment_splits<C>(
1779        &self,
1780        txn: &C,
1781        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
1782    ) -> MetaResult<()>
1783    where
1784        C: ConnectionTrait,
1785    {
1786        if fragment_splits.is_empty() {
1787            return Ok(());
1788        }
1789
1790        let models: Vec<fragment_splits::ActiveModel> = fragment_splits
1791            .iter()
1792            .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
1793                fragment_id: Set(*fragment_id as _),
1794                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1795                    splits: splits.iter().map(Into::into).collect_vec(),
1796                }))),
1797            })
1798            .collect();
1799
1800        FragmentSplits::insert_many(models)
1801            .on_conflict(
1802                OnConflict::column(fragment_splits::Column::FragmentId)
1803                    .update_column(fragment_splits::Column::Splits)
1804                    .to_owned(),
1805            )
1806            .exec(txn)
1807            .await?;
1808
1809        Ok(())
1810    }
1811}
1812
1813#[cfg(test)]
1814mod tests {
1815    use std::collections::{BTreeMap, HashMap, HashSet};
1816
1817    use itertools::Itertools;
1818    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1819    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1820    use risingwave_common::id::JobId;
1821    use risingwave_common::util::iter_util::ZipEqDebug;
1822    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1823    use risingwave_meta_model::fragment::DistributionType;
1824    use risingwave_meta_model::*;
1825    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1826    use risingwave_pb::plan_common::PbExprContext;
1827    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1828    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1829    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1830
1831    use super::ActorInfo;
1832    use crate::MetaResult;
1833    use crate::controller::catalog::CatalogController;
1834    use crate::model::{Fragment, StreamActor};
1835
1836    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
1837
1838    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
1839
1840    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);
1841
1842    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);
1843
1844    const TEST_JOB_ID: JobId = JobId::new(1);
1845
1846    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
1847
1848    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
1849        let mut upstream_actor_ids = BTreeMap::new();
1850        upstream_actor_ids.insert(
1851            TEST_UPSTREAM_FRAGMENT_ID,
1852            HashSet::from_iter([(actor_id + 100)]),
1853        );
1854        upstream_actor_ids.insert(
1855            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1856            HashSet::from_iter([(actor_id + 200)]),
1857        );
1858        upstream_actor_ids
1859    }
1860
1861    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1862        let mut input = vec![];
1863        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
1864            input.push(PbStreamNode {
1865                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1866                    upstream_fragment_id,
1867                    ..Default::default()
1868                }))),
1869                ..Default::default()
1870            });
1871        }
1872
1873        PbStreamNode {
1874            input,
1875            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1876            ..Default::default()
1877        }
1878    }
1879
1880    #[tokio::test]
1881    async fn test_extract_fragment() -> MetaResult<()> {
1882        let actor_count = 3u32;
1883        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
1884            .map(|actor_id| {
1885                (
1886                    actor_id.into(),
1887                    generate_upstream_actor_ids_for_actor(actor_id.into()),
1888                )
1889            })
1890            .collect();
1891
1892        let actor_bitmaps = ActorMapping::new_uniform(
1893            (0..actor_count).map(|i| i.into()),
1894            VirtualNode::COUNT_FOR_TEST,
1895        )
1896        .to_bitmaps();
1897
1898        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());
1899
1900        let pb_actors = (0..actor_count)
1901            .map(|actor_id| StreamActor {
1902                actor_id: actor_id.into(),
1903                fragment_id: TEST_FRAGMENT_ID as _,
1904                vnode_bitmap: actor_bitmaps.get(&actor_id.into()).cloned(),
1905                mview_definition: "".to_owned(),
1906                expr_context: Some(PbExprContext {
1907                    time_zone: String::from("America/New_York"),
1908                    strict_mode: false,
1909                }),
1910                config_override: "".into(),
1911            })
1912            .collect_vec();
1913
1914        let pb_fragment = Fragment {
1915            fragment_id: TEST_FRAGMENT_ID as _,
1916            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
1917            distribution_type: PbFragmentDistributionType::Hash as _,
1918            actors: pb_actors,
1919            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
1920            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
1921            nodes: stream_node,
1922        };
1923
1924        let fragment =
1925            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;
1926
1927        check_fragment(fragment, pb_fragment);
1928
1929        Ok(())
1930    }
1931
1932    #[tokio::test]
1933    async fn test_compose_fragment() -> MetaResult<()> {
1934        let actor_count = 3u32;
1935
1936        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
1937            .map(|actor_id| {
1938                (
1939                    actor_id.into(),
1940                    generate_upstream_actor_ids_for_actor(actor_id.into()),
1941                )
1942            })
1943            .collect();
1944
1945        let mut actor_bitmaps = ActorMapping::new_uniform(
1946            (0..actor_count).map(|i| i.into()),
1947            VirtualNode::COUNT_FOR_TEST,
1948        )
1949        .to_bitmaps();
1950
1951        let actors = (0..actor_count)
1952            .map(|actor_id| {
1953                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
1954                    splits: vec![PbConnectorSplit {
1955                        split_type: "dummy".to_owned(),
1956                        ..Default::default()
1957                    }],
1958                });
1959
1960                ActorInfo {
1961                    actor_id: actor_id.into(),
1962                    fragment_id: TEST_FRAGMENT_ID,
1963                    splits: actor_splits,
1964                    worker_id: 0.into(),
1965                    vnode_bitmap: actor_bitmaps
1966                        .remove(&actor_id.into())
1967                        .map(|bitmap| bitmap.to_protobuf())
1968                        .as_ref()
1969                        .map(VnodeBitmap::from),
1970                    expr_context: ExprContext::from(&PbExprContext {
1971                        time_zone: String::from("America/New_York"),
1972                        strict_mode: false,
1973                    }),
1974                    config_override: "a.b.c = true".into(),
1975                }
1976            })
1977            .collect_vec();
1978
1979        let stream_node = {
1980            let template_actor = actors.first().cloned().unwrap();
1981
1982            let template_upstream_actor_ids = upstream_actor_ids
1983                .get(&(template_actor.actor_id as _))
1984                .unwrap();
1985
1986            generate_merger_stream_node(template_upstream_actor_ids)
1987        };
1988
1989        #[expect(deprecated)]
1990        let fragment = fragment::Model {
1991            fragment_id: TEST_FRAGMENT_ID,
1992            job_id: TEST_JOB_ID,
1993            fragment_type_mask: 0,
1994            distribution_type: DistributionType::Hash,
1995            stream_node: StreamNode::from(&stream_node),
1996            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
1997            upstream_fragment_id: Default::default(),
1998            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
1999            parallelism: None,
2000        };
2001
2002        let (pb_fragment, pb_actor_status, pb_actor_splits) =
2003            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();
2004
2005        assert_eq!(pb_actor_status.len(), actor_count as usize);
2006        assert!(
2007            pb_actor_status
2008                .values()
2009                .all(|actor_status| actor_status.location.is_some())
2010        );
2011        assert_eq!(pb_actor_splits.len(), actor_count as usize);
2012
2013        let pb_actors = pb_fragment.actors.clone();
2014
2015        check_fragment(fragment, pb_fragment);
2016        check_actors(
2017            actors,
2018            &upstream_actor_ids,
2019            pb_actors,
2020            pb_actor_splits,
2021            &stream_node,
2022        );
2023
2024        Ok(())
2025    }
2026
2027    fn check_actors(
2028        actors: Vec<ActorInfo>,
2029        actor_upstreams: &FragmentActorUpstreams,
2030        pb_actors: Vec<StreamActor>,
2031        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
2032        stream_node: &PbStreamNode,
2033    ) {
2034        for (
2035            ActorInfo {
2036                actor_id,
2037                fragment_id,
2038                splits,
2039                worker_id: _,
2040                vnode_bitmap,
2041                expr_context,
2042                ..
2043            },
2044            StreamActor {
2045                actor_id: pb_actor_id,
2046                fragment_id: pb_fragment_id,
2047                vnode_bitmap: pb_vnode_bitmap,
2048                mview_definition,
2049                expr_context: pb_expr_context,
2050                ..
2051            },
2052        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
2053        {
2054            assert_eq!(actor_id, pb_actor_id as ActorId);
2055            assert_eq!(fragment_id, pb_fragment_id as FragmentId);
2056
2057            assert_eq!(
2058                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
2059                pb_vnode_bitmap,
2060            );
2061
2062            assert_eq!(mview_definition, "");
2063
2064            visit_stream_node_body(stream_node, |body| {
2065                if let PbNodeBody::Merge(m) = body {
2066                    assert!(
2067                        actor_upstreams
2068                            .get(&(actor_id as _))
2069                            .unwrap()
2070                            .contains_key(&m.upstream_fragment_id)
2071                    );
2072                }
2073            });
2074
2075            assert_eq!(
2076                splits,
2077                pb_actor_splits
2078                    .get(&pb_actor_id)
2079                    .map(ConnectorSplits::from)
2080                    .unwrap_or_default()
2081            );
2082
2083            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
2084        }
2085    }
2086
2087    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2088        let Fragment {
2089            fragment_id,
2090            fragment_type_mask,
2091            distribution_type: pb_distribution_type,
2092            actors: _,
2093            state_table_ids: pb_state_table_ids,
2094            maybe_vnode_count: _,
2095            nodes,
2096        } = pb_fragment;
2097
2098        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
2099        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2100        assert_eq!(
2101            pb_distribution_type,
2102            PbFragmentDistributionType::from(fragment.distribution_type)
2103        );
2104
2105        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
2106        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2107    }
2108
2109    #[test]
2110    fn test_parallelism_policy_with_root_fragments() {
2111        #[expect(deprecated)]
2112        let fragment = fragment::Model {
2113            fragment_id: 3.into(),
2114            job_id: TEST_JOB_ID,
2115            fragment_type_mask: 0,
2116            distribution_type: DistributionType::Hash,
2117            stream_node: StreamNode::from(&PbStreamNode::default()),
2118            state_table_ids: TableIdArray::default(),
2119            upstream_fragment_id: Default::default(),
2120            vnode_count: 0,
2121            parallelism: None,
2122        };
2123
2124        let job_parallelism = StreamingParallelism::Fixed(4);
2125
2126        let policy = super::CatalogController::format_fragment_parallelism_policy(
2127            &fragment,
2128            Some(&job_parallelism),
2129            &[],
2130        );
2131
2132        assert_eq!(policy, "inherit(fixed(4))");
2133    }
2134
2135    #[test]
2136    fn test_parallelism_policy_with_upstream_roots() {
2137        #[expect(deprecated)]
2138        let fragment = fragment::Model {
2139            fragment_id: 5.into(),
2140            job_id: TEST_JOB_ID,
2141            fragment_type_mask: 0,
2142            distribution_type: DistributionType::Hash,
2143            stream_node: StreamNode::from(&PbStreamNode::default()),
2144            state_table_ids: TableIdArray::default(),
2145            upstream_fragment_id: Default::default(),
2146            vnode_count: 0,
2147            parallelism: None,
2148        };
2149
2150        let policy = super::CatalogController::format_fragment_parallelism_policy(
2151            &fragment,
2152            None,
2153            &[3.into(), 1.into(), 2.into(), 1.into()],
2154        );
2155
2156        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
2157    }
2158}