risingwave_meta/controller/
fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::CdcScanOptions;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink,
40    source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::fragment::{
49    FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan;
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57    PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, StreamScanType,
58};
59use sea_orm::ActiveValue::Set;
60use sea_orm::sea_query::Expr;
61use sea_orm::{
62    ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, JoinType, PaginatorTrait,
63    QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
64};
65use serde::{Deserialize, Serialize};
66
67use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
68use crate::controller::catalog::CatalogController;
69use crate::controller::scale::{
70    FragmentRenderMap, NoShuffleEnsemble, find_fragment_no_shuffle_dags_detailed,
71    load_fragment_info, resolve_streaming_job_definition,
72};
73use crate::controller::utils::{
74    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
75    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
76    resolve_no_shuffle_actor_dispatcher,
77};
78use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
79use crate::model::{
80    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
81    StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
82    TableParallelism,
83};
84use crate::rpc::ddl_controller::build_upstream_sink_info;
85use crate::stream::UpstreamSinkInfo;
86use crate::{MetaResult, model};
87
88/// Some information of running (inflight) actors.
89#[derive(Clone, Debug)]
90pub struct InflightActorInfo {
91    pub worker_id: WorkerId,
92    pub vnode_bitmap: Option<Bitmap>,
93    pub splits: Vec<SplitImpl>,
94}
95
96#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
97struct ActorInfo {
98    pub actor_id: ActorId,
99    pub fragment_id: FragmentId,
100    pub splits: ConnectorSplits,
101    pub worker_id: WorkerId,
102    pub vnode_bitmap: Option<VnodeBitmap>,
103    pub expr_context: ExprContext,
104    pub config_override: Arc<str>,
105}
106
107#[derive(Clone, Debug)]
108pub struct InflightFragmentInfo {
109    pub fragment_id: crate::model::FragmentId,
110    pub distribution_type: DistributionType,
111    pub fragment_type_mask: FragmentTypeMask,
112    pub vnode_count: usize,
113    pub nodes: PbStreamNode,
114    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
115    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
116}
117
118#[derive(Clone, Debug)]
119pub struct FragmentParallelismInfo {
120    pub distribution_type: FragmentDistributionType,
121    pub actor_count: usize,
122    pub vnode_count: usize,
123}
124
125#[easy_ext::ext(FragmentTypeMaskExt)]
126pub impl FragmentTypeMask {
127    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
128        Expr::col(fragment::Column::FragmentTypeMask)
129            .bit_and(Expr::value(flag as i32))
130            .ne(0)
131    }
132
133    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
134        Expr::col(fragment::Column::FragmentTypeMask)
135            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
136            .ne(0)
137    }
138
139    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
140        Expr::col(fragment::Column::FragmentTypeMask)
141            .bit_and(Expr::value(flag as i32))
142            .eq(0)
143    }
144}
145
146#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
147#[serde(rename_all = "camelCase")] // for dashboard
148pub struct StreamingJobInfo {
149    pub job_id: JobId,
150    pub obj_type: ObjectType,
151    pub name: String,
152    pub job_status: JobStatus,
153    pub parallelism: StreamingParallelism,
154    pub max_parallelism: i32,
155    pub resource_group: String,
156    pub database_id: DatabaseId,
157    pub schema_id: SchemaId,
158}
159
160impl NotificationManager {
161    pub(crate) fn notify_fragment_mapping(
162        &self,
163        operation: NotificationOperation,
164        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
165    ) {
166        let fragment_ids = fragment_mappings
167            .iter()
168            .map(|mapping| mapping.fragment_id)
169            .collect_vec();
170        if fragment_ids.is_empty() {
171            return;
172        }
173        // notify all fragment mappings to frontend.
174        for fragment_mapping in fragment_mappings {
175            self.notify_frontend_without_version(
176                operation,
177                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
178            );
179        }
180
181        // update serving vnode mappings.
182        match operation {
183            NotificationOperation::Add | NotificationOperation::Update => {
184                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
185                    fragment_ids,
186                ));
187            }
188            NotificationOperation::Delete => {
189                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
190                    fragment_ids,
191                ));
192            }
193            op => {
194                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
195            }
196        }
197    }
198}
199
200impl CatalogController {
201    pub fn prepare_fragment_models_from_fragments(
202        job_id: JobId,
203        fragments: impl Iterator<Item = &Fragment>,
204    ) -> MetaResult<Vec<fragment::Model>> {
205        fragments
206            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
207            .try_collect()
208    }
209
210    pub fn prepare_fragment_model_for_new_job(
211        job_id: JobId,
212        fragment: &Fragment,
213    ) -> MetaResult<fragment::Model> {
214        let vnode_count = fragment.vnode_count();
215        let Fragment {
216            fragment_id: pb_fragment_id,
217            fragment_type_mask: pb_fragment_type_mask,
218            distribution_type: pb_distribution_type,
219            actors: pb_actors,
220            state_table_ids: pb_state_table_ids,
221            nodes,
222            ..
223        } = fragment;
224
225        let state_table_ids = pb_state_table_ids.clone().into();
226
227        assert!(!pb_actors.is_empty());
228
229        let fragment_parallelism = nodes
230            .node_body
231            .as_ref()
232            .and_then(|body| match body {
233                NodeBody::StreamCdcScan(node) => Some(node),
234                _ => None,
235            })
236            .and_then(|node| node.options.as_ref())
237            .map(CdcScanOptions::from_proto)
238            .filter(|opts| opts.is_parallelized_backfill())
239            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));
240
241        let stream_node = StreamNode::from(nodes);
242
243        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
244            .unwrap()
245            .into();
246
247        #[expect(deprecated)]
248        let fragment = fragment::Model {
249            fragment_id: *pb_fragment_id as _,
250            job_id,
251            fragment_type_mask: (*pb_fragment_type_mask).into(),
252            distribution_type,
253            stream_node,
254            state_table_ids,
255            upstream_fragment_id: Default::default(),
256            vnode_count: vnode_count as _,
257            parallelism: fragment_parallelism,
258        };
259
260        Ok(fragment)
261    }
262
263    fn compose_table_fragments(
264        job_id: JobId,
265        state: PbState,
266        ctx: StreamContext,
267        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
268        parallelism: StreamingParallelism,
269        max_parallelism: usize,
270        job_definition: Option<String>,
271    ) -> MetaResult<StreamJobFragments> {
272        let mut pb_fragments = BTreeMap::new();
273        let mut pb_actor_status = BTreeMap::new();
274
275        for (fragment, actors) in fragments {
276            let (fragment, fragment_actor_status, _) =
277                Self::compose_fragment(fragment, actors, job_definition.clone())?;
278
279            pb_fragments.insert(fragment.fragment_id, fragment);
280            pb_actor_status.extend(fragment_actor_status.into_iter());
281        }
282
283        let table_fragments = StreamJobFragments {
284            stream_job_id: job_id,
285            state: state as _,
286            fragments: pb_fragments,
287            actor_status: pb_actor_status,
288            ctx,
289            assigned_parallelism: match parallelism {
290                StreamingParallelism::Custom => TableParallelism::Custom,
291                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
292                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
293            },
294            max_parallelism,
295        };
296
297        Ok(table_fragments)
298    }
299
300    #[allow(clippy::type_complexity)]
301    fn compose_fragment(
302        fragment: fragment::Model,
303        actors: Vec<ActorInfo>,
304        job_definition: Option<String>,
305    ) -> MetaResult<(
306        Fragment,
307        HashMap<crate::model::ActorId, PbActorStatus>,
308        HashMap<crate::model::ActorId, PbConnectorSplits>,
309    )> {
310        let fragment::Model {
311            fragment_id,
312            fragment_type_mask,
313            distribution_type,
314            stream_node,
315            state_table_ids,
316            vnode_count,
317            ..
318        } = fragment;
319
320        let stream_node = stream_node.to_protobuf();
321        let mut upstream_fragments = HashSet::new();
322        visit_stream_node_body(&stream_node, |body| {
323            if let NodeBody::Merge(m) = body {
324                assert!(
325                    upstream_fragments.insert(m.upstream_fragment_id),
326                    "non-duplicate upstream fragment"
327                );
328            }
329        });
330
331        let mut pb_actors = vec![];
332
333        let mut pb_actor_status = HashMap::new();
334        let mut pb_actor_splits = HashMap::new();
335
336        for actor in actors {
337            if actor.fragment_id != fragment_id {
338                bail!(
339                    "fragment id {} from actor {} is different from fragment {}",
340                    actor.fragment_id,
341                    actor.actor_id,
342                    fragment_id
343                )
344            }
345
346            let ActorInfo {
347                actor_id,
348                fragment_id,
349                worker_id,
350                splits,
351                vnode_bitmap,
352                expr_context,
353                config_override,
354                ..
355            } = actor;
356
357            let vnode_bitmap =
358                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
359            let pb_expr_context = Some(expr_context.to_protobuf());
360
361            pb_actor_status.insert(
362                actor_id as _,
363                PbActorStatus {
364                    location: PbActorLocation::from_worker(worker_id),
365                },
366            );
367
368            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
369
370            pb_actors.push(StreamActor {
371                actor_id: actor_id as _,
372                fragment_id: fragment_id as _,
373                vnode_bitmap,
374                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
375                expr_context: pb_expr_context,
376                config_override,
377            })
378        }
379
380        let pb_state_table_ids = state_table_ids.0;
381        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
382        let pb_fragment = Fragment {
383            fragment_id: fragment_id as _,
384            fragment_type_mask: fragment_type_mask.into(),
385            distribution_type: pb_distribution_type,
386            actors: pb_actors,
387            state_table_ids: pb_state_table_ids,
388            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
389            nodes: stream_node,
390        };
391
392        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
393    }
394
395    pub fn running_fragment_parallelisms(
396        &self,
397        id_filter: Option<HashSet<FragmentId>>,
398    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
399        let info = self.env.shared_actor_infos().read_guard();
400
401        let mut result = HashMap::new();
402        for (fragment_id, fragment) in info.iter_over_fragments() {
403            if let Some(id_filter) = &id_filter
404                && !id_filter.contains(&(*fragment_id as _))
405            {
406                continue; // Skip fragments not in the filter
407            }
408
409            result.insert(
410                *fragment_id as _,
411                FragmentParallelismInfo {
412                    distribution_type: fragment.distribution_type.into(),
413                    actor_count: fragment.actors.len() as _,
414                    vnode_count: fragment.vnode_count,
415                },
416            );
417        }
418
419        Ok(result)
420    }
421
422    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
423        let inner = self.inner.read().await;
424        let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
425            .select_only()
426            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
427            .into_tuple()
428            .all(&inner.db)
429            .await?;
430        Ok(fragment_jobs.into_iter().collect())
431    }
432
433    pub async fn get_fragment_job_id(
434        &self,
435        fragment_ids: Vec<FragmentId>,
436    ) -> MetaResult<Vec<ObjectId>> {
437        let inner = self.inner.read().await;
438
439        let object_ids: Vec<ObjectId> = FragmentModel::find()
440            .select_only()
441            .column(fragment::Column::JobId)
442            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
443            .into_tuple()
444            .all(&inner.db)
445            .await?;
446
447        Ok(object_ids)
448    }
449
450    pub async fn get_fragment_desc_by_id(
451        &self,
452        fragment_id: FragmentId,
453    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
454        let inner = self.inner.read().await;
455
456        let fragment_model = match FragmentModel::find_by_id(fragment_id)
457            .one(&inner.db)
458            .await?
459        {
460            Some(fragment) => fragment,
461            None => return Ok(None),
462        };
463
464        let job_parallelism: Option<StreamingParallelism> =
465            StreamingJob::find_by_id(fragment_model.job_id)
466                .select_only()
467                .column(streaming_job::Column::Parallelism)
468                .into_tuple()
469                .one(&inner.db)
470                .await?;
471
472        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
473            .select_only()
474            .columns([
475                fragment_relation::Column::SourceFragmentId,
476                fragment_relation::Column::DispatcherType,
477            ])
478            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
479            .into_tuple()
480            .all(&inner.db)
481            .await?;
482
483        let upstreams: Vec<_> = upstream_entries
484            .into_iter()
485            .map(|(source_id, _)| source_id)
486            .collect();
487
488        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
489            .await
490            .map(Self::collect_root_fragment_mapping)?;
491        let root_fragments = root_fragment_map
492            .get(&fragment_id)
493            .cloned()
494            .unwrap_or_default();
495
496        let info = self.env.shared_actor_infos().read_guard();
497        let SharedFragmentInfo { actors, .. } = info
498            .get_fragment(fragment_model.fragment_id as _)
499            .unwrap_or_else(|| {
500                panic!(
501                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
502                    fragment_model.fragment_id,
503                    fragment_model.job_id
504                )
505            });
506
507        let parallelism_policy = Self::format_fragment_parallelism_policy(
508            &fragment_model,
509            job_parallelism.as_ref(),
510            &root_fragments,
511        );
512
513        let fragment = FragmentDesc {
514            fragment_id: fragment_model.fragment_id,
515            job_id: fragment_model.job_id,
516            fragment_type_mask: fragment_model.fragment_type_mask,
517            distribution_type: fragment_model.distribution_type,
518            state_table_ids: fragment_model.state_table_ids.clone(),
519            parallelism: actors.len() as _,
520            vnode_count: fragment_model.vnode_count,
521            stream_node: fragment_model.stream_node.clone(),
522            parallelism_policy,
523        };
524
525        Ok(Some((fragment, upstreams)))
526    }
527
528    pub async fn list_fragment_database_ids(
529        &self,
530        select_fragment_ids: Option<Vec<FragmentId>>,
531    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
532        let inner = self.inner.read().await;
533        let select = FragmentModel::find()
534            .select_only()
535            .column(fragment::Column::FragmentId)
536            .column(object::Column::DatabaseId)
537            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
538        let select = if let Some(select_fragment_ids) = select_fragment_ids {
539            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
540        } else {
541            select
542        };
543        Ok(select.into_tuple().all(&inner.db).await?)
544    }
545
546    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
547        let inner = self.inner.read().await;
548
549        // Load fragments matching the job from the database
550        let fragments: Vec<_> = FragmentModel::find()
551            .filter(fragment::Column::JobId.eq(job_id))
552            .all(&inner.db)
553            .await?;
554
555        let job_info = StreamingJob::find_by_id(job_id)
556            .one(&inner.db)
557            .await?
558            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
559
560        let fragment_actors =
561            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;
562
563        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
564            .await?
565            .remove(&job_id);
566
567        Self::compose_table_fragments(
568            job_id,
569            job_info.job_status.into(),
570            job_info.stream_context(),
571            fragment_actors,
572            job_info.parallelism.clone(),
573            job_info.max_parallelism as _,
574            job_definition,
575        )
576    }
577
578    pub async fn get_fragment_actor_dispatchers(
579        &self,
580        fragment_ids: Vec<FragmentId>,
581    ) -> MetaResult<FragmentActorDispatchers> {
582        let inner = self.inner.read().await;
583
584        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
585            .await
586    }
587
588    pub async fn get_fragment_actor_dispatchers_txn(
589        &self,
590        c: &impl ConnectionTrait,
591        fragment_ids: Vec<FragmentId>,
592    ) -> MetaResult<FragmentActorDispatchers> {
593        let fragment_relations = FragmentRelation::find()
594            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
595            .all(c)
596            .await?;
597
598        type FragmentActorInfo = (
599            DistributionType,
600            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
601        );
602
603        let shared_info = self.env.shared_actor_infos();
604        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
605        let get_fragment_actors = |fragment_id: FragmentId| async move {
606            let result: MetaResult<FragmentActorInfo> = try {
607                let read_guard = shared_info.read_guard();
608
609                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();
610
611                (
612                    fragment.distribution_type,
613                    Arc::new(
614                        fragment
615                            .actors
616                            .iter()
617                            .map(|(actor_id, actor_info)| {
618                                (
619                                    *actor_id,
620                                    actor_info
621                                        .vnode_bitmap
622                                        .as_ref()
623                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
624                                )
625                            })
626                            .collect(),
627                    ),
628                )
629            };
630            result
631        };
632
633        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
634        for fragment_relation::Model {
635            source_fragment_id,
636            target_fragment_id,
637            dispatcher_type,
638            dist_key_indices,
639            output_indices,
640            output_type_mapping,
641        } in fragment_relations
642        {
643            let (source_fragment_distribution, source_fragment_actors) = {
644                let (distribution, actors) = {
645                    match fragment_actor_cache.entry(source_fragment_id) {
646                        Entry::Occupied(entry) => entry.into_mut(),
647                        Entry::Vacant(entry) => {
648                            entry.insert(get_fragment_actors(source_fragment_id).await?)
649                        }
650                    }
651                };
652                (*distribution, actors.clone())
653            };
654            let (target_fragment_distribution, target_fragment_actors) = {
655                let (distribution, actors) = {
656                    match fragment_actor_cache.entry(target_fragment_id) {
657                        Entry::Occupied(entry) => entry.into_mut(),
658                        Entry::Vacant(entry) => {
659                            entry.insert(get_fragment_actors(target_fragment_id).await?)
660                        }
661                    }
662                };
663                (*distribution, actors.clone())
664            };
665            let output_mapping = PbDispatchOutputMapping {
666                indices: output_indices.into_u32_array(),
667                types: output_type_mapping.unwrap_or_default().to_protobuf(),
668            };
669            let dispatchers = compose_dispatchers(
670                source_fragment_distribution,
671                &source_fragment_actors,
672                target_fragment_id as _,
673                target_fragment_distribution,
674                &target_fragment_actors,
675                dispatcher_type,
676                dist_key_indices.into_u32_array(),
677                output_mapping,
678            );
679            let actor_dispatchers_map = actor_dispatchers_map
680                .entry(source_fragment_id as _)
681                .or_default();
682            for (actor_id, dispatchers) in dispatchers {
683                actor_dispatchers_map
684                    .entry(actor_id as _)
685                    .or_default()
686                    .push(dispatchers);
687            }
688        }
689        Ok(actor_dispatchers_map)
690    }
691
692    pub async fn get_fragment_downstream_relations(
693        &self,
694        fragment_ids: Vec<FragmentId>,
695    ) -> MetaResult<FragmentDownstreamRelation> {
696        let inner = self.inner.read().await;
697        let mut stream = FragmentRelation::find()
698            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
699            .stream(&inner.db)
700            .await?;
701        let mut relations = FragmentDownstreamRelation::new();
702        while let Some(relation) = stream.try_next().await? {
703            relations
704                .entry(relation.source_fragment_id as _)
705                .or_default()
706                .push(DownstreamFragmentRelation {
707                    downstream_fragment_id: relation.target_fragment_id as _,
708                    dispatcher_type: relation.dispatcher_type,
709                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
710                    output_mapping: PbDispatchOutputMapping {
711                        indices: relation.output_indices.into_u32_array(),
712                        types: relation
713                            .output_type_mapping
714                            .unwrap_or_default()
715                            .to_protobuf(),
716                    },
717                });
718        }
719        Ok(relations)
720    }
721
722    pub async fn get_job_fragment_backfill_scan_type(
723        &self,
724        job_id: JobId,
725    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
726        let inner = self.inner.read().await;
727        let fragments: Vec<_> = FragmentModel::find()
728            .filter(fragment::Column::JobId.eq(job_id))
729            .all(&inner.db)
730            .await?;
731
732        let mut result = HashMap::new();
733
734        for fragment::Model {
735            fragment_id,
736            stream_node,
737            ..
738        } in fragments
739        {
740            let stream_node = stream_node.to_protobuf();
741            visit_stream_node_body(&stream_node, |body| {
742                if let NodeBody::StreamScan(node) = body {
743                    match node.stream_scan_type() {
744                        StreamScanType::Unspecified => {}
745                        scan_type => {
746                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
747                        }
748                    }
749                }
750            });
751        }
752
753        Ok(result)
754    }
755
756    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
757        let inner = self.inner.read().await;
758        let job_states = StreamingJob::find()
759            .select_only()
760            .column(streaming_job::Column::JobId)
761            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
762            .join(JoinType::InnerJoin, object::Relation::Database2.def())
763            .column(object::Column::ObjType)
764            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
765            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
766            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
767            .column_as(
768                Expr::if_null(
769                    Expr::col((table::Entity, table::Column::Name)),
770                    Expr::if_null(
771                        Expr::col((source::Entity, source::Column::Name)),
772                        Expr::if_null(
773                            Expr::col((sink::Entity, sink::Column::Name)),
774                            Expr::val("<unknown>"),
775                        ),
776                    ),
777                ),
778                "name",
779            )
780            .columns([
781                streaming_job::Column::JobStatus,
782                streaming_job::Column::Parallelism,
783                streaming_job::Column::MaxParallelism,
784            ])
785            .column_as(
786                Expr::if_null(
787                    Expr::col((
788                        streaming_job::Entity,
789                        streaming_job::Column::SpecificResourceGroup,
790                    )),
791                    Expr::col((database::Entity, database::Column::ResourceGroup)),
792                ),
793                "resource_group",
794            )
795            .column(object::Column::DatabaseId)
796            .column(object::Column::SchemaId)
797            .into_model()
798            .all(&inner.db)
799            .await?;
800        Ok(job_states)
801    }
802
803    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
804        let inner = self.inner.read().await;
805        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
806            .select_only()
807            .column(streaming_job::Column::MaxParallelism)
808            .into_tuple()
809            .one(&inner.db)
810            .await?
811            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
812        Ok(max_parallelism as usize)
813    }
814
815    /// Try to get internal table ids of each streaming job, used by metrics collection.
816    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
817        if let Ok(inner) = self.inner.try_read()
818            && let Ok(job_state_tables) = FragmentModel::find()
819                .select_only()
820                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
821                .into_tuple::<(JobId, I32Array)>()
822                .all(&inner.db)
823                .await
824        {
825            let mut job_internal_table_ids = HashMap::new();
826            for (job_id, state_table_ids) in job_state_tables {
827                job_internal_table_ids
828                    .entry(job_id)
829                    .or_insert_with(Vec::new)
830                    .extend(
831                        state_table_ids
832                            .into_inner()
833                            .into_iter()
834                            .map(|table_id| TableId::new(table_id as _)),
835                    );
836            }
837            return Some(job_internal_table_ids.into_iter().collect());
838        }
839        None
840    }
841
842    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
843        let inner = self.inner.read().await;
844        let count = FragmentModel::find().count(&inner.db).await?;
845        Ok(count > 0)
846    }
847
848    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
849        let read_guard = self.env.shared_actor_infos().read_guard();
850        let actor_cnt: HashMap<WorkerId, _> = read_guard
851            .iter_over_fragments()
852            .flat_map(|(_, fragment)| {
853                fragment
854                    .actors
855                    .iter()
856                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
857            })
858            .into_group_map()
859            .into_iter()
860            .map(|(k, v)| (k, v.len()))
861            .collect();
862
863        Ok(actor_cnt)
864    }
865
866    fn collect_fragment_actor_map(
867        &self,
868        fragment_ids: &[FragmentId],
869        stream_context: StreamContext,
870    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
871        let guard = self.env.shared_actor_infos().read_guard();
872        let pb_expr_context = stream_context.to_expr_context();
873        let expr_context: ExprContext = (&pb_expr_context).into();
874
875        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
876        for fragment_id in fragment_ids {
877            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
878                anyhow!("fragment {} not found in shared actor info", fragment_id)
879            })?;
880
881            let actors = fragment_info
882                .actors
883                .iter()
884                .map(|(actor_id, actor_info)| ActorInfo {
885                    actor_id: *actor_id as _,
886                    fragment_id: *fragment_id,
887                    splits: ConnectorSplits::from(&PbConnectorSplits {
888                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
889                    }),
890                    worker_id: actor_info.worker_id as _,
891                    vnode_bitmap: actor_info
892                        .vnode_bitmap
893                        .as_ref()
894                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
895                    expr_context: expr_context.clone(),
896                    config_override: stream_context.config_override.clone(),
897                })
898                .collect();
899
900            actor_map.insert(*fragment_id, actors);
901        }
902
903        Ok(actor_map)
904    }
905
906    fn collect_fragment_actor_pairs(
907        &self,
908        fragments: Vec<fragment::Model>,
909        stream_context: StreamContext,
910    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
911        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
912        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
913        fragments
914            .into_iter()
915            .map(|fragment| {
916                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
917                    anyhow!(
918                        "fragment {} missing in shared actor info map",
919                        fragment.fragment_id
920                    )
921                })?;
922                Ok((fragment, actors))
923            })
924            .collect()
925    }
926
927    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
928    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
929        let inner = self.inner.read().await;
930        let jobs = StreamingJob::find().all(&inner.db).await?;
931
932        let mut job_definition = resolve_streaming_job_definition(
933            &inner.db,
934            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
935        )
936        .await?;
937
938        let mut table_fragments = BTreeMap::new();
939        for job in jobs {
940            let fragments = FragmentModel::find()
941                .filter(fragment::Column::JobId.eq(job.job_id))
942                .all(&inner.db)
943                .await?;
944
945            let fragment_actors =
946                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;
947
948            table_fragments.insert(
949                job.job_id,
950                Self::compose_table_fragments(
951                    job.job_id,
952                    job.job_status.into(),
953                    job.stream_context(),
954                    fragment_actors,
955                    job.parallelism.clone(),
956                    job.max_parallelism as _,
957                    job_definition.remove(&job.job_id),
958                )?,
959            );
960        }
961
962        Ok(table_fragments)
963    }
964
965    pub async fn upstream_fragments(
966        &self,
967        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
968    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
969        let inner = self.inner.read().await;
970        let mut stream = FragmentRelation::find()
971            .select_only()
972            .columns([
973                fragment_relation::Column::SourceFragmentId,
974                fragment_relation::Column::TargetFragmentId,
975            ])
976            .filter(
977                fragment_relation::Column::TargetFragmentId
978                    .is_in(fragment_ids.map(|id| id as FragmentId)),
979            )
980            .into_tuple::<(FragmentId, FragmentId)>()
981            .stream(&inner.db)
982            .await?;
983        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
984        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
985            upstream_fragments
986                .entry(downstream_fragment_id as crate::model::FragmentId)
987                .or_default()
988                .insert(upstream_fragment_id as crate::model::FragmentId);
989        }
990        Ok(upstream_fragments)
991    }
992
993    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
994        let info = self.env.shared_actor_infos().read_guard();
995
996        let actor_locations = info
997            .iter_over_fragments()
998            .flat_map(|(fragment_id, fragment)| {
999                fragment
1000                    .actors
1001                    .iter()
1002                    .map(|(actor_id, actor)| PartialActorLocation {
1003                        actor_id: *actor_id as _,
1004                        fragment_id: *fragment_id as _,
1005                        worker_id: actor.worker_id,
1006                    })
1007            })
1008            .collect_vec();
1009
1010        Ok(actor_locations)
1011    }
1012
1013    pub async fn list_actor_info(
1014        &self,
1015    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
1016        let inner = self.inner.read().await;
1017
1018        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
1019            FragmentModel::find()
1020                .select_only()
1021                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1022                .column(fragment::Column::FragmentId)
1023                .column_as(object::Column::Oid, "job_id")
1024                .column_as(object::Column::SchemaId, "schema_id")
1025                .column_as(object::Column::ObjType, "type")
1026                .into_tuple()
1027                .all(&inner.db)
1028                .await?;
1029
1030        let actor_infos = {
1031            let info = self.env.shared_actor_infos().read_guard();
1032
1033            fragment_objects
1034                .into_iter()
1035                .flat_map(|(fragment_id, object_id, schema_id, object_type)| {
1036                    let SharedFragmentInfo {
1037                        fragment_id,
1038                        actors,
1039                        ..
1040                    } = info.get_fragment(fragment_id as _).unwrap();
1041                    actors.keys().map(move |actor_id| {
1042                        (
1043                            *actor_id as _,
1044                            *fragment_id as _,
1045                            object_id,
1046                            schema_id,
1047                            object_type,
1048                        )
1049                    })
1050                })
1051                .collect_vec()
1052        };
1053
1054        Ok(actor_infos)
1055    }
1056
1057    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1058        let guard = self.env.shared_actor_info.read_guard();
1059        guard
1060            .iter_over_fragments()
1061            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1062            .collect_vec()
1063    }
1064
    /// List a `FragmentDistribution` for fragments in the catalog, each paired with the ids
    /// of its direct upstream fragments.
    ///
    /// When `is_creating` is true, only fragments whose streaming job has status `Initial` or
    /// `Creating` are returned; otherwise every fragment is listed. All catalog reads go
    /// through one transaction that is only read from and never committed.
    ///
    /// `parallelism` is the actor count in the shared actor info (0 when the fragment is not
    /// present there); `parallelism_policy` is the human-readable string produced by
    /// `format_fragment_parallelism_policy`.
    pub async fn list_fragment_descs(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let txn = inner.db.begin().await?;

        // The status filter on the joined `streaming_job` row drops fragments whose job row
        // is absent, so the left joins act as inner joins in the creating-only case.
        let fragments: Vec<_> = if is_creating {
            FragmentModel::find()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
                .filter(
                    streaming_job::Column::JobStatus
                        .eq(JobStatus::Initial)
                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
                )
                .all(&txn)
                .await?
        } else {
            FragmentModel::find().all(&txn).await?
        };

        let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();

        // Job-level parallelism, used as the `inherit(..)` fallback for fragments that have
        // neither an override nor no-shuffle root fragments. Skips the query when empty.
        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragments.is_empty() {
            HashMap::new()
        } else {
            let job_ids = fragments.iter().map(|f| f.job_id).unique().collect_vec();
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(&txn)
                .await?
                .into_iter()
                .collect()
        };

        // Direct upstream relations of the listed fragments, as (target, source, dispatcher).
        // The dispatcher type is selected but not used below.
        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        // fragment -> entry (root) fragments of its no-shuffle ensemble; roots themselves map
        // to an empty list (see `collect_root_fragment_mapping`).
        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();

        let mut result = Vec::new();

        for fragment_desc in fragments {
            // note: here sometimes fragment is not found in the cache, fallback to 0
            let parallelism = guard
                .get_fragment(fragment_desc.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&fragment_desc.fragment_id)
                .cloned()
                .unwrap_or_default();

            // `remove` hands over ownership of the upstream list without cloning from the map.
            let upstreams = all_upstreams
                .remove(&fragment_desc.fragment_id)
                .unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                &fragment_desc,
                job_parallelisms.get(&fragment_desc.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: fragment_desc.fragment_id,
                table_id: fragment_desc.job_id,
                distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type)
                    as _,
                state_table_ids: fragment_desc.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: fragment_desc.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: fragment_desc.vnode_count as _,
                node: Some(fragment_desc.stream_node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }
        Ok(result)
    }
1181
1182    /// Build a fragment-to-root lookup for all reported root fragment ensembles.
1183    fn collect_root_fragment_mapping(
1184        ensembles: Vec<NoShuffleEnsemble>,
1185    ) -> HashMap<FragmentId, Vec<FragmentId>> {
1186        let mut mapping = HashMap::new();
1187
1188        for ensemble in ensembles {
1189            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1190            roots.sort_unstable();
1191            roots.dedup();
1192
1193            if roots.is_empty() {
1194                continue;
1195            }
1196
1197            let root_set: HashSet<_> = roots.iter().copied().collect();
1198
1199            for fragment_id in ensemble.component_fragments() {
1200                if root_set.contains(&fragment_id) {
1201                    mapping.insert(fragment_id, Vec::new());
1202                } else {
1203                    mapping.insert(fragment_id, roots.clone());
1204                }
1205            }
1206        }
1207
1208        mapping
1209    }
1210
1211    fn format_fragment_parallelism_policy(
1212        fragment: &fragment::Model,
1213        job_parallelism: Option<&StreamingParallelism>,
1214        root_fragments: &[FragmentId],
1215    ) -> String {
1216        if fragment.distribution_type == DistributionType::Single {
1217            return "single".to_owned();
1218        }
1219
1220        if let Some(parallelism) = fragment.parallelism.as_ref() {
1221            return format!(
1222                "override({})",
1223                Self::format_streaming_parallelism(parallelism)
1224            );
1225        }
1226
1227        if !root_fragments.is_empty() {
1228            let mut upstreams = root_fragments.to_vec();
1229            upstreams.sort_unstable();
1230            upstreams.dedup();
1231
1232            return format!("upstream_fragment({upstreams:?})");
1233        }
1234
1235        let inherited = job_parallelism
1236            .map(Self::format_streaming_parallelism)
1237            .unwrap_or_else(|| "unknown".to_owned());
1238        format!("inherit({inherited})")
1239    }
1240
1241    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1242        match parallelism {
1243            StreamingParallelism::Adaptive => "adaptive".to_owned(),
1244            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1245            StreamingParallelism::Custom => "custom".to_owned(),
1246        }
1247    }
1248
1249    pub async fn list_sink_actor_mapping(
1250        &self,
1251    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1252        let inner = self.inner.read().await;
1253        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1254            .select_only()
1255            .columns([sink::Column::SinkId, sink::Column::Name])
1256            .into_tuple()
1257            .all(&inner.db)
1258            .await?;
1259        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1260
1261        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1262
1263        let actor_with_type: Vec<(ActorId, SinkId)> = {
1264            let info = self.env.shared_actor_infos().read_guard();
1265
1266            info.iter_over_fragments()
1267                .filter(|(_, fragment)| {
1268                    sink_ids.contains(&fragment.job_id.as_sink_id())
1269                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1270                })
1271                .flat_map(|(_, fragment)| {
1272                    fragment
1273                        .actors
1274                        .keys()
1275                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1276                })
1277                .collect()
1278        };
1279
1280        let mut sink_actor_mapping = HashMap::new();
1281        for (actor_id, sink_id) in actor_with_type {
1282            sink_actor_mapping
1283                .entry(sink_id)
1284                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1285                .1
1286                .push(actor_id);
1287        }
1288
1289        Ok(sink_actor_mapping)
1290    }
1291
1292    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1293        let inner = self.inner.read().await;
1294        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1295            .select_only()
1296            .columns([
1297                fragment::Column::FragmentId,
1298                fragment::Column::JobId,
1299                fragment::Column::StateTableIds,
1300            ])
1301            .into_partial_model()
1302            .all(&inner.db)
1303            .await?;
1304        Ok(fragment_state_tables)
1305    }
1306
1307    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
1308    /// collected
1309    pub async fn load_all_actors_dynamic(
1310        &self,
1311        database_id: Option<DatabaseId>,
1312        worker_nodes: &ActiveStreamingWorkerNodes,
1313    ) -> MetaResult<FragmentRenderMap> {
1314        let adaptive_parallelism_strategy = {
1315            let system_params_reader = self.env.system_params_reader().await;
1316            system_params_reader.adaptive_parallelism_strategy()
1317        };
1318
1319        let inner = self.inner.read().await;
1320        let txn = inner.db.begin().await?;
1321
1322        let database_fragment_infos = load_fragment_info(
1323            &txn,
1324            self.env.actor_id_generator(),
1325            database_id,
1326            worker_nodes,
1327            adaptive_parallelism_strategy,
1328        )
1329        .await?;
1330
1331        tracing::trace!(?database_fragment_infos, "reload all actors");
1332
1333        Ok(database_fragment_infos)
1334    }
1335
1336    #[await_tree::instrument]
1337    pub async fn fill_snapshot_backfill_epoch(
1338        &self,
1339        fragment_ids: impl Iterator<Item = FragmentId>,
1340        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1341        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1342    ) -> MetaResult<()> {
1343        let inner = self.inner.write().await;
1344        let txn = inner.db.begin().await?;
1345        for fragment_id in fragment_ids {
1346            let fragment = FragmentModel::find_by_id(fragment_id)
1347                .one(&txn)
1348                .await?
1349                .context(format!("fragment {} not found", fragment_id))?;
1350            let mut node = fragment.stream_node.to_protobuf();
1351            if crate::stream::fill_snapshot_backfill_epoch(
1352                &mut node,
1353                snapshot_backfill_info,
1354                cross_db_snapshot_backfill_info,
1355            )? {
1356                let node = StreamNode::from(&node);
1357                FragmentModel::update(fragment::ActiveModel {
1358                    fragment_id: Set(fragment_id),
1359                    stream_node: Set(node),
1360                    ..Default::default()
1361                })
1362                .exec(&txn)
1363                .await?;
1364            }
1365        }
1366        txn.commit().await?;
1367        Ok(())
1368    }
1369
1370    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1371    pub fn get_running_actors_of_fragment(
1372        &self,
1373        fragment_id: FragmentId,
1374    ) -> MetaResult<HashSet<model::ActorId>> {
1375        let info = self.env.shared_actor_infos().read_guard();
1376
1377        let actors = info
1378            .get_fragment(fragment_id as _)
1379            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1380            .unwrap_or_default();
1381
1382        Ok(actors)
1383    }
1384
    /// Pair each actor of a source backfill fragment with its upstream source actor.
    ///
    /// Returns `(backfill_actor_id, upstream_source_actor_id)` pairs. The relation from
    /// `source_fragment_id` to `source_backfill_fragment_id` must exist and use a `NoShuffle`
    /// dispatcher; otherwise an error is returned. Distribution types are read from the
    /// catalog and actors (with their vnode bitmaps) from the shared actor info.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Dispatcher type of the source -> source backfill edge.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // The actor pairing below is only defined for a no-shuffle edge.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Reads a fragment's distribution type within `txn`; errors if the fragment is missing.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // actor id -> vnode bitmap for one fragment, from the shared actor info. A fragment
        // that is not present there yields an empty map rather than an error.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // The resolver yields (source_actor, source_backfill_actor) pairs; swap them into the
        // documented (backfill_actor, source_actor) order.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1480
