risingwave_meta/controller/
fragment.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::{Either, Itertools};
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::AdaptiveParallelismStrategy;
28use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
29use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
30use risingwave_connector::source::SplitImpl;
31use risingwave_connector::source::cdc::CdcScanOptions;
32use risingwave_meta_model::fragment::DistributionType;
33use risingwave_meta_model::object::ObjectType;
34use risingwave_meta_model::prelude::{
35    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
36};
37use risingwave_meta_model::{
38    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
39    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
40    TableIdArray, VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits,
41    object, sink, source, streaming_job, table,
42};
43use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
44use risingwave_pb::catalog::PbTable;
45use risingwave_pb::common::PbActorLocation;
46use risingwave_pb::meta::subscribe_response::{
47    Info as NotificationInfo, Operation as NotificationOperation,
48};
49use risingwave_pb::meta::table_fragments::fragment::{
50    FragmentDistributionType, PbFragmentDistributionType,
51};
52use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
53use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
54use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
55use risingwave_pb::stream_plan;
56use risingwave_pb::stream_plan::stream_node::NodeBody;
57use risingwave_pb::stream_plan::{
58    PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, SinkLogStoreType,
59    StreamScanType,
60};
61use sea_orm::ActiveValue::Set;
62use sea_orm::sea_query::Expr;
63use sea_orm::{
64    ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, FromQueryResult, JoinType,
65    PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
66};
67use serde::{Deserialize, Serialize};
68
69use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
70use crate::controller::catalog::CatalogController;
71use crate::controller::scale::{
72    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph,
73    find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
74    render_actor_assignments, resolve_streaming_job_definition,
75};
76use crate::controller::utils::{
77    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
78    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
79    resolve_no_shuffle_actor_mapping,
80};
81use crate::error::MetaError;
82use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
83use crate::model::{
84    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
85    StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
/// Some information of running (inflight) actors.
#[derive(Debug)]
pub struct InflightActorInfo {
    /// Id of the worker the actor is placed on.
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor, if it has one.
    pub vnode_bitmap: Option<Bitmap>,
    /// Connector splits assigned to the actor.
    pub splits: Vec<SplitImpl>,
}
98
/// Per-actor data that `CatalogController::compose_fragment` turns into a `StreamActor`,
/// a `PbActorStatus` and the actor's `PbConnectorSplits`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    /// Fragment the actor belongs to; `compose_fragment` rejects actors whose fragment id
    /// differs from the fragment being composed.
    pub fragment_id: FragmentId,
    /// Connector splits assigned to the actor.
    pub splits: ConnectorSplits,
    /// Worker the actor is located on (becomes the actor's `PbActorLocation`).
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor, if it has one.
    pub vnode_bitmap: Option<VnodeBitmap>,
    /// Expression context, converted to protobuf for the `StreamActor`.
    pub expr_context: ExprContext,
    /// Config override passed through unchanged to `StreamActor::config_override`.
    pub config_override: Arc<str>,
}
109
/// Flat description row of a single fragment.
// NOTE(review): not referenced in the visible part of this file; `parallelism_override`
// presumably corresponds to `fragment::Model::parallelism` — confirm against its users.
#[derive(Debug)]
struct FragmentDescRow {
    fragment_id: FragmentId,
    job_id: JobId,
    /// Raw bit mask of `FragmentTypeFlag`s.
    fragment_type_mask: i32,
    distribution_type: DistributionType,
    state_table_ids: TableIdArray,
    vnode_count: i32,
    parallelism_override: Option<StreamingParallelism>,
    stream_node: Option<StreamNode>,
}
121
/// Information of a running (inflight) fragment together with its actors.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    pub vnode_count: usize,
    /// Protobuf stream node (plan) of the fragment.
    pub nodes: PbStreamNode,
    /// Actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    pub state_table_ids: HashSet<TableId>,
}
132
/// Distribution type and vnode count of a fragment, as returned by
/// `CatalogController::fragment_parallelisms`.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub vnode_count: usize,
}
138
139#[easy_ext::ext(FragmentTypeMaskExt)]
140pub impl FragmentTypeMask {
141    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
142        Expr::col(fragment::Column::FragmentTypeMask)
143            .bit_and(Expr::value(flag as i32))
144            .ne(0)
145    }
146
147    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
148        Expr::col(fragment::Column::FragmentTypeMask)
149            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
150            .ne(0)
151    }
152
153    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
154        Expr::col(fragment::Column::FragmentTypeMask)
155            .bit_and(Expr::value(flag as i32))
156            .eq(0)
157    }
158}
159
/// Summary row of a streaming job, as returned by `list_streaming_job_infos`.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    pub job_id: JobId,
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub adaptive_parallelism_strategy: Option<String>,
    // These backfill fields are only used to derive the effective parallelism shown for
    // in-progress jobs. They are skipped in JSON responses so existing dashboard payloads keep
    // the same schema.
    #[serde(skip)]
    pub backfill_parallelism: Option<StreamingParallelism>,
    #[serde(skip)]
    pub backfill_adaptive_parallelism_strategy: Option<String>,
    pub max_parallelism: i32,
    pub resource_group: String,
    pub config_override: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
