risingwave_meta/controller/
fragment.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19
20use anyhow::{Context, anyhow};
21use futures::TryStreamExt;
22use itertools::Itertools;
23use risingwave_common::bail;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
26use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
27use risingwave_common::id::JobId;
28use risingwave_common::system_param::reader::SystemParamsRead;
29use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
30use risingwave_connector::source::SplitImpl;
31use risingwave_connector::source::cdc::CdcScanOptions;
32use risingwave_meta_model::fragment::DistributionType;
33use risingwave_meta_model::object::ObjectType;
34use risingwave_meta_model::prelude::{
35    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
36};
37use risingwave_meta_model::{
38    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
39    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
40    VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits, object, sink,
41    source, streaming_job, table,
42};
43use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
44use risingwave_pb::catalog::PbTable;
45use risingwave_pb::common::PbActorLocation;
46use risingwave_pb::meta::subscribe_response::{
47    Info as NotificationInfo, Operation as NotificationOperation,
48};
49use risingwave_pb::meta::table_fragments::fragment::{
50    FragmentDistributionType, PbFragmentDistributionType,
51};
52use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
53use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
54use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
55use risingwave_pb::stream_plan;
56use risingwave_pb::stream_plan::stream_node::NodeBody;
57use risingwave_pb::stream_plan::{
58    PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63    ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, JoinType, PaginatorTrait,
64    QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
65};
66use serde::{Deserialize, Serialize};
67
68use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
69use crate::controller::catalog::CatalogController;
70use crate::controller::scale::{
71    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph, WorkerInfo,
72    find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
73    render_actor_assignments, resolve_streaming_job_definition,
74};
75use crate::controller::utils::{
76    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
77    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
78    resolve_no_shuffle_actor_dispatcher,
79};
80use crate::error::MetaError;
81use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
82use crate::model::{
83    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
84    StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
85    TableParallelism,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
/// Some information of running (inflight) actors.
#[derive(Debug)]
pub struct InflightActorInfo {
    /// Worker node the actor is currently scheduled on.
    pub worker_id: WorkerId,
    /// Vnode ownership bitmap; `None` when the actor carries no bitmap
    /// (presumably singleton distribution — confirm against callers).
    pub vnode_bitmap: Option<Bitmap>,
    /// Source splits currently assigned to this actor.
    pub splits: Vec<SplitImpl>,
}
98
/// Per-actor record assembled from metadata; `Serialize`/`Deserialize` suggest
/// it is persisted or exported somewhere — NOTE(review): confirm the consumer.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    // Identifier of the actor itself.
    pub actor_id: ActorId,
    // Fragment this actor belongs to; validated in `compose_fragment`.
    pub fragment_id: FragmentId,
    // Connector splits assigned to the actor.
    pub splits: ConnectorSplits,
    // Worker node the actor is scheduled on.
    pub worker_id: WorkerId,
    // Vnode ownership bitmap, if any.
    pub vnode_bitmap: Option<VnodeBitmap>,
    // Expression evaluation context carried by the actor.
    pub expr_context: ExprContext,
    // Per-actor config override string, cheaply clonable via `Arc`.
    pub config_override: Arc<str>,
}
109
/// Aggregated information of a fragment that is currently running, including
/// all of its inflight actors.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: FragmentId,
    /// Distribution of the fragment (see [`DistributionType`]).
    pub distribution_type: DistributionType,
    /// Bitmask describing the fragment's roles (source, sink, scan, ...).
    pub fragment_type_mask: FragmentTypeMask,
    /// Total number of vnodes this fragment covers.
    pub vnode_count: usize,
    /// The fragment's stream plan (root stream node).
    pub nodes: PbStreamNode,
    /// Running actors of this fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    /// State table ids owned by this fragment.
    pub state_table_ids: HashSet<TableId>,
}
120
/// Parallelism-related facts about a single running fragment.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    /// How the fragment's data is distributed (hash / single).
    pub distribution_type: FragmentDistributionType,
    /// Number of actors currently running for the fragment.
    pub actor_count: usize,
    /// Number of vnodes the fragment covers.
    pub vnode_count: usize,
}
127
/// SQL-expression helpers for filtering on `fragment.fragment_type_mask`.
///
/// These build `sea_query` predicates that evaluate the bitmask test inside
/// the database, mirroring the in-memory semantics of [`FragmentTypeMask`].
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    /// Predicate: the mask column has `flag` set (`mask & flag != 0`).
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    /// Predicate: the mask column has at least one of `flags` set.
    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    /// Predicate: the mask column has `flag` unset (`mask & flag == 0`).
    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}
148
/// Flattened streaming-job row joined with its object, name and resource
/// info; serialized in camelCase for the dashboard API.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    pub job_id: JobId,
    /// Catalog object type of the job (table / source / sink / ...).
    pub obj_type: ObjectType,
    /// Display name resolved from the table/source/sink row; `"<unknown>"`
    /// when none matches (see `list_streaming_job_infos`).
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Job-specific resource group, falling back to the database's group.
    pub resource_group: String,
    /// Config override string; empty when the job has none.
    pub config_override: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