1481    /// Get and filter the "**root**" fragments of the specified jobs.
1482    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1483    ///
1484    /// Root fragment connects to downstream jobs.
1485    ///
1486    /// ## What can be the root fragment
1487    /// - For sink, it should have one `Sink` fragment.
1488    /// - For MV, it should have one `MView` fragment.
1489    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1490    /// - For source, it should have one `Source` fragment.
1491    ///
1492    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1493    pub async fn get_root_fragments(
1494        &self,
1495        job_ids: Vec<JobId>,
1496    ) -> MetaResult<(
1497        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
1498        HashMap<ActorId, WorkerId>,
1499    )> {
1500        let inner = self.inner.read().await;
1501
1502        let all_fragments = FragmentModel::find()
1503            .filter(fragment::Column::JobId.is_in(job_ids))
1504            .all(&inner.db)
1505            .await?;
1506        // job_id -> fragment
1507        let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
1508        for fragment in all_fragments {
1509            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1510            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1511                _ = root_fragments.insert(fragment.job_id, fragment);
1512            } else if mask.contains(FragmentTypeFlag::Source) {
1513                // look for Source fragment only if there's no MView fragment
1514                // (notice try_insert here vs insert above)
1515                _ = root_fragments.try_insert(fragment.job_id, fragment);
1516            }
1517        }
1518
1519        let mut root_fragments_pb = HashMap::new();
1520
1521        let info = self.env.shared_actor_infos().read_guard();
1522
1523        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
1524            .iter()
1525            .map(|(job_id, fragment)| (fragment.fragment_id, *job_id))
1526            .collect();
1527
1528        for fragment in root_fragment_to_jobs.keys() {
1529            let fragment_info = info.get_fragment(*fragment).context(format!(
1530                "fragment {} not found in shared actor info",
1531                fragment
1532            ))?;
1533
1534            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
1535            let fragment = root_fragments
1536                .get(&job_id)
1537                .context(format!("root fragment for job {} not found", job_id))?;
1538
1539            root_fragments_pb.insert(
1540                job_id,
1541                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
1542            );
1543        }
1544
1545        let mut all_actor_locations = HashMap::new();
1546
1547        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
1548            for (actor_id, actor_info) in actors {
1549                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
1550            }
1551        }
1552
1553        Ok((root_fragments_pb, all_actor_locations))
1554    }
1555
1556    pub async fn get_root_fragment(
1557        &self,
1558        job_id: JobId,
1559    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
1560        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1561        let (root_fragment, _) = root_fragments
1562            .remove(&job_id)
1563            .context(format!("root fragment for job {} not found", job_id))?;
1564
1565        Ok((root_fragment, actors))
1566    }
1567
1568    /// Get the downstream fragments connected to the specified job.
1569    pub async fn get_downstream_fragments(
1570        &self,
1571        job_id: JobId,
1572    ) -> MetaResult<(
1573        Vec<(
1574            stream_plan::DispatcherType,
1575            SharedFragmentInfo,
1576            PbStreamNode,
1577        )>,
1578        HashMap<ActorId, WorkerId>,
1579    )> {
1580        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;
1581
1582        let inner = self.inner.read().await;
1583        let txn = inner.db.begin().await?;
1584        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1585            .filter(
1586                fragment_relation::Column::SourceFragmentId
1587                    .eq(root_fragment.fragment_id as FragmentId),
1588            )
1589            .all(&txn)
1590            .await?;
1591
1592        let downstream_fragment_ids = downstream_fragment_relations
1593            .iter()
1594            .map(|model| model.target_fragment_id as FragmentId)
1595            .collect::<HashSet<_>>();
1596
1597        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1598            .select_only()
1599            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1600            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1601            .into_tuple()
1602            .all(&txn)
1603            .await?;
1604
1605        let downstream_fragment_nodes: HashMap<_, _> =
1606            downstream_fragment_nodes.into_iter().collect();
1607
1608        let mut downstream_fragments = vec![];
1609
1610        let info = self.env.shared_actor_infos().read_guard();
1611
1612        let fragment_map: HashMap<_, _> = downstream_fragment_relations
1613            .iter()
1614            .map(|model| (model.target_fragment_id, model.dispatcher_type))
1615            .collect();
1616
1617        for fragment_id in fragment_map.keys() {
1618            let fragment_info @ SharedFragmentInfo { actors, .. } =
1619                info.get_fragment(*fragment_id).unwrap();
1620
1621            let dispatcher_type = fragment_map[fragment_id];
1622
1623            if actors.is_empty() {
1624                bail!("No fragment found for fragment id {}", fragment_id);
1625            }
1626
1627            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1628
1629            let nodes = downstream_fragment_nodes
1630                .get(fragment_id)
1631                .context(format!(
1632                    "downstream fragment node for id {} not found",
1633                    fragment_id
1634                ))?
1635                .to_protobuf();
1636
1637            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
1638        }
1639        Ok((downstream_fragments, actor_locations))
1640    }
1641
1642    pub async fn load_source_fragment_ids(
1643        &self,
1644    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1645        let inner = self.inner.read().await;
1646        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1647            .select_only()
1648            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1649            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1650            .into_tuple()
1651            .all(&inner.db)
1652            .await?;
1653
1654        let mut source_fragment_ids = HashMap::new();
1655        for (fragment_id, stream_node) in fragments {
1656            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1657                source_fragment_ids
1658                    .entry(source_id)
1659                    .or_insert_with(BTreeSet::new)
1660                    .insert(fragment_id);
1661            }
1662        }
1663        Ok(source_fragment_ids)
1664    }
1665
1666    pub async fn load_backfill_fragment_ids(
1667        &self,
1668    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1669        let inner = self.inner.read().await;
1670        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1671            .select_only()
1672            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1673            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1674            .into_tuple()
1675            .all(&inner.db)
1676            .await?;
1677
1678        let mut source_fragment_ids = HashMap::new();
1679        for (fragment_id, stream_node) in fragments {
1680            if let Some((source_id, upstream_source_fragment_id)) =
1681                stream_node.to_protobuf().find_source_backfill()
1682            {
1683                source_fragment_ids
1684                    .entry(source_id)
1685                    .or_insert_with(BTreeSet::new)
1686                    .insert((fragment_id, upstream_source_fragment_id));
1687            }
1688        }
1689        Ok(source_fragment_ids)
1690    }
1691
1692    pub async fn get_all_upstream_sink_infos(
1693        &self,
1694        target_table: &PbTable,
1695        target_fragment_id: FragmentId,
1696    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1697        let incoming_sinks = self.get_table_incoming_sinks(target_table.id).await?;
1698
1699        let inner = self.inner.read().await;
1700        let txn = inner.db.begin().await?;
1701
1702        let sink_ids = incoming_sinks.iter().map(|s| s.id).collect_vec();
1703        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;
1704
1705        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1706        for pb_sink in &incoming_sinks {
1707            let sink_fragment_id =
1708                sink_fragment_ids
1709                    .get(&pb_sink.id)
1710                    .cloned()
1711                    .ok_or(anyhow::anyhow!(
1712                        "sink fragment not found for sink id {}",
1713                        pb_sink.id
1714                    ))?;
1715            let upstream_info = build_upstream_sink_info(
1716                pb_sink,
1717                sink_fragment_id,
1718                target_table,
1719                target_fragment_id,
1720            )?;
1721            upstream_sink_infos.push(upstream_info);
1722        }
1723
1724        Ok(upstream_sink_infos)
1725    }
1726
1727    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1728        let inner = self.inner.read().await;
1729        let txn = inner.db.begin().await?;
1730
1731        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1732            .select_only()
1733            .column(fragment::Column::FragmentId)
1734            .filter(
1735                fragment::Column::JobId
1736                    .eq(job_id)
1737                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1738            )
1739            .into_tuple()
1740            .all(&txn)
1741            .await?;
1742
1743        if mview_fragment.len() != 1 {
1744            return Err(anyhow::anyhow!(
1745                "expected exactly one mview fragment for job {}, found {}",
1746                job_id,
1747                mview_fragment.len()
1748            )
1749            .into());
1750        }
1751
1752        Ok(mview_fragment.into_iter().next().unwrap())
1753    }
1754
1755    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
1756        let inner = self.inner.read().await;
1757        let txn = inner.db.begin().await?;
1758        has_table_been_migrated(&txn, table_id).await
1759    }
1760
1761    pub async fn update_fragment_splits<C>(
1762        &self,
1763        txn: &C,
1764        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
1765    ) -> MetaResult<()>
1766    where
1767        C: ConnectionTrait,
1768    {
1769        if fragment_splits.is_empty() {
1770            return Ok(());
1771        }
1772
1773        let models: Vec<fragment_splits::ActiveModel> = fragment_splits
1774            .iter()
1775            .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
1776                fragment_id: Set(*fragment_id as _),
1777                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1778                    splits: splits.iter().map(Into::into).collect_vec(),
1779                }))),
1780            })
1781            .collect();
1782
1783        FragmentSplits::insert_many(models)
1784            .on_conflict(
1785                OnConflict::column(fragment_splits::Column::FragmentId)
1786                    .update_column(fragment_splits::Column::Splits)
1787                    .to_owned(),
1788            )
1789            .exec(txn)
1790            .await?;
1791
1792        Ok(())
1793    }
1794}
1795
#[cfg(test)]
mod tests {
    //! Unit tests for converting between the fragment model and its protobuf-style
    //! `Fragment` / `StreamActor` representation, and for parallelism-policy formatting.

    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::id::JobId;
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::*;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};

    use super::ActorInfo;
    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    /// Upstream actors of one actor, grouped by upstream fragment id.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    /// Upstream actors for every actor of a fragment, keyed by actor id.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    const TEST_JOB_ID: JobId = JobId::new(1);

    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);

    /// Build fake upstreams for `actor_id` from two upstream fragments.
    ///
    /// The upstreams are actor `actor_id + 100` in `TEST_UPSTREAM_FRAGMENT_ID` and actor
    /// `actor_id + 200` in `TEST_UPSTREAM_FRAGMENT_ID + 1`.
    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }

    /// Build a `Union` stream node with one `Merge` input per upstream fragment id.
    ///
    /// The upstream fragment ids are the keys of `actor_upstream_actor_ids`, visited in
    /// `BTreeMap` key order.
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    /// `prepare_fragment_model_for_new_job` must carry over the fragment-level fields of a
    /// three-actor `Fragment` (as checked by `check_fragment`).
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // One vnode bitmap per actor from a uniform mapping over the test vnode count.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // Every actor has the same upstream fragment keys, so any entry can serve as template.
        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id.into(),
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id.into()).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
                config_override: "".into(),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }

    /// `compose_fragment` must turn a fragment model plus three `ActorInfo`s into a `Fragment`.
    ///
    /// The composed result is checked to include:
    /// - one actor status with a location per actor;
    /// - one split entry per actor;
    /// - fragment and actor fields matching the inputs.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // Mutable so each actor below can take ownership of its own bitmap via `remove`.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // A single placeholder split per actor.
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id.into())
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                    config_override: "a.b.c = true".into(),
                }
            })
            .collect_vec();

        // The fragment's stream node is generated from the first actor's upstreams.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    /// Assert that each `ActorInfo` agrees with the `StreamActor` at the same position.
    ///
    /// For each pair it compares:
    /// - id, fragment id and vnode bitmap;
    /// - splits, read from `pb_actor_splits` and defaulting to empty when absent;
    /// - the expression context.
    ///
    /// It also requires `mview_definition` to be empty, and every `Merge` node in
    /// `stream_node` to reference an upstream fragment listed for that actor in
    /// `actor_upstreams`.
    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // The shared stream node is checked against this particular actor's upstreams.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    /// Assert that `pb_fragment` matches `fragment` on the fragment-level fields.
    ///
    /// The compared fields are type mask, distribution type, state table ids and stream node.
    /// It also requires the id to be `TEST_FRAGMENT_ID`. Actors and vnode count are ignored.
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }

    /// With a job parallelism and no upstream root fragments, the policy is rendered as
    /// inheriting the job's parallelism.
    #[test]
    fn test_parallelism_policy_with_root_fragments() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 3.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let job_parallelism = StreamingParallelism::Fixed(4);

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            Some(&job_parallelism),
            &[],
        );

        assert_eq!(policy, "inherit(fixed(4))");
    }

    /// With upstream root fragments given as `[3, 1, 2, 1]`, the rendered policy lists them
    /// as `[1, 2, 3]`.
    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            None,
            &[3.into(), 1.into(), 2.into(), 1.into()],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
}