182
183impl NotificationManager {
184    /// Notify frontend about streaming worker slot mapping changes.
185    ///
186    /// This only sends streaming mapping updates to frontends. Serving mapping
187    /// notifications are decoupled and driven by fragment model changes (insert/delete)
188    /// instead of barrier-driven actor set changes.
189    pub(crate) fn notify_streaming_fragment_mapping(
190        &self,
191        operation: NotificationOperation,
192        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
193    ) {
194        if fragment_mappings.is_empty() {
195            return;
196        }
197        // notify all fragment mappings to frontend.
198        for fragment_mapping in fragment_mappings {
199            self.notify_frontend_without_version(
200                operation,
201                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
202            );
203        }
204    }
205
206    /// Notify the serving module about fragment mapping changes.
207    ///
208    /// This should be called when fragments are inserted into or deleted from the meta store,
209    /// so that serving vnode mappings are kept in sync with the fragment model.
210    pub(crate) fn notify_serving_fragment_mapping_update(
211        &self,
212        fragment_ids: Vec<crate::model::FragmentId>,
213    ) {
214        if fragment_ids.is_empty() {
215            return;
216        }
217        self.notify_local_subscribers(LocalNotification::ServingFragmentMappingsUpsert(
218            fragment_ids,
219        ));
220    }
221
222    /// Notify the serving module about fragment mapping deletions.
223    ///
224    /// This should be called when fragments are deleted from the meta store,
225    /// so that serving vnode mappings are cleaned up.
226    pub(crate) fn notify_serving_fragment_mapping_delete(
227        &self,
228        fragment_ids: Vec<crate::model::FragmentId>,
229    ) {
230        if fragment_ids.is_empty() {
231            return;
232        }
233        self.notify_local_subscribers(LocalNotification::ServingFragmentMappingsDelete(
234            fragment_ids,
235        ));
236    }
237}
238
239impl CatalogController {
240    pub fn prepare_fragment_models_from_fragments(
241        job_id: JobId,
242        fragments: impl Iterator<Item = &Fragment>,
243    ) -> MetaResult<Vec<fragment::Model>> {
244        fragments
245            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
246            .try_collect()
247    }
248
249    pub fn prepare_fragment_model_for_new_job(
250        job_id: JobId,
251        fragment: &Fragment,
252    ) -> MetaResult<fragment::Model> {
253        let vnode_count = fragment.vnode_count();
254        let Fragment {
255            fragment_id: pb_fragment_id,
256            fragment_type_mask: pb_fragment_type_mask,
257            distribution_type: pb_distribution_type,
258            state_table_ids: pb_state_table_ids,
259            nodes,
260            ..
261        } = fragment;
262
263        let state_table_ids = pb_state_table_ids.clone().into();
264
265        let fragment_parallelism = nodes
266            .node_body
267            .as_ref()
268            .and_then(|body| match body {
269                NodeBody::StreamCdcScan(node) => Some(node),
270                _ => None,
271            })
272            .and_then(|node| node.options.as_ref())
273            .map(CdcScanOptions::from_proto)
274            .filter(|opts| opts.is_parallelized_backfill())
275            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));
276
277        let stream_node = StreamNode::from(nodes);
278
279        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
280            .unwrap()
281            .into();
282
283        #[expect(deprecated)]
284        let fragment = fragment::Model {
285            fragment_id: *pb_fragment_id as _,
286            job_id,
287            fragment_type_mask: (*pb_fragment_type_mask).into(),
288            distribution_type,
289            stream_node,
290            state_table_ids,
291            upstream_fragment_id: Default::default(),
292            vnode_count: vnode_count as _,
293            parallelism: fragment_parallelism,
294        };
295
296        Ok(fragment)
297    }
298
299    #[expect(clippy::type_complexity)]
300    fn compose_table_fragments(
301        job_id: JobId,
302        state: PbState,
303        ctx: StreamContext,
304        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
305        max_parallelism: usize,
306        job_definition: Option<String>,
307    ) -> MetaResult<(
308        StreamJobFragments,
309        HashMap<FragmentId, Vec<StreamActor>>,
310        HashMap<crate::model::ActorId, PbActorStatus>,
311    )> {
312        let mut pb_fragments = BTreeMap::new();
313        let mut fragment_actors = HashMap::new();
314        let mut all_actor_status = HashMap::new();
315
316        for (fragment, actors) in fragments {
317            let (fragment, actors, actor_status, _) =
318                Self::compose_fragment(fragment, actors, job_definition.clone())?;
319            let fragment_id = fragment.fragment_id;
320            fragment_actors.insert(fragment_id, actors);
321            all_actor_status.extend(actor_status);
322
323            pb_fragments.insert(fragment_id, fragment);
324        }
325
326        let table_fragments = StreamJobFragments {
327            stream_job_id: job_id,
328            state: state as _,
329            fragments: pb_fragments,
330            ctx,
331            max_parallelism,
332        };
333
334        Ok((table_fragments, fragment_actors, all_actor_status))
335    }
336
337    #[expect(clippy::type_complexity)]
338    fn compose_fragment(
339        fragment: fragment::Model,
340        actors: Vec<ActorInfo>,
341        job_definition: Option<String>,
342    ) -> MetaResult<(
343        Fragment,
344        Vec<StreamActor>,
345        HashMap<crate::model::ActorId, PbActorStatus>,
346        HashMap<crate::model::ActorId, PbConnectorSplits>,
347    )> {
348        let fragment::Model {
349            fragment_id,
350            fragment_type_mask,
351            distribution_type,
352            stream_node,
353            state_table_ids,
354            vnode_count,
355            ..
356        } = fragment;
357
358        let stream_node = stream_node.to_protobuf();
359        let mut upstream_fragments = HashSet::new();
360        visit_stream_node_body(&stream_node, |body| {
361            if let NodeBody::Merge(m) = body {
362                assert!(
363                    upstream_fragments.insert(m.upstream_fragment_id),
364                    "non-duplicate upstream fragment"
365                );
366            }
367        });
368
369        let mut pb_actors = vec![];
370
371        let mut pb_actor_status = HashMap::new();
372        let mut pb_actor_splits = HashMap::new();
373
374        for actor in actors {
375            if actor.fragment_id != fragment_id {
376                bail!(
377                    "fragment id {} from actor {} is different from fragment {}",
378                    actor.fragment_id,
379                    actor.actor_id,
380                    fragment_id
381                )
382            }
383
384            let ActorInfo {
385                actor_id,
386                fragment_id,
387                worker_id,
388                splits,
389                vnode_bitmap,
390                expr_context,
391                config_override,
392                ..
393            } = actor;
394
395            let vnode_bitmap =
396                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
397            let pb_expr_context = Some(expr_context.to_protobuf());
398
399            pb_actor_status.insert(
400                actor_id as _,
401                PbActorStatus {
402                    location: PbActorLocation::from_worker(worker_id),
403                },
404            );
405
406            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
407
408            pb_actors.push(StreamActor {
409                actor_id: actor_id as _,
410                fragment_id: fragment_id as _,
411                vnode_bitmap,
412                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
413                expr_context: pb_expr_context,
414                config_override,
415            })
416        }
417
418        let pb_state_table_ids = state_table_ids.0;
419        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
420        let pb_fragment = Fragment {
421            fragment_id: fragment_id as _,
422            fragment_type_mask: fragment_type_mask.into(),
423            distribution_type: pb_distribution_type,
424            state_table_ids: pb_state_table_ids,
425            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
426            nodes: stream_node,
427        };
428
429        Ok((pb_fragment, pb_actors, pb_actor_status, pb_actor_splits))
430    }
431
432    /// Returns distribution type and vnode count for all (or filtered) fragments.
433    ///
434    /// Reads directly from the persistent catalog (fragment table) rather than
435    /// from the in-memory `shared_actor_infos`.  This is critical because the
436    /// serving vnode mapping must be available even before the barrier manager's
437    /// recovery has completed and populated `shared_actor_infos`.
438    pub async fn fragment_parallelisms(
439        &self,
440    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
441        let inner = self.inner.read().await;
442        let query = FragmentModel::find().select_only().columns([
443            fragment::Column::FragmentId,
444            fragment::Column::DistributionType,
445            fragment::Column::VnodeCount,
446        ]);
447        let fragments: Vec<(FragmentId, DistributionType, i32)> =
448            query.into_tuple().all(&inner.db).await?;
449
450        Ok(fragments
451            .into_iter()
452            .map(|(fragment_id, distribution_type, vnode_count)| {
453                (
454                    fragment_id,
455                    FragmentParallelismInfo {
456                        distribution_type: PbFragmentDistributionType::from(distribution_type),
457                        vnode_count: vnode_count as usize,
458                    },
459                )
460            })
461            .collect())
462    }
463
464    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
465        let inner = self.inner.read().await;
466        let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
467            .select_only()
468            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
469            .into_tuple()
470            .all(&inner.db)
471            .await?;
472        Ok(fragment_jobs.into_iter().collect())
473    }
474
475    pub async fn get_fragment_job_id(
476        &self,
477        fragment_ids: Vec<FragmentId>,
478    ) -> MetaResult<Vec<ObjectId>> {
479        let inner = self.inner.read().await;
480        self.get_fragment_job_id_in_txn(&inner.db, fragment_ids)
481            .await
482    }
483
484    pub async fn get_fragment_job_id_in_txn<C>(
485        &self,
486        txn: &C,
487        fragment_ids: Vec<FragmentId>,
488    ) -> MetaResult<Vec<ObjectId>>
489    where
490        C: ConnectionTrait + Send,
491    {
492        let object_ids: Vec<ObjectId> = FragmentModel::find()
493            .select_only()
494            .column(fragment::Column::JobId)
495            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
496            .into_tuple()
497            .all(txn)
498            .await?;
499
500        Ok(object_ids)
501    }
502
503    pub async fn get_fragment_desc_by_id(
504        &self,
505        fragment_id: FragmentId,
506    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
507        let inner = self.inner.read().await;
508
509        let fragment_model = match FragmentModel::find_by_id(fragment_id)
510            .one(&inner.db)
511            .await?
512        {
513            Some(fragment) => fragment,
514            None => return Ok(None),
515        };
516
517        let job_parallelism: Option<(StreamingParallelism, Option<String>)> =
518            StreamingJob::find_by_id(fragment_model.job_id)
519                .select_only()
520                .columns([
521                    streaming_job::Column::Parallelism,
522                    streaming_job::Column::AdaptiveParallelismStrategy,
523                ])
524                .into_tuple::<(StreamingParallelism, Option<String>)>()
525                .one(&inner.db)
526                .await?;
527
528        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
529            .select_only()
530            .columns([
531                fragment_relation::Column::SourceFragmentId,
532                fragment_relation::Column::DispatcherType,
533            ])
534            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
535            .into_tuple()
536            .all(&inner.db)
537            .await?;
538
539        let upstreams: Vec<_> = upstream_entries
540            .into_iter()
541            .map(|(source_id, _)| source_id)
542            .collect();
543
544        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
545            .await
546            .map(Self::collect_root_fragment_mapping)?;
547        let root_fragments = root_fragment_map
548            .get(&fragment_id)
549            .cloned()
550            .unwrap_or_default();
551
552        let info = self.env.shared_actor_infos().read_guard();
553        let SharedFragmentInfo { actors, .. } = info
554            .get_fragment(fragment_model.fragment_id as _)
555            .unwrap_or_else(|| {
556                panic!(
557                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
558                    fragment_model.fragment_id,
559                    fragment_model.job_id
560                )
561            });
562
563        let parallelism_policy = Self::format_fragment_parallelism_policy(
564            fragment_model.distribution_type,
565            fragment_model.parallelism.as_ref(),
566            job_parallelism.as_ref().map(|(parallelism, _)| parallelism),
567            job_parallelism
568                .as_ref()
569                .and_then(|(_, strategy)| strategy.as_deref()),
570            &root_fragments,
571        );
572
573        let fragment = FragmentDesc {
574            fragment_id: fragment_model.fragment_id,
575            job_id: fragment_model.job_id,
576            fragment_type_mask: fragment_model.fragment_type_mask,
577            distribution_type: fragment_model.distribution_type,
578            state_table_ids: fragment_model.state_table_ids.clone(),
579            parallelism: actors.len() as _,
580            vnode_count: fragment_model.vnode_count,
581            stream_node: fragment_model.stream_node.clone(),
582            parallelism_policy,
583        };
584
585        Ok(Some((fragment, upstreams)))
586    }
587
588    pub async fn list_fragment_database_ids(
589        &self,
590        select_fragment_ids: Option<Vec<FragmentId>>,
591    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
592        let inner = self.inner.read().await;
593        let select = FragmentModel::find()
594            .select_only()
595            .column(fragment::Column::FragmentId)
596            .column(object::Column::DatabaseId)
597            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
598        let select = if let Some(select_fragment_ids) = select_fragment_ids {
599            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
600        } else {
601            select
602        };
603        Ok(select.into_tuple().all(&inner.db).await?)
604    }
605
606    pub async fn get_job_fragments_by_id(
607        &self,
608        job_id: JobId,
609    ) -> MetaResult<(
610        StreamJobFragments,
611        HashMap<FragmentId, Vec<StreamActor>>,
612        HashMap<ActorId, PbActorStatus>,
613    )> {
614        let inner = self.inner.read().await;
615
616        // Load fragments matching the job from the database
617        let fragments: Vec<_> = FragmentModel::find()
618            .filter(fragment::Column::JobId.eq(job_id))
619            .all(&inner.db)
620            .await?;
621
622        let job_info = StreamingJob::find_by_id(job_id)
623            .one(&inner.db)
624            .await?
625            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
626
627        let fragment_actors =
628            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;
629
630        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
631            .await?
632            .remove(&job_id);
633
634        Self::compose_table_fragments(
635            job_id,
636            job_info.job_status.into(),
637            job_info.stream_context(),
638            fragment_actors,
639            job_info.max_parallelism as _,
640            job_definition,
641        )
642    }
643
644    pub async fn get_fragment_actor_dispatchers(
645        &self,
646        fragment_ids: Vec<FragmentId>,
647    ) -> MetaResult<FragmentActorDispatchers> {
648        let inner = self.inner.read().await;
649
650        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
651            .await
652    }
653
654    pub async fn get_fragment_actor_dispatchers_txn(
655        &self,
656        c: &impl ConnectionTrait,
657        fragment_ids: Vec<FragmentId>,
658    ) -> MetaResult<FragmentActorDispatchers> {
659        let fragment_relations = FragmentRelation::find()
660            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
661            .all(c)
662            .await?;
663
664        type FragmentActorInfo = (
665            DistributionType,
666            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
667        );
668
669        let shared_info = self.env.shared_actor_infos();
670        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
671        let get_fragment_actors = |fragment_id: FragmentId| async move {
672            let result: MetaResult<FragmentActorInfo> = try {
673                let read_guard = shared_info.read_guard();
674
675                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();
676
677                (
678                    fragment.distribution_type,
679                    Arc::new(
680                        fragment
681                            .actors
682                            .iter()
683                            .map(|(actor_id, actor_info)| {
684                                (
685                                    *actor_id,
686                                    actor_info
687                                        .vnode_bitmap
688                                        .as_ref()
689                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
690                                )
691                            })
692                            .collect(),
693                    ),
694                )
695            };
696            result
697        };
698
699        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
700        for fragment_relation::Model {
701            source_fragment_id,
702            target_fragment_id,
703            dispatcher_type,
704            dist_key_indices,
705            output_indices,
706            output_type_mapping,
707        } in fragment_relations
708        {
709            let (source_fragment_distribution, source_fragment_actors) = {
710                let (distribution, actors) = {
711                    match fragment_actor_cache.entry(source_fragment_id) {
712                        Entry::Occupied(entry) => entry.into_mut(),
713                        Entry::Vacant(entry) => {
714                            entry.insert(get_fragment_actors(source_fragment_id).await?)
715                        }
716                    }
717                };
718                (*distribution, actors.clone())
719            };
720            let (target_fragment_distribution, target_fragment_actors) = {
721                let (distribution, actors) = {
722                    match fragment_actor_cache.entry(target_fragment_id) {
723                        Entry::Occupied(entry) => entry.into_mut(),
724                        Entry::Vacant(entry) => {
725                            entry.insert(get_fragment_actors(target_fragment_id).await?)
726                        }
727                    }
728                };
729                (*distribution, actors.clone())
730            };
731            let output_mapping = PbDispatchOutputMapping {
732                indices: output_indices.into_u32_array(),
733                types: output_type_mapping.unwrap_or_default().to_protobuf(),
734            };
735            let (dispatchers, _) = compose_dispatchers(
736                source_fragment_distribution,
737                &source_fragment_actors,
738                target_fragment_id as _,
739                target_fragment_distribution,
740                &target_fragment_actors,
741                dispatcher_type,
742                dist_key_indices.into_u32_array(),
743                output_mapping,
744            );
745            let actor_dispatchers_map = actor_dispatchers_map
746                .entry(source_fragment_id as _)
747                .or_default();
748            for (actor_id, dispatchers) in dispatchers {
749                actor_dispatchers_map
750                    .entry(actor_id as _)
751                    .or_default()
752                    .push(dispatchers);
753            }
754        }
755        Ok(actor_dispatchers_map)
756    }
757
758    pub async fn get_fragment_downstream_relations(
759        &self,
760        fragment_ids: Vec<FragmentId>,
761    ) -> MetaResult<FragmentDownstreamRelation> {
762        let inner = self.inner.read().await;
763        self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
764            .await
765    }
766
767    pub async fn get_fragment_downstream_relations_in_txn<C>(
768        &self,
769        txn: &C,
770        fragment_ids: Vec<FragmentId>,
771    ) -> MetaResult<FragmentDownstreamRelation>
772    where
773        C: ConnectionTrait + StreamTrait + Send,
774    {
775        let mut stream = FragmentRelation::find()
776            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
777            .stream(txn)
778            .await?;
779        let mut relations = FragmentDownstreamRelation::new();
780        while let Some(relation) = stream.try_next().await? {
781            relations
782                .entry(relation.source_fragment_id as _)
783                .or_default()
784                .push(DownstreamFragmentRelation {
785                    downstream_fragment_id: relation.target_fragment_id as _,
786                    dispatcher_type: relation.dispatcher_type,
787                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
788                    output_mapping: PbDispatchOutputMapping {
789                        indices: relation.output_indices.into_u32_array(),
790                        types: relation
791                            .output_type_mapping
792                            .unwrap_or_default()
793                            .to_protobuf(),
794                    },
795                });
796        }
797        Ok(relations)
798    }
799
800    pub async fn get_job_fragment_backfill_scan_type(
801        &self,
802        job_id: JobId,
803    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
804        let inner = self.inner.read().await;
805        self.get_job_fragment_backfill_scan_type_in_txn(&inner.db, job_id)
806            .await
807    }
808
809    pub async fn get_job_fragment_backfill_scan_type_in_txn<C>(
810        &self,
811        txn: &C,
812        job_id: JobId,
813    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>>
814    where
815        C: ConnectionTrait + Send,
816    {
817        let fragments: Vec<_> = FragmentModel::find()
818            .filter(fragment::Column::JobId.eq(job_id))
819            .all(txn)
820            .await?;
821
822        let mut result = HashMap::new();
823
824        for fragment::Model {
825            fragment_id,
826            stream_node,
827            ..
828        } in fragments
829        {
830            let stream_node = stream_node.to_protobuf();
831            visit_stream_node_body(&stream_node, |body| {
832                if let NodeBody::StreamScan(node) = body {
833                    match node.stream_scan_type() {
834                        StreamScanType::Unspecified => {}
835                        scan_type => {
836                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
837                        }
838                    }
839                }
840            });
841        }
842
843        Ok(result)
844    }
845
846    pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
847        let inner = self.inner.read().await;
848        let count = StreamingJob::find().count(&inner.db).await?;
849        Ok(usize::try_from(count).context("streaming job count overflow")?)
850    }
851
    /// Lists every streaming job together with its catalog attributes as [`StreamingJobInfo`]s.
    ///
    /// Each row carries the job id, object type, display name, status, parallelism settings,
    /// resource group, config override, database id and schema id.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            // Inner joins: every job must have an object row and an owning database.
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // Left joins: the job object belongs to at most one of table / source / sink.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // Name = first non-null of table name, source name, sink name, else "<unknown>".
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::AdaptiveParallelismStrategy,
                streaming_job::Column::BackfillParallelism,
                streaming_job::Column::BackfillAdaptiveParallelismStrategy,
                streaming_job::Column::MaxParallelism,
            ])
            // The job's specific resource group takes precedence over the database's.
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            // A missing config override is reported as an empty string.
            .column_as(
                Expr::if_null(
                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
                    Expr::val(""),
                ),
                "config_override",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
908
909    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
910        let inner = self.inner.read().await;
911        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
912            .select_only()
913            .column(streaming_job::Column::MaxParallelism)
914            .into_tuple()
915            .one(&inner.db)
916            .await?
917            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
918        Ok(max_parallelism as usize)
919    }
920
921    /// Try to get internal table ids of each streaming job, used by metrics collection.
922    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
923        if let Ok(inner) = self.inner.try_read()
924            && let Ok(job_state_tables) = FragmentModel::find()
925                .select_only()
926                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
927                .into_tuple::<(JobId, I32Array)>()
928                .all(&inner.db)
929                .await
930        {
931            let mut job_internal_table_ids = HashMap::new();
932            for (job_id, state_table_ids) in job_state_tables {
933                job_internal_table_ids
934                    .entry(job_id)
935                    .or_insert_with(Vec::new)
936                    .extend(
937                        state_table_ids
938                            .into_inner()
939                            .into_iter()
940                            .map(|table_id| TableId::new(table_id as _)),
941                    );
942            }
943            return Some(job_internal_table_ids.into_iter().collect());
944        }
945        None
946    }
947
948    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
949        let inner = self.inner.read().await;
950        let count = FragmentModel::find().count(&inner.db).await?;
951        Ok(count > 0)
952    }
953
954    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
955        let read_guard = self.env.shared_actor_infos().read_guard();
956        let actor_cnt: HashMap<WorkerId, _> = read_guard
957            .iter_over_fragments()
958            .flat_map(|(_, fragment)| {
959                fragment
960                    .actors
961                    .iter()
962                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
963            })
964            .into_group_map()
965            .into_iter()
966            .map(|(k, v)| (k, v.len()))
967            .collect();
968
969        Ok(actor_cnt)
970    }
971
972    fn collect_fragment_actor_map(
973        &self,
974        fragment_ids: &[FragmentId],
975        stream_context: StreamContext,
976    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
977        let guard = self.env.shared_actor_infos().read_guard();
978        let pb_expr_context = stream_context.to_expr_context();
979        let expr_context: ExprContext = (&pb_expr_context).into();
980
981        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
982        for fragment_id in fragment_ids {
983            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
984                anyhow!("fragment {} not found in shared actor info", fragment_id)
985            })?;
986
987            let actors = fragment_info
988                .actors
989                .iter()
990                .map(|(actor_id, actor_info)| ActorInfo {
991                    actor_id: *actor_id as _,
992                    fragment_id: *fragment_id,
993                    splits: ConnectorSplits::from(&PbConnectorSplits {
994                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
995                    }),
996                    worker_id: actor_info.worker_id as _,
997                    vnode_bitmap: actor_info
998                        .vnode_bitmap
999                        .as_ref()
1000                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
1001                    expr_context: expr_context.clone(),
1002                    config_override: stream_context.config_override.clone(),
1003                })
1004                .collect();
1005
1006            actor_map.insert(*fragment_id, actors);
1007        }
1008
1009        Ok(actor_map)
1010    }
1011
1012    fn collect_fragment_actor_pairs(
1013        &self,
1014        fragments: Vec<fragment::Model>,
1015        stream_context: StreamContext,
1016    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
1017        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
1018        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
1019        fragments
1020            .into_iter()
1021            .map(|fragment| {
1022                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
1023                    anyhow!(
1024                        "fragment {} missing in shared actor info map",
1025                        fragment.fragment_id
1026                    )
1027                })?;
1028                Ok((fragment, actors))
1029            })
1030            .collect()
1031    }
1032
1033    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
1034    pub async fn table_fragments(
1035        &self,
1036    ) -> MetaResult<
1037        BTreeMap<
1038            JobId,
1039            (
1040                StreamJobFragments,
1041                HashMap<FragmentId, Vec<StreamActor>>,
1042                HashMap<crate::model::ActorId, PbActorStatus>,
1043            ),
1044        >,
1045    > {
1046        let inner = self.inner.read().await;
1047        let jobs = StreamingJob::find().all(&inner.db).await?;
1048
1049        let mut job_definition = resolve_streaming_job_definition(
1050            &inner.db,
1051            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
1052        )
1053        .await?;
1054
1055        let mut table_fragments = BTreeMap::new();
1056        for job in jobs {
1057            let fragments = FragmentModel::find()
1058                .filter(fragment::Column::JobId.eq(job.job_id))
1059                .all(&inner.db)
1060                .await?;
1061
1062            let fragment_actors =
1063                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;
1064
1065            table_fragments.insert(
1066                job.job_id,
1067                Self::compose_table_fragments(
1068                    job.job_id,
1069                    job.job_status.into(),
1070                    job.stream_context(),
1071                    fragment_actors,
1072                    job.max_parallelism as _,
1073                    job_definition.remove(&job.job_id),
1074                )?,
1075            );
1076        }
1077
1078        Ok(table_fragments)
1079    }
1080
1081    pub async fn upstream_fragments(
1082        &self,
1083        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1084    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
1085        let inner = self.inner.read().await;
1086        self.upstream_fragments_in_txn(&inner.db, fragment_ids)
1087            .await
1088    }
1089
1090    pub async fn upstream_fragments_in_txn<C>(
1091        &self,
1092        txn: &C,
1093        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1094    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>>
1095    where
1096        C: ConnectionTrait + StreamTrait + Send,
1097    {
1098        let mut stream = FragmentRelation::find()
1099            .select_only()
1100            .columns([
1101                fragment_relation::Column::SourceFragmentId,
1102                fragment_relation::Column::TargetFragmentId,
1103            ])
1104            .filter(
1105                fragment_relation::Column::TargetFragmentId
1106                    .is_in(fragment_ids.map(|id| id as FragmentId)),
1107            )
1108            .into_tuple::<(FragmentId, FragmentId)>()
1109            .stream(txn)
1110            .await?;
1111        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
1112        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
1113            upstream_fragments
1114                .entry(downstream_fragment_id as crate::model::FragmentId)
1115                .or_default()
1116                .insert(upstream_fragment_id as crate::model::FragmentId);
1117        }
1118        Ok(upstream_fragments)
1119    }
1120
1121    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1122        let info = self.env.shared_actor_infos().read_guard();
1123
1124        let actor_locations = info
1125            .iter_over_fragments()
1126            .flat_map(|(fragment_id, fragment)| {
1127                fragment
1128                    .actors
1129                    .iter()
1130                    .map(|(actor_id, actor)| PartialActorLocation {
1131                        actor_id: *actor_id as _,
1132                        fragment_id: *fragment_id as _,
1133                        worker_id: actor.worker_id,
1134                    })
1135            })
1136            .collect_vec();
1137
1138        Ok(actor_locations)
1139    }
1140
1141    pub async fn list_actor_info(
1142        &self,
1143    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
1144        let inner = self.inner.read().await;
1145
1146        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
1147            FragmentModel::find()
1148                .select_only()
1149                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1150                .column(fragment::Column::FragmentId)
1151                .column_as(object::Column::Oid, "job_id")
1152                .column_as(object::Column::SchemaId, "schema_id")
1153                .column_as(object::Column::ObjType, "type")
1154                .into_tuple()
1155                .all(&inner.db)
1156                .await?;
1157
1158        let actor_infos = {
1159            let info = self.env.shared_actor_infos().read_guard();
1160
1161            let mut result = Vec::new();
1162
1163            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
1164                let Some(fragment) = info.get_fragment(fragment_id as _) else {
1165                    return Err(MetaError::unavailable(format!(
1166                        "shared actor info missing for fragment {fragment_id} while listing actors"
1167                    )));
1168                };
1169
1170                for actor_id in fragment.actors.keys() {
1171                    result.push((
1172                        *actor_id as _,
1173                        fragment.fragment_id as _,
1174                        object_id,
1175                        schema_id,
1176                        object_type,
1177                    ));
1178                }
1179            }
1180
1181            result
1182        };
1183
1184        Ok(actor_infos)
1185    }
1186
1187    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1188        let guard = self.env.shared_actor_info.read_guard();
1189        guard
1190            .iter_over_fragments()
1191            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1192            .collect_vec()
1193    }
1194
1195    pub async fn list_fragment_descs_with_node(
1196        &self,
1197        is_creating: bool,
1198    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1199        let inner = self.inner.read().await;
1200        let txn = inner.db.begin().await?;
1201
1202        let fragments_query = Self::build_fragment_query(is_creating);
1203        let fragments: Vec<fragment::Model> = fragments_query.all(&txn).await?;
1204
1205        let rows = fragments
1206            .into_iter()
1207            .map(|fragment| FragmentDescRow {
1208                fragment_id: fragment.fragment_id,
1209                job_id: fragment.job_id,
1210                fragment_type_mask: fragment.fragment_type_mask,
1211                distribution_type: fragment.distribution_type,
1212                state_table_ids: fragment.state_table_ids,
1213                vnode_count: fragment.vnode_count,
1214                parallelism_override: fragment.parallelism,
1215                stream_node: Some(fragment.stream_node),
1216            })
1217            .collect_vec();
1218
1219        self.build_fragment_distributions(&txn, rows).await
1220    }
1221
1222    pub async fn list_fragment_descs_without_node(
1223        &self,
1224        is_creating: bool,
1225    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1226        let inner = self.inner.read().await;
1227        let txn = inner.db.begin().await?;
1228
1229        let fragments_query = Self::build_fragment_query(is_creating);
1230        #[expect(clippy::type_complexity)]
1231        let fragments: Vec<(
1232            FragmentId,
1233            JobId,
1234            i32,
1235            DistributionType,
1236            TableIdArray,
1237            i32,
1238            Option<StreamingParallelism>,
1239        )> = fragments_query
1240            .select_only()
1241            .columns([
1242                fragment::Column::FragmentId,
1243                fragment::Column::JobId,
1244                fragment::Column::FragmentTypeMask,
1245                fragment::Column::DistributionType,
1246                fragment::Column::StateTableIds,
1247                fragment::Column::VnodeCount,
1248                fragment::Column::Parallelism,
1249            ])
1250            .into_tuple()
1251            .all(&txn)
1252            .await?;
1253
1254        let rows = fragments
1255            .into_iter()
1256            .map(
1257                |(
1258                    fragment_id,
1259                    job_id,
1260                    fragment_type_mask,
1261                    distribution_type,
1262                    state_table_ids,
1263                    vnode_count,
1264                    parallelism_override,
1265                )| FragmentDescRow {
1266                    fragment_id,
1267                    job_id,
1268                    fragment_type_mask,
1269                    distribution_type,
1270                    state_table_ids,
1271                    vnode_count,
1272                    parallelism_override,
1273                    stream_node: None,
1274                },
1275            )
1276            .collect_vec();
1277
1278        self.build_fragment_distributions(&txn, rows).await
1279    }
1280
1281    fn build_fragment_query(is_creating: bool) -> sea_orm::Select<fragment::Entity> {
1282        if is_creating {
1283            FragmentModel::find()
1284                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1285                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1286                .filter(
1287                    streaming_job::Column::JobStatus
1288                        .eq(JobStatus::Initial)
1289                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1290                )
1291        } else {
1292            FragmentModel::find()
1293        }
1294    }
1295
1296    async fn build_fragment_distributions(
1297        &self,
1298        txn: &DatabaseTransaction,
1299        rows: Vec<FragmentDescRow>,
1300    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1301        let fragment_ids = rows.iter().map(|row| row.fragment_id).collect_vec();
1302        let job_ids = rows.iter().map(|row| row.job_id).unique().collect_vec();
1303
1304        let job_parallelisms: HashMap<JobId, (StreamingParallelism, Option<String>)> =
1305            if fragment_ids.is_empty() {
1306                HashMap::new()
1307            } else {
1308                StreamingJob::find()
1309                    .select_only()
1310                    .columns([
1311                        streaming_job::Column::JobId,
1312                        streaming_job::Column::Parallelism,
1313                        streaming_job::Column::AdaptiveParallelismStrategy,
1314                    ])
1315                    .filter(streaming_job::Column::JobId.is_in(job_ids))
1316                    .into_tuple::<(JobId, StreamingParallelism, Option<String>)>()
1317                    .all(txn)
1318                    .await?
1319                    .into_iter()
1320                    .map(|(job_id, parallelism, strategy)| (job_id, (parallelism, strategy)))
1321                    .collect()
1322            };
1323
1324        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
1325            if fragment_ids.is_empty() {
1326                Vec::new()
1327            } else {
1328                FragmentRelation::find()
1329                    .select_only()
1330                    .columns([
1331                        fragment_relation::Column::TargetFragmentId,
1332                        fragment_relation::Column::SourceFragmentId,
1333                        fragment_relation::Column::DispatcherType,
1334                    ])
1335                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
1336                    .into_tuple()
1337                    .all(txn)
1338                    .await?
1339            };
1340
1341        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1342        for (target_id, source_id, _) in upstream_entries {
1343            all_upstreams.entry(target_id).or_default().push(source_id);
1344        }
1345
1346        let root_fragment_map = if fragment_ids.is_empty() {
1347            HashMap::new()
1348        } else {
1349            let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_ids).await?;
1350            Self::collect_root_fragment_mapping(ensembles)
1351        };
1352
1353        let guard = self.env.shared_actor_info.read_guard();
1354        let mut result = Vec::with_capacity(rows.len());
1355
1356        for row in rows {
1357            let parallelism = guard
1358                .get_fragment(row.fragment_id as _)
1359                .map(|fragment| fragment.actors.len())
1360                .unwrap_or_default();
1361
1362            let root_fragments = root_fragment_map
1363                .get(&row.fragment_id)
1364                .cloned()
1365                .unwrap_or_default();
1366
1367            let upstreams = all_upstreams.remove(&row.fragment_id).unwrap_or_default();
1368
1369            let parallelism_policy = Self::format_fragment_parallelism_policy(
1370                row.distribution_type,
1371                row.parallelism_override.as_ref(),
1372                job_parallelisms
1373                    .get(&row.job_id)
1374                    .map(|(parallelism, _)| parallelism),
1375                job_parallelisms
1376                    .get(&row.job_id)
1377                    .and_then(|(_, strategy)| strategy.as_deref()),
1378                &root_fragments,
1379            );
1380
1381            let fragment = FragmentDistribution {
1382                fragment_id: row.fragment_id,
1383                table_id: row.job_id,
1384                distribution_type: PbFragmentDistributionType::from(row.distribution_type) as _,
1385                state_table_ids: row.state_table_ids.0,
1386                upstream_fragment_ids: upstreams.clone(),
1387                fragment_type_mask: row.fragment_type_mask as _,
1388                parallelism: parallelism as _,
1389                vnode_count: row.vnode_count as _,
1390                node: row.stream_node.map(|node| node.to_protobuf()),
1391                parallelism_policy,
1392            };
1393
1394            result.push((fragment, upstreams));
1395        }
1396
1397        Ok(result)
1398    }
1399
1400    pub async fn list_sink_log_store_tables(&self) -> MetaResult<Vec<(SinkId, TableId)>> {
1401        let inner = self.inner.read().await;
1402        let txn = inner.db.begin().await?;
1403
1404        let fragments: Vec<(JobId, StreamNode)> = FragmentModel::find()
1405            .select_only()
1406            .columns([fragment::Column::JobId, fragment::Column::StreamNode])
1407            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Sink))
1408            .into_tuple()
1409            .all(&txn)
1410            .await?;
1411
1412        let mut mapping: HashMap<SinkId, TableId> = HashMap::new();
1413
1414        for (job_id, stream_node) in fragments {
1415            let sink_id = SinkId::new(job_id.as_raw_id());
1416            let stream_node = stream_node.to_protobuf();
1417            let mut internal_table_id: Option<TableId> = None;
1418
1419            visit_stream_node_body(&stream_node, |body| {
1420                if let NodeBody::Sink(node) = body
1421                    && node.log_store_type == SinkLogStoreType::KvLogStore as i32
1422                    && let Some(table) = &node.table
1423                {
1424                    internal_table_id = Some(TableId::new(table.id.as_raw_id()));
1425                }
1426            });
1427
1428            if let Some(table_id) = internal_table_id
1429                && let Some(existing) = mapping.insert(sink_id, table_id)
1430                && existing != table_id
1431            {
1432                tracing::warn!(
1433                    "sink {sink_id:?} has multiple log store tables: {existing:?} vs {table_id:?}"
1434                );
1435            }
1436        }
1437
1438        Ok(mapping.into_iter().collect())
1439    }
1440
1441    /// Build a fragment-to-root lookup for all reported root fragment ensembles.
1442    fn collect_root_fragment_mapping(
1443        ensembles: Vec<NoShuffleEnsemble>,
1444    ) -> HashMap<FragmentId, Vec<FragmentId>> {
1445        let mut mapping = HashMap::new();
1446
1447        for ensemble in ensembles {
1448            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1449            roots.sort_unstable();
1450            roots.dedup();
1451
1452            if roots.is_empty() {
1453                continue;
1454            }
1455
1456            let root_set: HashSet<_> = roots.iter().copied().collect();
1457
1458            for fragment_id in ensemble.component_fragments() {
1459                if root_set.contains(&fragment_id) {
1460                    mapping.insert(fragment_id, Vec::new());
1461                } else {
1462                    mapping.insert(fragment_id, roots.clone());
1463                }
1464            }
1465        }
1466
1467        mapping
1468    }
1469
1470    fn format_fragment_parallelism_policy(
1471        distribution_type: DistributionType,
1472        fragment_parallelism: Option<&StreamingParallelism>,
1473        job_parallelism: Option<&StreamingParallelism>,
1474        job_adaptive_parallelism_strategy: Option<&str>,
1475        root_fragments: &[FragmentId],
1476    ) -> String {
1477        if distribution_type == DistributionType::Single {
1478            return "single".to_owned();
1479        }
1480
1481        if let Some(parallelism) = fragment_parallelism {
1482            return format!(
1483                "override({})",
1484                Self::format_streaming_parallelism(parallelism, job_adaptive_parallelism_strategy)
1485            );
1486        }
1487
1488        if !root_fragments.is_empty() {
1489            let mut upstreams = root_fragments.to_vec();
1490            upstreams.sort_unstable();
1491            upstreams.dedup();
1492
1493            return format!("upstream_fragment({upstreams:?})");
1494        }
1495
1496        let inherited = job_parallelism
1497            .map(|parallelism| {
1498                Self::format_streaming_parallelism(parallelism, job_adaptive_parallelism_strategy)
1499            })
1500            .unwrap_or_else(|| "unknown".to_owned());
1501        format!("inherit({inherited})")
1502    }
1503
1504    fn format_streaming_parallelism(
1505        parallelism: &StreamingParallelism,
1506        adaptive_parallelism_strategy: Option<&str>,
1507    ) -> String {
1508        match parallelism {
1509            StreamingParallelism::Adaptive => adaptive_parallelism_strategy
1510                .and_then(Self::format_adaptive_parallelism_strategy)
1511                .unwrap_or_else(|| "adaptive".to_owned()),
1512            StreamingParallelism::Fixed(n) => n.to_string(),
1513            StreamingParallelism::Custom => adaptive_parallelism_strategy
1514                .and_then(Self::format_adaptive_parallelism_strategy)
1515                .unwrap_or_else(|| "custom".to_owned()),
1516        }
1517    }
1518
1519    fn format_adaptive_parallelism_strategy(strategy: &str) -> Option<String> {
1520        parse_strategy(strategy)
1521            .ok()
1522            .map(|strategy| match strategy {
1523                AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full => {
1524                    "adaptive".to_owned()
1525                }
1526                AdaptiveParallelismStrategy::Bounded(n) => format!("bounded({n})"),
1527                AdaptiveParallelismStrategy::Ratio(r) => format!("ratio({r})"),
1528            })
1529    }
1530
1531    pub async fn list_sink_actor_mapping(
1532        &self,
1533    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1534        let inner = self.inner.read().await;
1535        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1536            .select_only()
1537            .columns([sink::Column::SinkId, sink::Column::Name])
1538            .into_tuple()
1539            .all(&inner.db)
1540            .await?;
1541        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1542
1543        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1544
1545        let actor_with_type: Vec<(ActorId, SinkId)> = {
1546            let info = self.env.shared_actor_infos().read_guard();
1547
1548            info.iter_over_fragments()
1549                .filter(|(_, fragment)| {
1550                    sink_ids.contains(&fragment.job_id.as_sink_id())
1551                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1552                })
1553                .flat_map(|(_, fragment)| {
1554                    fragment
1555                        .actors
1556                        .keys()
1557                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1558                })
1559                .collect()
1560        };
1561
1562        let mut sink_actor_mapping = HashMap::new();
1563        for (actor_id, sink_id) in actor_with_type {
1564            sink_actor_mapping
1565                .entry(sink_id)
1566                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1567                .1
1568                .push(actor_id);
1569        }
1570
1571        Ok(sink_actor_mapping)
1572    }
1573
1574    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1575        let inner = self.inner.read().await;
1576        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1577            .select_only()
1578            .columns([
1579                fragment::Column::FragmentId,
1580                fragment::Column::JobId,
1581                fragment::Column::StateTableIds,
1582            ])
1583            .into_partial_model()
1584            .all(&inner.db)
1585            .await?;
1586        Ok(fragment_state_tables)
1587    }
1588
1589    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
1590    /// collected
1591    pub async fn load_all_actors_dynamic(
1592        &self,
1593        database_id: Option<DatabaseId>,
1594        worker_nodes: &ActiveStreamingWorkerNodes,
1595    ) -> MetaResult<FragmentRenderMap> {
1596        let loaded = self.load_fragment_context(database_id).await?;
1597
1598        if loaded.is_empty() {
1599            return Ok(HashMap::new());
1600        }
1601
1602        let RenderedGraph { fragments, .. } = render_actor_assignments(
1603            self.env.actor_id_generator(),
1604            worker_nodes.current(),
1605            &loaded,
1606        )?;
1607
1608        tracing::trace!(?fragments, "reload all actors");
1609
1610        Ok(fragments)
1611    }
1612
1613    /// Async load stage: collects all metadata required for rendering actor assignments.
1614    pub async fn load_fragment_context(
1615        &self,
1616        database_id: Option<DatabaseId>,
1617    ) -> MetaResult<LoadedFragmentContext> {
1618        let inner = self.inner.read().await;
1619        let txn = inner.db.begin().await?;
1620
1621        self.load_fragment_context_in_txn(&txn, database_id).await
1622    }
1623
1624    pub async fn load_fragment_context_in_txn<C>(
1625        &self,
1626        txn: &C,
1627        database_id: Option<DatabaseId>,
1628    ) -> MetaResult<LoadedFragmentContext>
1629    where
1630        C: ConnectionTrait,
1631    {
1632        let mut query = StreamingJob::find()
1633            .select_only()
1634            .column(streaming_job::Column::JobId);
1635
1636        if let Some(database_id) = database_id {
1637            query = query
1638                .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1639                .filter(object::Column::DatabaseId.eq(database_id));
1640        }
1641
1642        let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
1643
1644        let jobs: HashSet<JobId> = jobs.into_iter().collect();
1645
1646        if jobs.is_empty() {
1647            return Ok(LoadedFragmentContext::default());
1648        }
1649
1650        load_fragment_context_for_jobs(txn, jobs).await
1651    }
1652
1653    #[await_tree::instrument]
1654    pub async fn fill_snapshot_backfill_epoch(
1655        &self,
1656        fragment_ids: impl Iterator<Item = FragmentId>,
1657        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1658        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1659    ) -> MetaResult<()> {
1660        let inner = self.inner.write().await;
1661        let txn = inner.db.begin().await?;
1662        for fragment_id in fragment_ids {
1663            let fragment = FragmentModel::find_by_id(fragment_id)
1664                .one(&txn)
1665                .await?
1666                .context(format!("fragment {} not found", fragment_id))?;
1667            let mut node = fragment.stream_node.to_protobuf();
1668            if crate::stream::fill_snapshot_backfill_epoch(
1669                &mut node,
1670                snapshot_backfill_info,
1671                cross_db_snapshot_backfill_info,
1672            )? {
1673                let node = StreamNode::from(&node);
1674                FragmentModel::update(fragment::ActiveModel {
1675                    fragment_id: Set(fragment_id),
1676                    stream_node: Set(node),
1677                    ..Default::default()
1678                })
1679                .exec(&txn)
1680                .await?;
1681            }
1682        }
1683        txn.commit().await?;
1684        Ok(())
1685    }
1686
1687    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1688    pub fn get_running_actors_of_fragment(
1689        &self,
1690        fragment_id: FragmentId,
1691    ) -> MetaResult<HashSet<model::ActorId>> {
1692        let info = self.env.shared_actor_infos().read_guard();
1693
1694        let actors = info
1695            .get_fragment(fragment_id as _)
1696            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1697            .unwrap_or_default();
1698
1699        Ok(actors)
1700    }
1701
    /// Pairs every actor of the source backfill fragment with its upstream source actor.
    ///
    /// Returns (`backfill_actor_id`, `upstream_source_actor_id`) pairs. Actors are taken
    /// from the shared in-memory actor info (the running actors), not from the database.
    /// They are matched by [`resolve_no_shuffle_actor_mapping`] using each fragment's
    /// distribution type and the actors' vnode bitmaps.
    ///
    /// # Errors
    /// - no `fragment_relation` row exists from `source_fragment_id` to
    ///   `source_backfill_fragment_id`;
    /// - that relation's dispatcher type is not `NoShuffle`;
    /// - either fragment row is missing, or a database error occurs.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Look up the dispatcher type of the edge source -> source backfill.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // Only a `NoShuffle` edge gives the actor pairing computed below.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Reads one fragment's distribution type through `txn`; errors if the row is missing.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Snapshots a fragment's actors and their optional vnode bitmaps from the shared
        // actor info. The map is empty if the fragment is not registered there.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // The resolver yields (source, backfill) pairs; swap them into (backfill, source).
        Ok(resolve_no_shuffle_actor_mapping(
            source_distribution_type,
            source_actors.iter().map(|(&id, bitmap)| (id, bitmap)),
            source_backfill_distribution_type,
            source_backfill_actors
                .iter()
                .map(|(&id, bitmap)| (id, bitmap)),
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1799
1800    /// Get and filter the "**root**" fragments of the specified jobs.
1801    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1802    ///
1803    /// Root fragment connects to downstream jobs.
1804    ///
1805    /// ## What can be the root fragment
1806    /// - For sink, it should have one `Sink` fragment.
1807    /// - For MV, it should have one `MView` fragment.
1808    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1809    /// - For source, it should have one `Source` fragment.
1810    ///
1811    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1812    pub async fn get_root_fragments(
1813        &self,
1814        job_ids: Vec<JobId>,
1815    ) -> MetaResult<HashMap<JobId, Fragment>> {
1816        let inner = self.inner.read().await;
1817
1818        let all_fragments = FragmentModel::find()
1819            .filter(fragment::Column::JobId.is_in(job_ids))
1820            .all(&inner.db)
1821            .await?;
1822        // job_id -> fragment
1823        let mut root_fragments = HashMap::<JobId, Fragment>::new();
1824        for fragment in all_fragments {
1825            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1826            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1827                _ = root_fragments.insert(fragment.job_id, fragment.into());
1828            } else if mask.contains(FragmentTypeFlag::Source) {
1829                // look for Source fragment only if there's no MView fragment
1830                // (notice try_insert here vs insert above)
1831                _ = root_fragments.try_insert(fragment.job_id, fragment.into());
1832            }
1833        }
1834
1835        Ok(root_fragments)
1836    }
1837
1838    pub async fn get_root_fragment(&self, job_id: JobId) -> MetaResult<Fragment> {
1839        let mut root_fragments = self.get_root_fragments(vec![job_id]).await?;
1840        let root_fragment = root_fragments
1841            .remove(&job_id)
1842            .context(format!("root fragment for job {} not found", job_id))?;
1843
1844        Ok(root_fragment)
1845    }
1846
1847    /// Get the downstream fragments connected to the specified job.
1848    pub async fn get_downstream_fragments(
1849        &self,
1850        job_id: JobId,
1851    ) -> MetaResult<Vec<(stream_plan::DispatcherType, Fragment)>> {
1852        let root_fragment = self.get_root_fragment(job_id).await?;
1853
1854        let inner = self.inner.read().await;
1855        let txn = inner.db.begin().await?;
1856        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1857            .filter(
1858                fragment_relation::Column::SourceFragmentId
1859                    .eq(root_fragment.fragment_id as FragmentId),
1860            )
1861            .all(&txn)
1862            .await?;
1863
1864        let downstream_fragment_ids = downstream_fragment_relations
1865            .iter()
1866            .map(|model| model.target_fragment_id as FragmentId)
1867            .collect::<HashSet<_>>();
1868
1869        let downstream_fragments: Vec<fragment::Model> = FragmentModel::find()
1870            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1871            .all(&txn)
1872            .await?;
1873
1874        let mut downstream_fragments_map: HashMap<_, _> = downstream_fragments
1875            .into_iter()
1876            .map(|fragment| (fragment.fragment_id, fragment))
1877            .collect();
1878
1879        let mut downstream_fragments = vec![];
1880
1881        let fragment_map: HashMap<_, _> = downstream_fragment_relations
1882            .iter()
1883            .map(|model| (model.target_fragment_id, model.dispatcher_type))
1884            .collect();
1885
1886        for (fragment_id, dispatcher_type) in fragment_map {
1887            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1888
1889            let fragment = downstream_fragments_map
1890                .remove(&fragment_id)
1891                .context(format!(
1892                    "downstream fragment node for id {} not found",
1893                    fragment_id
1894                ))?
1895                .into();
1896
1897            downstream_fragments.push((dispatch_type, fragment));
1898        }
1899        Ok(downstream_fragments)
1900    }
1901
1902    pub async fn load_source_fragment_ids(
1903        &self,
1904    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1905        let inner = self.inner.read().await;
1906        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1907            .select_only()
1908            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1909            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1910            .into_tuple()
1911            .all(&inner.db)
1912            .await?;
1913
1914        let mut source_fragment_ids = HashMap::new();
1915        for (fragment_id, stream_node) in fragments {
1916            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1917                source_fragment_ids
1918                    .entry(source_id)
1919                    .or_insert_with(BTreeSet::new)
1920                    .insert(fragment_id);
1921            }
1922        }
1923        Ok(source_fragment_ids)
1924    }
1925
1926    pub async fn load_backfill_fragment_ids(
1927        &self,
1928    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1929        let inner = self.inner.read().await;
1930        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1931            .select_only()
1932            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1933            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1934            .into_tuple()
1935            .all(&inner.db)
1936            .await?;
1937
1938        let mut source_fragment_ids = HashMap::new();
1939        for (fragment_id, stream_node) in fragments {
1940            if let Some((source_id, upstream_source_fragment_id)) =
1941                stream_node.to_protobuf().find_source_backfill()
1942            {
1943                source_fragment_ids
1944                    .entry(source_id)
1945                    .or_insert_with(BTreeSet::new)
1946                    .insert((fragment_id, upstream_source_fragment_id));
1947            }
1948        }
1949        Ok(source_fragment_ids)
1950    }
1951
1952    pub async fn get_all_upstream_sink_infos(
1953        &self,
1954        target_table: &PbTable,
1955        target_fragment_id: FragmentId,
1956    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1957        let inner = self.inner.read().await;
1958        let txn = inner.db.begin().await?;
1959
1960        self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
1961            .await
1962    }
1963
1964    pub async fn get_all_upstream_sink_infos_in_txn<C>(
1965        &self,
1966        txn: &C,
1967        target_table: &PbTable,
1968        target_fragment_id: FragmentId,
1969    ) -> MetaResult<Vec<UpstreamSinkInfo>>
1970    where
1971        C: ConnectionTrait,
1972    {
1973        let incoming_sinks = Sink::find()
1974            .filter(sink::Column::TargetTable.eq(target_table.id))
1975            .all(txn)
1976            .await?;
1977
1978        let sink_ids = incoming_sinks.iter().map(|s| s.sink_id).collect_vec();
1979        let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1980
1981        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1982        for sink in &incoming_sinks {
1983            let sink_fragment_id =
1984                sink_fragment_ids
1985                    .get(&sink.sink_id)
1986                    .cloned()
1987                    .ok_or(anyhow::anyhow!(
1988                        "sink fragment not found for sink id {}",
1989                        sink.sink_id
1990                    ))?;
1991            let upstream_info = build_upstream_sink_info(
1992                sink.sink_id,
1993                sink.original_target_columns
1994                    .as_ref()
1995                    .map(|cols| cols.to_protobuf())
1996                    .unwrap_or_default(),
1997                sink_fragment_id,
1998                target_table,
1999                target_fragment_id,
2000            )?;
2001            upstream_sink_infos.push(upstream_info);
2002        }
2003
2004        Ok(upstream_sink_infos)
2005    }
2006
2007    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
2008        let inner = self.inner.read().await;
2009        let txn = inner.db.begin().await?;
2010
2011        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
2012            .select_only()
2013            .column(fragment::Column::FragmentId)
2014            .filter(
2015                fragment::Column::JobId
2016                    .eq(job_id)
2017                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2018            )
2019            .into_tuple()
2020            .all(&txn)
2021            .await?;
2022
2023        if mview_fragment.len() != 1 {
2024            return Err(anyhow::anyhow!(
2025                "expected exactly one mview fragment for job {}, found {}",
2026                job_id,
2027                mview_fragment.len()
2028            )
2029            .into());
2030        }
2031
2032        Ok(mview_fragment.into_iter().next().unwrap())
2033    }
2034
2035    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
2036        let inner = self.inner.read().await;
2037        let txn = inner.db.begin().await?;
2038        has_table_been_migrated(&txn, table_id).await
2039    }
2040
2041    pub async fn update_fragment_splits<C>(
2042        &self,
2043        txn: &C,
2044        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
2045    ) -> MetaResult<()>
2046    where
2047        C: ConnectionTrait,
2048    {
2049        if fragment_splits.is_empty() {
2050            return Ok(());
2051        }
2052
2053        let existing_fragment_ids: HashSet<FragmentId> = FragmentModel::find()
2054            .select_only()
2055            .column(fragment::Column::FragmentId)
2056            .filter(fragment::Column::FragmentId.is_in(fragment_splits.keys().copied()))
2057            .into_tuple()
2058            .all(txn)
2059            .await?
2060            .into_iter()
2061            .collect();
2062
2063        // Filter out stale fragment ids to avoid FK violations when split updates race with drop.
2064        let (models, skipped_fragment_ids): (Vec<_>, Vec<_>) = fragment_splits
2065            .iter()
2066            .partition_map(|(fragment_id, splits)| {
2067                if existing_fragment_ids.contains(fragment_id) {
2068                    Either::Left(fragment_splits::ActiveModel {
2069                        fragment_id: Set(*fragment_id as _),
2070                        splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
2071                            splits: splits.iter().map(Into::into).collect_vec(),
2072                        }))),
2073                    })
2074                } else {
2075                    Either::Right(*fragment_id)
2076                }
2077            });
2078
2079        if !skipped_fragment_ids.is_empty() {
2080            tracing::warn!(
2081                skipped_fragment_ids = ?skipped_fragment_ids,
2082                total_fragment_ids = fragment_splits.len(),
2083                "skipping stale fragment split updates for missing fragments"
2084            );
2085        }
2086
2087        if models.is_empty() {
2088            return Ok(());
2089        }
2090
2091        FragmentSplits::insert_many(models)
2092            .on_conflict(
2093                OnConflict::column(fragment_splits::Column::FragmentId)
2094                    .update_column(fragment_splits::Column::Splits)
2095                    .to_owned(),
2096            )
2097            .exec(txn)
2098            .await?;
2099
2100        Ok(())
2101    }
2102}
2103
2104#[cfg(test)]
2105mod tests {
2106    use std::collections::{BTreeMap, HashMap, HashSet};
2107
2108    use itertools::Itertools;
2109    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
2110    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
2111    use risingwave_common::id::JobId;
2112    use risingwave_common::util::iter_util::ZipEqDebug;
2113    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
2114    use risingwave_meta_model::fragment::DistributionType;
2115    use risingwave_meta_model::*;
2116    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
2117    use risingwave_pb::plan_common::PbExprContext;
2118    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
2119    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
2120    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
2121
2122    use super::ActorInfo;
2123    use crate::MetaResult;
2124    use crate::controller::catalog::CatalogController;
2125    use crate::model::{Fragment, StreamActor};
2126
    /// Upstream actor ids of a single actor, grouped by upstream fragment id.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    /// Upstreams of every actor of a fragment, keyed by actor id.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    /// Id of the fragment under test.
    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    /// Id of the first fake upstream fragment (a second one uses this id + 1).
    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    /// Id of the job owning the fragment under test.
    const TEST_JOB_ID: JobId = JobId::new(1);

    /// The single state table id attached to the fragment under test.
    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
2138
2139    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
2140        let mut upstream_actor_ids = BTreeMap::new();
2141        upstream_actor_ids.insert(
2142            TEST_UPSTREAM_FRAGMENT_ID,
2143            HashSet::from_iter([(actor_id + 100)]),
2144        );
2145        upstream_actor_ids.insert(
2146            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
2147            HashSet::from_iter([(actor_id + 200)]),
2148        );
2149        upstream_actor_ids
2150    }
2151
2152    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
2153        let mut input = vec![];
2154        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
2155            input.push(PbStreamNode {
2156                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
2157                    upstream_fragment_id,
2158                    ..Default::default()
2159                }))),
2160                ..Default::default()
2161            });
2162        }
2163
2164        PbStreamNode {
2165            input,
2166            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
2167            ..Default::default()
2168        }
2169    }
2170
2171    #[tokio::test]
2172    async fn test_extract_fragment() -> MetaResult<()> {
2173        let actor_count = 3u32;
2174        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
2175            .map(|actor_id| {
2176                (
2177                    actor_id.into(),
2178                    generate_upstream_actor_ids_for_actor(actor_id.into()),
2179                )
2180            })
2181            .collect();
2182
2183        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());
2184
2185        let pb_fragment = Fragment {
2186            fragment_id: TEST_FRAGMENT_ID as _,
2187            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
2188            distribution_type: PbFragmentDistributionType::Hash as _,
2189            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
2190            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
2191            nodes: stream_node,
2192        };
2193
2194        let fragment =
2195            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;
2196
2197        check_fragment(fragment, pb_fragment);
2198
2199        Ok(())
2200    }
2201
    /// Composes a `fragment::Model` and per-actor [`ActorInfo`]s into protobuf form
    /// with `CatalogController::compose_fragment`. It then checks the fragment, each
    /// actor, the actor statuses and the actor splits against the inputs.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // One vnode bitmap per actor, derived from a uniform `ActorMapping`; each actor
        // removes its own bitmap below.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // A single dummy split per actor, so split conversion is exercised.
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                    config_override: "a.b.c = true".into(),
                }
            })
            .collect_vec();

        // All actors share the same upstream fragment ids, so the first actor's
        // upstreams are enough to build the fragment's stream node.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids =
                upstream_actor_ids.get(&template_actor.actor_id).unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actors, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        // Every actor must get a status with a location and an entry in the split map.
        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }
2293
    /// Asserts that each composed [`StreamActor`] matches the [`ActorInfo`] at the same
    /// position. The lists are zipped with `zip_eq_debug`, which presumably checks
    /// equal length in debug builds. It also checks that every `Merge` node in
    /// `stream_node` refers to an upstream fragment recorded for the actor.
    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            // The composed actors are expected to carry an empty mview definition.
            assert_eq!(mview_definition, "");

            // The same shared stream node is checked against each actor's upstreams.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&actor_id)
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            // An actor missing from `pb_actor_splits` is compared against default splits.
            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }
2353
2354    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2355        let Fragment {
2356            fragment_id,
2357            fragment_type_mask,
2358            distribution_type: pb_distribution_type,
2359            state_table_ids: pb_state_table_ids,
2360            maybe_vnode_count: _,
2361            nodes,
2362        } = pb_fragment;
2363
2364        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
2365        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2366        assert_eq!(
2367            pb_distribution_type,
2368            PbFragmentDistributionType::from(fragment.distribution_type)
2369        );
2370
2371        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
2372        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2373    }
2374
2375    #[test]
2376    fn test_parallelism_policy_with_root_fragments() {
2377        #[expect(deprecated)]
2378        let fragment = fragment::Model {
2379            fragment_id: 3.into(),
2380            job_id: TEST_JOB_ID,
2381            fragment_type_mask: 0,
2382            distribution_type: DistributionType::Hash,
2383            stream_node: StreamNode::from(&PbStreamNode::default()),
2384            state_table_ids: TableIdArray::default(),
2385            upstream_fragment_id: Default::default(),
2386            vnode_count: 0,
2387            parallelism: None,
2388        };
2389
2390        let job_parallelism = StreamingParallelism::Fixed(4);
2391
2392        let policy = super::CatalogController::format_fragment_parallelism_policy(
2393            fragment.distribution_type,
2394            fragment.parallelism.as_ref(),
2395            Some(&job_parallelism),
2396            None,
2397            &[],
2398        );
2399
2400        assert_eq!(policy, "inherit(4)");
2401    }
2402
2403    #[test]
2404    fn test_parallelism_policy_with_adaptive_strategy() {
2405        #[expect(deprecated)]
2406        let fragment = fragment::Model {
2407            fragment_id: 4.into(),
2408            job_id: TEST_JOB_ID,
2409            fragment_type_mask: 0,
2410            distribution_type: DistributionType::Hash,
2411            stream_node: StreamNode::from(&PbStreamNode::default()),
2412            state_table_ids: TableIdArray::default(),
2413            upstream_fragment_id: Default::default(),
2414            vnode_count: 0,
2415            parallelism: None,
2416        };
2417
2418        let job_parallelism = StreamingParallelism::Adaptive;
2419
2420        let policy = super::CatalogController::format_fragment_parallelism_policy(
2421            fragment.distribution_type,
2422            fragment.parallelism.as_ref(),
2423            Some(&job_parallelism),
2424            Some("RATIO(0.5)"),
2425            &[],
2426        );
2427
2428        assert_eq!(policy, "inherit(ratio(0.5))");
2429    }
2430
2431    #[test]
2432    fn test_parallelism_policy_with_custom_strategy() {
2433        #[expect(deprecated)]
2434        let fragment = fragment::Model {
2435            fragment_id: 6.into(),
2436            job_id: TEST_JOB_ID,
2437            fragment_type_mask: 0,
2438            distribution_type: DistributionType::Hash,
2439            stream_node: StreamNode::from(&PbStreamNode::default()),
2440            state_table_ids: TableIdArray::default(),
2441            upstream_fragment_id: Default::default(),
2442            vnode_count: 0,
2443            parallelism: None,
2444        };
2445
2446        let job_parallelism = StreamingParallelism::Custom;
2447
2448        let policy = super::CatalogController::format_fragment_parallelism_policy(
2449            fragment.distribution_type,
2450            fragment.parallelism.as_ref(),
2451            Some(&job_parallelism),
2452            Some("BOUNDED(8)"),
2453            &[],
2454        );
2455
2456        assert_eq!(policy, "inherit(bounded(8))");
2457    }
2458
2459    #[test]
2460    fn test_parallelism_policy_with_invalid_adaptive_strategy_falls_back() {
2461        #[expect(deprecated)]
2462        let fragment = fragment::Model {
2463            fragment_id: 7.into(),
2464            job_id: TEST_JOB_ID,
2465            fragment_type_mask: 0,
2466            distribution_type: DistributionType::Hash,
2467            stream_node: StreamNode::from(&PbStreamNode::default()),
2468            state_table_ids: TableIdArray::default(),
2469            upstream_fragment_id: Default::default(),
2470            vnode_count: 0,
2471            parallelism: None,
2472        };
2473
2474        let job_parallelism = StreamingParallelism::Adaptive;
2475
2476        let policy = super::CatalogController::format_fragment_parallelism_policy(
2477            fragment.distribution_type,
2478            fragment.parallelism.as_ref(),
2479            Some(&job_parallelism),
2480            Some("NOT_A_STRATEGY"),
2481            &[],
2482        );
2483
2484        assert_eq!(policy, "inherit(adaptive)");
2485    }
2486
2487    #[test]
2488    fn test_parallelism_policy_with_upstream_roots() {
2489        #[expect(deprecated)]
2490        let fragment = fragment::Model {
2491            fragment_id: 5.into(),
2492            job_id: TEST_JOB_ID,
2493            fragment_type_mask: 0,
2494            distribution_type: DistributionType::Hash,
2495            stream_node: StreamNode::from(&PbStreamNode::default()),
2496            state_table_ids: TableIdArray::default(),
2497            upstream_fragment_id: Default::default(),
2498            vnode_count: 0,
2499            parallelism: None,
2500        };
2501
2502        let policy = super::CatalogController::format_fragment_parallelism_policy(
2503            fragment.distribution_type,
2504            fragment.parallelism.as_ref(),
2505            None,
2506            None,
2507            &[3.into(), 1.into(), 2.into(), 1.into()],
2508        );
2509
2510        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
2511    }
2512}