163
164impl NotificationManager {
165    pub(crate) fn notify_fragment_mapping(
166        &self,
167        operation: NotificationOperation,
168        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
169    ) {
170        let fragment_ids = fragment_mappings
171            .iter()
172            .map(|mapping| mapping.fragment_id)
173            .collect_vec();
174        if fragment_ids.is_empty() {
175            return;
176        }
177        // notify all fragment mappings to frontend.
178        for fragment_mapping in fragment_mappings {
179            self.notify_frontend_without_version(
180                operation,
181                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
182            );
183        }
184
185        // update serving vnode mappings.
186        match operation {
187            NotificationOperation::Add | NotificationOperation::Update => {
188                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
189                    fragment_ids,
190                ));
191            }
192            NotificationOperation::Delete => {
193                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
194                    fragment_ids,
195                ));
196            }
197            op => {
198                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
199            }
200        }
201    }
202}
203
204impl CatalogController {
205    pub fn prepare_fragment_models_from_fragments(
206        job_id: JobId,
207        fragments: impl Iterator<Item = &Fragment>,
208    ) -> MetaResult<Vec<fragment::Model>> {
209        fragments
210            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
211            .try_collect()
212    }
213
    /// Build the `fragment::Model` database row for a fragment of a brand-new
    /// streaming job from its in-memory [`Fragment`] representation.
    ///
    /// # Panics
    /// Panics if the fragment has no actors.
    pub fn prepare_fragment_model_for_new_job(
        job_id: JobId,
        fragment: &Fragment,
    ) -> MetaResult<fragment::Model> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        // A `StreamCdcScan` fragment configured for parallelized backfill pins
        // its own fixed parallelism; all other fragments leave this `None`.
        let fragment_parallelism = nodes
            .node_body
            .as_ref()
            .and_then(|body| match body {
                NodeBody::StreamCdcScan(node) => Some(node),
                _ => None,
            })
            .and_then(|node| node.options.as_ref())
            .map(CdcScanOptions::from_proto)
            .filter(|opts| opts.is_parallelized_backfill())
            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));

        let stream_node = StreamNode::from(nodes);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        // `upstream_fragment_id` is deprecated — upstream edges are presumably
        // tracked via `fragment_relation` now (confirm); the column is left at
        // its default.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
            parallelism: fragment_parallelism,
        };

        Ok(fragment)
    }
266
267    fn compose_table_fragments(
268        job_id: JobId,
269        state: PbState,
270        ctx: StreamContext,
271        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
272        parallelism: StreamingParallelism,
273        max_parallelism: usize,
274        job_definition: Option<String>,
275    ) -> MetaResult<StreamJobFragments> {
276        let mut pb_fragments = BTreeMap::new();
277        let mut pb_actor_status = BTreeMap::new();
278
279        for (fragment, actors) in fragments {
280            let (fragment, fragment_actor_status, _) =
281                Self::compose_fragment(fragment, actors, job_definition.clone())?;
282
283            pb_fragments.insert(fragment.fragment_id, fragment);
284            pb_actor_status.extend(fragment_actor_status.into_iter());
285        }
286
287        let table_fragments = StreamJobFragments {
288            stream_job_id: job_id,
289            state: state as _,
290            fragments: pb_fragments,
291            actor_status: pb_actor_status,
292            ctx,
293            assigned_parallelism: match parallelism {
294                StreamingParallelism::Custom => TableParallelism::Custom,
295                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
296                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
297            },
298            max_parallelism,
299        };
300
301        Ok(table_fragments)
302    }
303
304    #[allow(clippy::type_complexity)]
305    fn compose_fragment(
306        fragment: fragment::Model,
307        actors: Vec<ActorInfo>,
308        job_definition: Option<String>,
309    ) -> MetaResult<(
310        Fragment,
311        HashMap<crate::model::ActorId, PbActorStatus>,
312        HashMap<crate::model::ActorId, PbConnectorSplits>,
313    )> {
314        let fragment::Model {
315            fragment_id,
316            fragment_type_mask,
317            distribution_type,
318            stream_node,
319            state_table_ids,
320            vnode_count,
321            ..
322        } = fragment;
323
324        let stream_node = stream_node.to_protobuf();
325        let mut upstream_fragments = HashSet::new();
326        visit_stream_node_body(&stream_node, |body| {
327            if let NodeBody::Merge(m) = body {
328                assert!(
329                    upstream_fragments.insert(m.upstream_fragment_id),
330                    "non-duplicate upstream fragment"
331                );
332            }
333        });
334
335        let mut pb_actors = vec![];
336
337        let mut pb_actor_status = HashMap::new();
338        let mut pb_actor_splits = HashMap::new();
339
340        for actor in actors {
341            if actor.fragment_id != fragment_id {
342                bail!(
343                    "fragment id {} from actor {} is different from fragment {}",
344                    actor.fragment_id,
345                    actor.actor_id,
346                    fragment_id
347                )
348            }
349
350            let ActorInfo {
351                actor_id,
352                fragment_id,
353                worker_id,
354                splits,
355                vnode_bitmap,
356                expr_context,
357                config_override,
358                ..
359            } = actor;
360
361            let vnode_bitmap =
362                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
363            let pb_expr_context = Some(expr_context.to_protobuf());
364
365            pb_actor_status.insert(
366                actor_id as _,
367                PbActorStatus {
368                    location: PbActorLocation::from_worker(worker_id),
369                },
370            );
371
372            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
373
374            pb_actors.push(StreamActor {
375                actor_id: actor_id as _,
376                fragment_id: fragment_id as _,
377                vnode_bitmap,
378                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
379                expr_context: pb_expr_context,
380                config_override,
381            })
382        }
383
384        let pb_state_table_ids = state_table_ids.0;
385        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
386        let pb_fragment = Fragment {
387            fragment_id: fragment_id as _,
388            fragment_type_mask: fragment_type_mask.into(),
389            distribution_type: pb_distribution_type,
390            actors: pb_actors,
391            state_table_ids: pb_state_table_ids,
392            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
393            nodes: stream_node,
394        };
395
396        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
397    }
398
399    pub fn running_fragment_parallelisms(
400        &self,
401        id_filter: Option<HashSet<FragmentId>>,
402    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
403        let info = self.env.shared_actor_infos().read_guard();
404
405        let mut result = HashMap::new();
406        for (fragment_id, fragment) in info.iter_over_fragments() {
407            if let Some(id_filter) = &id_filter
408                && !id_filter.contains(&(*fragment_id as _))
409            {
410                continue; // Skip fragments not in the filter
411            }
412
413            result.insert(
414                *fragment_id as _,
415                FragmentParallelismInfo {
416                    distribution_type: fragment.distribution_type.into(),
417                    actor_count: fragment.actors.len() as _,
418                    vnode_count: fragment.vnode_count,
419                },
420            );
421        }
422
423        Ok(result)
424    }
425
426    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
427        let inner = self.inner.read().await;
428        let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
429            .select_only()
430            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
431            .into_tuple()
432            .all(&inner.db)
433            .await?;
434        Ok(fragment_jobs.into_iter().collect())
435    }
436
437    pub async fn get_fragment_job_id(
438        &self,
439        fragment_ids: Vec<FragmentId>,
440    ) -> MetaResult<Vec<ObjectId>> {
441        let inner = self.inner.read().await;
442
443        let object_ids: Vec<ObjectId> = FragmentModel::find()
444            .select_only()
445            .column(fragment::Column::JobId)
446            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
447            .into_tuple()
448            .all(&inner.db)
449            .await?;
450
451        Ok(object_ids)
452    }
453
    /// Fetch the descriptor of a single fragment plus the ids of its direct
    /// upstream fragments. Returns `Ok(None)` if the fragment doesn't exist.
    ///
    /// # Panics
    /// Panics if the fragment exists in the database but is missing from the
    /// shared in-memory actor info (an internal consistency violation).
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model = match FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?
        {
            Some(fragment) => fragment,
            None => return Ok(None),
        };

        // Parallelism configured on the owning job, if the job row exists.
        let job_parallelism: Option<StreamingParallelism> =
            StreamingJob::find_by_id(fragment_model.job_id)
                .select_only()
                .column(streaming_job::Column::Parallelism)
                .into_tuple()
                .one(&inner.db)
                .await?;

        // Direct upstream edges targeting this fragment.
        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::DispatcherType,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let upstreams: Vec<_> = upstream_entries
            .into_iter()
            .map(|(source_id, _)| source_id)
            .collect();

        // Roots of the no-shuffle DAG(s) this fragment participates in; fed
        // into `format_fragment_parallelism_policy` below.
        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
            .await
            .map(Self::collect_root_fragment_mapping)?;
        let root_fragments = root_fragment_map
            .get(&fragment_id)
            .cloned()
            .unwrap_or_default();

        // The live actor count comes from the shared in-memory info, not the
        // database.
        let info = self.env.shared_actor_infos().read_guard();
        let SharedFragmentInfo { actors, .. } = info
            .get_fragment(fragment_model.fragment_id as _)
            .unwrap_or_else(|| {
                panic!(
                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                    fragment_model.fragment_id,
                    fragment_model.job_id
                )
            });

        let parallelism_policy = Self::format_fragment_parallelism_policy(
            &fragment_model,
            job_parallelism.as_ref(),
            &root_fragments,
        );

        let fragment = FragmentDesc {
            fragment_id: fragment_model.fragment_id,
            job_id: fragment_model.job_id,
            fragment_type_mask: fragment_model.fragment_type_mask,
            distribution_type: fragment_model.distribution_type,
            state_table_ids: fragment_model.state_table_ids.clone(),
            parallelism: actors.len() as _,
            vnode_count: fragment_model.vnode_count,
            stream_node: fragment_model.stream_node.clone(),
            parallelism_policy,
        };

        Ok(Some((fragment, upstreams)))
    }
531
532    pub async fn list_fragment_database_ids(
533        &self,
534        select_fragment_ids: Option<Vec<FragmentId>>,
535    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
536        let inner = self.inner.read().await;
537        let select = FragmentModel::find()
538            .select_only()
539            .column(fragment::Column::FragmentId)
540            .column(object::Column::DatabaseId)
541            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
542        let select = if let Some(select_fragment_ids) = select_fragment_ids {
543            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
544        } else {
545            select
546        };
547        Ok(select.into_tuple().all(&inner.db).await?)
548    }
549
550    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
551        let inner = self.inner.read().await;
552
553        // Load fragments matching the job from the database
554        let fragments: Vec<_> = FragmentModel::find()
555            .filter(fragment::Column::JobId.eq(job_id))
556            .all(&inner.db)
557            .await?;
558
559        let job_info = StreamingJob::find_by_id(job_id)
560            .one(&inner.db)
561            .await?
562            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
563
564        let fragment_actors =
565            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;
566
567        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
568            .await?
569            .remove(&job_id);
570
571        Self::compose_table_fragments(
572            job_id,
573            job_info.job_status.into(),
574            job_info.stream_context(),
575            fragment_actors,
576            job_info.parallelism.clone(),
577            job_info.max_parallelism as _,
578            job_definition,
579        )
580    }
581
582    pub async fn get_fragment_actor_dispatchers(
583        &self,
584        fragment_ids: Vec<FragmentId>,
585    ) -> MetaResult<FragmentActorDispatchers> {
586        let inner = self.inner.read().await;
587
588        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
589            .await
590    }
591
    /// Compute, for every source fragment in `fragment_ids`, the dispatchers
    /// each of its actors should use to reach its downstream fragments.
    ///
    /// Edges come from the `fragment_relation` table; actor placement and
    /// vnode bitmaps come from the shared in-memory actor info.
    ///
    /// # Panics
    /// Panics (via `unwrap`) if a referenced fragment is missing from the
    /// shared actor info.
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        // Distribution type plus per-actor vnode bitmaps of one fragment.
        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        // Cache per-fragment actor info so a fragment appearing in many
        // relations is only resolved once.
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            // Resolve (and memoize) source-side actor info.
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            // Resolve (and memoize) target-side actor info.
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let dispatchers = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            // Group results by source fragment, then by source actor.
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }
695
696    pub async fn get_fragment_downstream_relations(
697        &self,
698        fragment_ids: Vec<FragmentId>,
699    ) -> MetaResult<FragmentDownstreamRelation> {
700        let inner = self.inner.read().await;
701        self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
702            .await
703    }
704
705    pub async fn get_fragment_downstream_relations_in_txn<C>(
706        &self,
707        txn: &C,
708        fragment_ids: Vec<FragmentId>,
709    ) -> MetaResult<FragmentDownstreamRelation>
710    where
711        C: ConnectionTrait + StreamTrait + Send,
712    {
713        let mut stream = FragmentRelation::find()
714            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
715            .stream(txn)
716            .await?;
717        let mut relations = FragmentDownstreamRelation::new();
718        while let Some(relation) = stream.try_next().await? {
719            relations
720                .entry(relation.source_fragment_id as _)
721                .or_default()
722                .push(DownstreamFragmentRelation {
723                    downstream_fragment_id: relation.target_fragment_id as _,
724                    dispatcher_type: relation.dispatcher_type,
725                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
726                    output_mapping: PbDispatchOutputMapping {
727                        indices: relation.output_indices.into_u32_array(),
728                        types: relation
729                            .output_type_mapping
730                            .unwrap_or_default()
731                            .to_protobuf(),
732                    },
733                });
734        }
735        Ok(relations)
736    }
737
738    pub async fn get_job_fragment_backfill_scan_type(
739        &self,
740        job_id: JobId,
741    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
742        let inner = self.inner.read().await;
743        let fragments: Vec<_> = FragmentModel::find()
744            .filter(fragment::Column::JobId.eq(job_id))
745            .all(&inner.db)
746            .await?;
747
748        let mut result = HashMap::new();
749
750        for fragment::Model {
751            fragment_id,
752            stream_node,
753            ..
754        } in fragments
755        {
756            let stream_node = stream_node.to_protobuf();
757            visit_stream_node_body(&stream_node, |body| {
758                if let NodeBody::StreamScan(node) = body {
759                    match node.stream_scan_type() {
760                        StreamScanType::Unspecified => {}
761                        scan_type => {
762                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
763                        }
764                    }
765                }
766            });
767        }
768
769        Ok(result)
770    }
771
772    pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
773        let inner = self.inner.read().await;
774        let count = StreamingJob::find().count(&inner.db).await?;
775        Ok(usize::try_from(count).context("streaming job count overflow")?)
776    }
777
    /// List every streaming job with its resolved name, status, parallelism,
    /// resource group and config override — one [`StreamingJobInfo`] row per
    /// job, ready for the dashboard.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // Left-join the three catalog tables the job may belong to; the
            // display name comes from whichever row exists.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                // COALESCE-style fallback:
                // table name → source name → sink name → "<unknown>".
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            .column_as(
                // Job-specific resource group, defaulting to the database's.
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column_as(
                // NULL config override is surfaced as an empty string.
                Expr::if_null(
                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
                    Expr::val(""),
                ),
                "config_override",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
831
832    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
833        let inner = self.inner.read().await;
834        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
835            .select_only()
836            .column(streaming_job::Column::MaxParallelism)
837            .into_tuple()
838            .one(&inner.db)
839            .await?
840            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
841        Ok(max_parallelism as usize)
842    }
843
844    /// Try to get internal table ids of each streaming job, used by metrics collection.
845    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
846        if let Ok(inner) = self.inner.try_read()
847            && let Ok(job_state_tables) = FragmentModel::find()
848                .select_only()
849                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
850                .into_tuple::<(JobId, I32Array)>()
851                .all(&inner.db)
852                .await
853        {
854            let mut job_internal_table_ids = HashMap::new();
855            for (job_id, state_table_ids) in job_state_tables {
856                job_internal_table_ids
857                    .entry(job_id)
858                    .or_insert_with(Vec::new)
859                    .extend(
860                        state_table_ids
861                            .into_inner()
862                            .into_iter()
863                            .map(|table_id| TableId::new(table_id as _)),
864                    );
865            }
866            return Some(job_internal_table_ids.into_iter().collect());
867        }
868        None
869    }
870
871    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
872        let inner = self.inner.read().await;
873        let count = FragmentModel::find().count(&inner.db).await?;
874        Ok(count > 0)
875    }
876
877    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
878        let read_guard = self.env.shared_actor_infos().read_guard();
879        let actor_cnt: HashMap<WorkerId, _> = read_guard
880            .iter_over_fragments()
881            .flat_map(|(_, fragment)| {
882                fragment
883                    .actors
884                    .iter()
885                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
886            })
887            .into_group_map()
888            .into_iter()
889            .map(|(k, v)| (k, v.len()))
890            .collect();
891
892        Ok(actor_cnt)
893    }
894
    /// Build, for each requested fragment, the list of `ActorInfo`s derived from
    /// the shared in-memory actor state, stamping every actor with the expression
    /// context and config override from `stream_context`.
    ///
    /// Fails if any fragment id is missing from the shared actor info.
    fn collect_fragment_actor_map(
        &self,
        fragment_ids: &[FragmentId],
        stream_context: StreamContext,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
        let guard = self.env.shared_actor_infos().read_guard();
        // Convert once up front; the same expr context is cloned into each actor.
        let pb_expr_context = stream_context.to_expr_context();
        let expr_context: ExprContext = (&pb_expr_context).into();

        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
        for fragment_id in fragment_ids {
            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
                anyhow!("fragment {} not found in shared actor info", fragment_id)
            })?;

            let actors = fragment_info
                .actors
                .iter()
                .map(|(actor_id, actor_info)| ActorInfo {
                    actor_id: *actor_id as _,
                    fragment_id: *fragment_id,
                    // Re-encode the in-memory split assignment into the model type.
                    splits: ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
                    }),
                    worker_id: actor_info.worker_id as _,
                    // An absent vnode bitmap is preserved as `None`.
                    vnode_bitmap: actor_info
                        .vnode_bitmap
                        .as_ref()
                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                    expr_context: expr_context.clone(),
                    config_override: stream_context.config_override.clone(),
                })
                .collect();

            actor_map.insert(*fragment_id, actors);
        }

        Ok(actor_map)
    }
934
935    fn collect_fragment_actor_pairs(
936        &self,
937        fragments: Vec<fragment::Model>,
938        stream_context: StreamContext,
939    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
940        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
941        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
942        fragments
943            .into_iter()
944            .map(|fragment| {
945                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
946                    anyhow!(
947                        "fragment {} missing in shared actor info map",
948                        fragment.fragment_id
949                    )
950                })?;
951                Ok((fragment, actors))
952            })
953            .collect()
954    }
955
    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
    /// Load the complete `StreamJobFragments` (fragments + actors + definition)
    /// for every streaming job, keyed by job id.
    ///
    /// NOTE(review): this issues one fragment query per job (N+1 pattern), which
    /// is part of why the TODO above calls it heavy.
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve SQL definitions for all jobs in one batch query.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragments = FragmentModel::find()
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            // Attach live actor info from the shared in-memory state.
            let fragment_actors =
                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;

            table_fragments.insert(
                job.job_id,
                Self::compose_table_fragments(
                    job.job_id,
                    job.job_status.into(),
                    job.stream_context(),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    // `remove` transfers ownership; each job id appears once.
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
993
994    pub async fn upstream_fragments(
995        &self,
996        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
997    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
998        let inner = self.inner.read().await;
999        let mut stream = FragmentRelation::find()
1000            .select_only()
1001            .columns([
1002                fragment_relation::Column::SourceFragmentId,
1003                fragment_relation::Column::TargetFragmentId,
1004            ])
1005            .filter(
1006                fragment_relation::Column::TargetFragmentId
1007                    .is_in(fragment_ids.map(|id| id as FragmentId)),
1008            )
1009            .into_tuple::<(FragmentId, FragmentId)>()
1010            .stream(&inner.db)
1011            .await?;
1012        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
1013        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
1014            upstream_fragments
1015                .entry(downstream_fragment_id as crate::model::FragmentId)
1016                .or_default()
1017                .insert(upstream_fragment_id as crate::model::FragmentId);
1018        }
1019        Ok(upstream_fragments)
1020    }
1021
1022    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1023        let info = self.env.shared_actor_infos().read_guard();
1024
1025        let actor_locations = info
1026            .iter_over_fragments()
1027            .flat_map(|(fragment_id, fragment)| {
1028                fragment
1029                    .actors
1030                    .iter()
1031                    .map(|(actor_id, actor)| PartialActorLocation {
1032                        actor_id: *actor_id as _,
1033                        fragment_id: *fragment_id as _,
1034                        worker_id: actor.worker_id,
1035                    })
1036            })
1037            .collect_vec();
1038
1039        Ok(actor_locations)
1040    }
1041
    /// List every actor together with its fragment id and the owning object's
    /// id, schema and type.
    ///
    /// Fragment/object metadata comes from the database; the actor list comes
    /// from the shared in-memory actor info, and a fragment missing there is
    /// reported as `unavailable`.
    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;

        // Aliases ("job_id", "schema_id", "type") define the tuple decoding order.
        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
            FragmentModel::find()
                .select_only()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .column(fragment::Column::FragmentId)
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;

        let actor_infos = {
            let info = self.env.shared_actor_infos().read_guard();

            let mut result = Vec::new();

            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
                let Some(fragment) = info.get_fragment(fragment_id as _) else {
                    return Err(MetaError::unavailable(format!(
                        "shared actor info missing for fragment {fragment_id} while listing actors"
                    )));
                };

                // One output row per actor of this fragment.
                for actor_id in fragment.actors.keys() {
                    result.push((
                        *actor_id as _,
                        fragment.fragment_id as _,
                        object_id,
                        schema_id,
                        object_type,
                    ));
                }
            }

            result
        };

        Ok(actor_infos)
    }
1087
1088    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1089        let guard = self.env.shared_actor_info.read_guard();
1090        guard
1091            .iter_over_fragments()
1092            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1093            .collect_vec()
1094    }
1095
    /// List a `FragmentDistribution` description (plus its upstream fragment ids)
    /// for each fragment.
    ///
    /// When `is_creating` is true, only fragments of jobs still in `Initial` or
    /// `Creating` status are listed; otherwise all fragments are listed.
    pub async fn list_fragment_descs(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        // Single transaction so the fragment/job/relation reads are consistent.
        let txn = inner.db.begin().await?;

        let fragments: Vec<_> = if is_creating {
            FragmentModel::find()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
                .filter(
                    streaming_job::Column::JobStatus
                        .eq(JobStatus::Initial)
                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
                )
                .all(&txn)
                .await?
        } else {
            FragmentModel::find().all(&txn).await?
        };

        let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();

        // Per-job parallelism settings, fetched in one batch for all involved jobs.
        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragments.is_empty() {
            HashMap::new()
        } else {
            let job_ids = fragments.iter().map(|f| f.job_id).unique().collect_vec();
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(&txn)
                .await?
                .into_iter()
                .collect()
        };

        // All (target, source, dispatcher) edges pointing into the listed fragments.
        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        // target fragment -> direct upstream fragments
        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        // fragment -> root fragments of its no-shuffle ensemble (empty for roots).
        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();

        let mut result = Vec::new();

        for fragment_desc in fragments {
            // note: here sometimes fragment is not found in the cache, fallback to 0
            let parallelism = guard
                .get_fragment(fragment_desc.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&fragment_desc.fragment_id)
                .cloned()
                .unwrap_or_default();

            let upstreams = all_upstreams
                .remove(&fragment_desc.fragment_id)
                .unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                &fragment_desc,
                job_parallelisms.get(&fragment_desc.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: fragment_desc.fragment_id,
                table_id: fragment_desc.job_id,
                distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type)
                    as _,
                state_table_ids: fragment_desc.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: fragment_desc.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: fragment_desc.vnode_count as _,
                node: Some(fragment_desc.stream_node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }
        Ok(result)
    }
1212
1213    /// Build a fragment-to-root lookup for all reported root fragment ensembles.
1214    fn collect_root_fragment_mapping(
1215        ensembles: Vec<NoShuffleEnsemble>,
1216    ) -> HashMap<FragmentId, Vec<FragmentId>> {
1217        let mut mapping = HashMap::new();
1218
1219        for ensemble in ensembles {
1220            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1221            roots.sort_unstable();
1222            roots.dedup();
1223
1224            if roots.is_empty() {
1225                continue;
1226            }
1227
1228            let root_set: HashSet<_> = roots.iter().copied().collect();
1229
1230            for fragment_id in ensemble.component_fragments() {
1231                if root_set.contains(&fragment_id) {
1232                    mapping.insert(fragment_id, Vec::new());
1233                } else {
1234                    mapping.insert(fragment_id, roots.clone());
1235                }
1236            }
1237        }
1238
1239        mapping
1240    }
1241
1242    fn format_fragment_parallelism_policy(
1243        fragment: &fragment::Model,
1244        job_parallelism: Option<&StreamingParallelism>,
1245        root_fragments: &[FragmentId],
1246    ) -> String {
1247        if fragment.distribution_type == DistributionType::Single {
1248            return "single".to_owned();
1249        }
1250
1251        if let Some(parallelism) = fragment.parallelism.as_ref() {
1252            return format!(
1253                "override({})",
1254                Self::format_streaming_parallelism(parallelism)
1255            );
1256        }
1257
1258        if !root_fragments.is_empty() {
1259            let mut upstreams = root_fragments.to_vec();
1260            upstreams.sort_unstable();
1261            upstreams.dedup();
1262
1263            return format!("upstream_fragment({upstreams:?})");
1264        }
1265
1266        let inherited = job_parallelism
1267            .map(Self::format_streaming_parallelism)
1268            .unwrap_or_else(|| "unknown".to_owned());
1269        format!("inherit({inherited})")
1270    }
1271
1272    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1273        match parallelism {
1274            StreamingParallelism::Adaptive => "adaptive".to_owned(),
1275            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1276            StreamingParallelism::Custom => "custom".to_owned(),
1277        }
1278    }
1279
1280    pub async fn list_sink_actor_mapping(
1281        &self,
1282    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1283        let inner = self.inner.read().await;
1284        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1285            .select_only()
1286            .columns([sink::Column::SinkId, sink::Column::Name])
1287            .into_tuple()
1288            .all(&inner.db)
1289            .await?;
1290        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1291
1292        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1293
1294        let actor_with_type: Vec<(ActorId, SinkId)> = {
1295            let info = self.env.shared_actor_infos().read_guard();
1296
1297            info.iter_over_fragments()
1298                .filter(|(_, fragment)| {
1299                    sink_ids.contains(&fragment.job_id.as_sink_id())
1300                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1301                })
1302                .flat_map(|(_, fragment)| {
1303                    fragment
1304                        .actors
1305                        .keys()
1306                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1307                })
1308                .collect()
1309        };
1310
1311        let mut sink_actor_mapping = HashMap::new();
1312        for (actor_id, sink_id) in actor_with_type {
1313            sink_actor_mapping
1314                .entry(sink_id)
1315                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1316                .1
1317                .push(actor_id);
1318        }
1319
1320        Ok(sink_actor_mapping)
1321    }
1322
1323    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1324        let inner = self.inner.read().await;
1325        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1326            .select_only()
1327            .columns([
1328                fragment::Column::FragmentId,
1329                fragment::Column::JobId,
1330                fragment::Column::StateTableIds,
1331            ])
1332            .into_partial_model()
1333            .all(&inner.db)
1334            .await?;
1335        Ok(fragment_state_tables)
1336    }
1337
    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
    /// collected
    ///
    /// Loads the fragment context (optionally scoped to `database_id`) and renders
    /// fresh actor assignments against the currently schedulable workers.
    pub async fn load_all_actors_dynamic(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<FragmentRenderMap> {
        let loaded = self.load_fragment_context(database_id).await?;

        // No jobs to render — nothing to do.
        if loaded.is_empty() {
            return Ok(HashMap::new());
        }

        let adaptive_parallelism_strategy = {
            let system_params_reader = self.env.system_params_reader().await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        // Only workers that are currently schedulable for streaming participate.
        let available_workers: BTreeMap<_, _> = worker_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| {
                (
                    worker.id,
                    WorkerInfo {
                        // NOTE(review): assumes compute_node_parallelism() > 0 —
                        // the unwrap panics otherwise; confirm this invariant.
                        parallelism: NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                        resource_group: worker.resource_group(),
                    },
                )
            })
            .collect();

        let RenderedGraph { fragments, .. } = render_actor_assignments(
            self.env.actor_id_generator(),
            &available_workers,
            adaptive_parallelism_strategy,
            &loaded,
        )?;

        tracing::trace!(?fragments, "reload all actors");

        Ok(fragments)
    }
1382
    /// Async load stage: collects all metadata required for rendering actor assignments.
    ///
    /// Convenience wrapper that opens a fresh transaction and delegates to
    /// [`Self::load_fragment_context_in_txn`].
    pub async fn load_fragment_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedFragmentContext> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        self.load_fragment_context_in_txn(&txn, database_id).await
    }
1393
1394    pub async fn load_fragment_context_in_txn<C>(
1395        &self,
1396        txn: &C,
1397        database_id: Option<DatabaseId>,
1398    ) -> MetaResult<LoadedFragmentContext>
1399    where
1400        C: ConnectionTrait,
1401    {
1402        let mut query = StreamingJob::find()
1403            .select_only()
1404            .column(streaming_job::Column::JobId);
1405
1406        if let Some(database_id) = database_id {
1407            query = query
1408                .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1409                .filter(object::Column::DatabaseId.eq(database_id));
1410        }
1411
1412        let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
1413
1414        let jobs: HashSet<JobId> = jobs.into_iter().collect();
1415
1416        if jobs.is_empty() {
1417            return Ok(LoadedFragmentContext::default());
1418        }
1419
1420        load_fragment_context_for_jobs(txn, jobs).await
1421    }
1422
    /// Rewrite the stream plan of each given fragment to fill in snapshot
    /// backfill epochs, persisting only the plans that actually changed.
    /// All updates are committed in a single transaction.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // Only write back when the helper reports the plan was modified.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1456
1457    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1458    pub fn get_running_actors_of_fragment(
1459        &self,
1460        fragment_id: FragmentId,
1461    ) -> MetaResult<HashSet<model::ActorId>> {
1462        let info = self.env.shared_actor_infos().read_guard();
1463
1464        let actors = info
1465            .get_fragment(fragment_id as _)
1466            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1467            .unwrap_or_default();
1468
1469        Ok(actors)
1470    }
1471
    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
    /// (`backfill_actor_id`, `upstream_source_actor_id`)
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // The source and its backfill fragment must be connected by an edge,
        // and that edge must be NoShuffle (checked below).
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: fetch a fragment's distribution type, failing if it is gone.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: snapshot each actor's vnode bitmap for a fragment from the
        // shared in-memory actor info; an unknown fragment yields an empty map.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // Pair each backfill actor with its NoShuffle upstream source actor, then
        // flip the tuple order to (backfill_actor_id, source_actor_id).
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1567
1568    /// Get and filter the "**root**" fragments of the specified jobs.
1569    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1570    ///
1571    /// Root fragment connects to downstream jobs.
1572    ///
1573    /// ## What can be the root fragment
1574    /// - For sink, it should have one `Sink` fragment.
1575    /// - For MV, it should have one `MView` fragment.
1576    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1577    /// - For source, it should have one `Source` fragment.
1578    ///
1579    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1580    pub async fn get_root_fragments(
1581        &self,
1582        job_ids: Vec<JobId>,
1583    ) -> MetaResult<(
1584        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
1585        HashMap<ActorId, WorkerId>,
1586    )> {
1587        let inner = self.inner.read().await;
1588
1589        let all_fragments = FragmentModel::find()
1590            .filter(fragment::Column::JobId.is_in(job_ids))
1591            .all(&inner.db)
1592            .await?;
1593        // job_id -> fragment
1594        let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
1595        for fragment in all_fragments {
1596            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1597            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1598                _ = root_fragments.insert(fragment.job_id, fragment);
1599            } else if mask.contains(FragmentTypeFlag::Source) {
1600                // look for Source fragment only if there's no MView fragment
1601                // (notice try_insert here vs insert above)
1602                _ = root_fragments.try_insert(fragment.job_id, fragment);
1603            }
1604        }
1605
1606        let mut root_fragments_pb = HashMap::new();
1607
1608        let info = self.env.shared_actor_infos().read_guard();
1609
1610        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
1611            .iter()
1612            .map(|(job_id, fragment)| (fragment.fragment_id, *job_id))
1613            .collect();
1614
1615        for fragment in root_fragment_to_jobs.keys() {
1616            let fragment_info = info.get_fragment(*fragment).context(format!(
1617                "root fragment {} not found in shared actor info",
1618                fragment
1619            ))?;
1620
1621            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
1622            let fragment = root_fragments
1623                .get(&job_id)
1624                .context(format!("root fragment for job {} not found", job_id))?;
1625
1626            root_fragments_pb.insert(
1627                job_id,
1628                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
1629            );
1630        }
1631
1632        let mut all_actor_locations = HashMap::new();
1633
1634        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
1635            for (actor_id, actor_info) in actors {
1636                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
1637            }
1638        }
1639
1640        Ok((root_fragments_pb, all_actor_locations))
1641    }
1642
1643    pub async fn get_root_fragment(
1644        &self,
1645        job_id: JobId,
1646    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
1647        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1648        let (root_fragment, _) = root_fragments
1649            .remove(&job_id)
1650            .context(format!("root fragment for job {} not found", job_id))?;
1651
1652        Ok((root_fragment, actors))
1653    }
1654
1655    /// Get the downstream fragments connected to the specified job.
1656    pub async fn get_downstream_fragments(
1657        &self,
1658        job_id: JobId,
1659    ) -> MetaResult<(
1660        Vec<(
1661            stream_plan::DispatcherType,
1662            SharedFragmentInfo,
1663            PbStreamNode,
1664        )>,
1665        HashMap<ActorId, WorkerId>,
1666    )> {
1667        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;
1668
1669        let inner = self.inner.read().await;
1670        let txn = inner.db.begin().await?;
1671        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1672            .filter(
1673                fragment_relation::Column::SourceFragmentId
1674                    .eq(root_fragment.fragment_id as FragmentId),
1675            )
1676            .all(&txn)
1677            .await?;
1678
1679        let downstream_fragment_ids = downstream_fragment_relations
1680            .iter()
1681            .map(|model| model.target_fragment_id as FragmentId)
1682            .collect::<HashSet<_>>();
1683
1684        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1685            .select_only()
1686            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1687            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1688            .into_tuple()
1689            .all(&txn)
1690            .await?;
1691
1692        let downstream_fragment_nodes: HashMap<_, _> =
1693            downstream_fragment_nodes.into_iter().collect();
1694
1695        let mut downstream_fragments = vec![];
1696
1697        let info = self.env.shared_actor_infos().read_guard();
1698
1699        let fragment_map: HashMap<_, _> = downstream_fragment_relations
1700            .iter()
1701            .map(|model| (model.target_fragment_id, model.dispatcher_type))
1702            .collect();
1703
1704        for fragment_id in fragment_map.keys() {
1705            let fragment_info @ SharedFragmentInfo { actors, .. } =
1706                info.get_fragment(*fragment_id).unwrap();
1707
1708            let dispatcher_type = fragment_map[fragment_id];
1709
1710            if actors.is_empty() {
1711                bail!("No fragment found for fragment id {}", fragment_id);
1712            }
1713
1714            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1715
1716            let nodes = downstream_fragment_nodes
1717                .get(fragment_id)
1718                .context(format!(
1719                    "downstream fragment node for id {} not found",
1720                    fragment_id
1721                ))?
1722                .to_protobuf();
1723
1724            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
1725        }
1726        Ok((downstream_fragments, actor_locations))
1727    }
1728
1729    pub async fn load_source_fragment_ids(
1730        &self,
1731    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1732        let inner = self.inner.read().await;
1733        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1734            .select_only()
1735            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1736            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1737            .into_tuple()
1738            .all(&inner.db)
1739            .await?;
1740
1741        let mut source_fragment_ids = HashMap::new();
1742        for (fragment_id, stream_node) in fragments {
1743            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1744                source_fragment_ids
1745                    .entry(source_id)
1746                    .or_insert_with(BTreeSet::new)
1747                    .insert(fragment_id);
1748            }
1749        }
1750        Ok(source_fragment_ids)
1751    }
1752
1753    pub async fn load_backfill_fragment_ids(
1754        &self,
1755    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1756        let inner = self.inner.read().await;
1757        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1758            .select_only()
1759            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1760            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1761            .into_tuple()
1762            .all(&inner.db)
1763            .await?;
1764
1765        let mut source_fragment_ids = HashMap::new();
1766        for (fragment_id, stream_node) in fragments {
1767            if let Some((source_id, upstream_source_fragment_id)) =
1768                stream_node.to_protobuf().find_source_backfill()
1769            {
1770                source_fragment_ids
1771                    .entry(source_id)
1772                    .or_insert_with(BTreeSet::new)
1773                    .insert((fragment_id, upstream_source_fragment_id));
1774            }
1775        }
1776        Ok(source_fragment_ids)
1777    }
1778
1779    pub async fn get_all_upstream_sink_infos(
1780        &self,
1781        target_table: &PbTable,
1782        target_fragment_id: FragmentId,
1783    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1784        let inner = self.inner.read().await;
1785        let txn = inner.db.begin().await?;
1786
1787        self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
1788            .await
1789    }
1790
1791    pub async fn get_all_upstream_sink_infos_in_txn<C>(
1792        &self,
1793        txn: &C,
1794        target_table: &PbTable,
1795        target_fragment_id: FragmentId,
1796    ) -> MetaResult<Vec<UpstreamSinkInfo>>
1797    where
1798        C: ConnectionTrait,
1799    {
1800        let incoming_sinks = self
1801            .get_table_incoming_sinks_in_txn(txn, target_table.id)
1802            .await?;
1803
1804        let sink_ids = incoming_sinks.iter().map(|s| s.id).collect_vec();
1805        let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1806
1807        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1808        for pb_sink in &incoming_sinks {
1809            let sink_fragment_id =
1810                sink_fragment_ids
1811                    .get(&pb_sink.id)
1812                    .cloned()
1813                    .ok_or(anyhow::anyhow!(
1814                        "sink fragment not found for sink id {}",
1815                        pb_sink.id
1816                    ))?;
1817            let upstream_info = build_upstream_sink_info(
1818                pb_sink,
1819                sink_fragment_id,
1820                target_table,
1821                target_fragment_id,
1822            )?;
1823            upstream_sink_infos.push(upstream_info);
1824        }
1825
1826        Ok(upstream_sink_infos)
1827    }
1828
1829    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1830        let inner = self.inner.read().await;
1831        let txn = inner.db.begin().await?;
1832
1833        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1834            .select_only()
1835            .column(fragment::Column::FragmentId)
1836            .filter(
1837                fragment::Column::JobId
1838                    .eq(job_id)
1839                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1840            )
1841            .into_tuple()
1842            .all(&txn)
1843            .await?;
1844
1845        if mview_fragment.len() != 1 {
1846            return Err(anyhow::anyhow!(
1847                "expected exactly one mview fragment for job {}, found {}",
1848                job_id,
1849                mview_fragment.len()
1850            )
1851            .into());
1852        }
1853
1854        Ok(mview_fragment.into_iter().next().unwrap())
1855    }
1856
1857    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
1858        let inner = self.inner.read().await;
1859        let txn = inner.db.begin().await?;
1860        has_table_been_migrated(&txn, table_id).await
1861    }
1862
1863    pub async fn update_fragment_splits<C>(
1864        &self,
1865        txn: &C,
1866        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
1867    ) -> MetaResult<()>
1868    where
1869        C: ConnectionTrait,
1870    {
1871        if fragment_splits.is_empty() {
1872            return Ok(());
1873        }
1874
1875        let models: Vec<fragment_splits::ActiveModel> = fragment_splits
1876            .iter()
1877            .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
1878                fragment_id: Set(*fragment_id as _),
1879                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1880                    splits: splits.iter().map(Into::into).collect_vec(),
1881                }))),
1882            })
1883            .collect();
1884
1885        FragmentSplits::insert_many(models)
1886            .on_conflict(
1887                OnConflict::column(fragment_splits::Column::FragmentId)
1888                    .update_column(fragment_splits::Column::Splits)
1889                    .to_owned(),
1890            )
1891            .exec(txn)
1892            .await?;
1893
1894        Ok(())
1895    }
1896}
1897
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::id::JobId;
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::*;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};

    use super::ActorInfo;
    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    // Upstream fragment id -> set of upstream actor ids, for a single actor.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    // Actor id -> its upstreams, for a whole fragment.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    // Fixture ids shared by the tests below.
    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    const TEST_JOB_ID: JobId = JobId::new(1);

    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);

    /// Build deterministic upstreams for `actor_id`: one upstream actor per
    /// upstream fragment, offset by 100 / 200 so ids don't collide.
    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }

    /// Build a `Union` stream node with one `Merge` input per upstream fragment
    /// listed in `actor_upstream_actor_ids`.
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    /// Round-trip check: a `Fragment` model prepared for a new job via
    /// `prepare_fragment_model_for_new_job` preserves the fragment's fields.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // Uniformly distribute the test vnodes over the actors.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id.into(),
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id.into()).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
                config_override: "".into(),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }

    /// Round-trip check in the other direction: `compose_fragment` rebuilds a
    /// `Fragment` (plus actor status / splits) from the model representation.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // Uniform vnode distribution; entries are moved out per actor below.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Each actor gets one dummy connector split.
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id.into())
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                    config_override: "a.b.c = true".into(),
                }
            })
            .collect_vec();

        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        // Every actor must come back with a status that carries a location.
        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    /// Assert that composed actors match the original `ActorInfo`s field by
    /// field (ids, bitmaps, splits, expr context) and that every `Merge` node
    /// in the plan refers to a known upstream fragment.
    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // Each Merge node's upstream fragment must appear in this actor's
            // recorded upstreams.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    /// Assert that a `fragment::Model` and a `Fragment` agree on id, type
    /// mask, distribution, state table ids, and stream node plan.
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }

    /// A fragment with no own parallelism inherits the job's policy.
    #[test]
    fn test_parallelism_policy_with_root_fragments() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 3.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let job_parallelism = StreamingParallelism::Fixed(4);

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            Some(&job_parallelism),
            &[],
        );

        assert_eq!(policy, "inherit(fixed(4))");
    }

    /// With upstream root fragments given, the policy string lists them
    /// sorted and de-duplicated.
    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            &fragment,
            None,
            &[3.into(), 1.into(), 2.into(), 1.into()],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
}