Skip to main content

risingwave_meta/controller/
fragment.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::{Either, Itertools};
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::AdaptiveParallelismStrategy;
28use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
29use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
30use risingwave_connector::source::SplitImpl;
31use risingwave_connector::source::cdc::CdcScanOptions;
32use risingwave_meta_model::fragment::DistributionType;
33use risingwave_meta_model::object::ObjectType;
34use risingwave_meta_model::prelude::{
35    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
36};
37use risingwave_meta_model::{
38    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
39    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
40    TableIdArray, VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits,
41    object, sink, source, streaming_job, table,
42};
43use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
44use risingwave_pb::catalog::PbTable;
45use risingwave_pb::common::PbActorLocation;
46use risingwave_pb::meta::subscribe_response::{
47    Info as NotificationInfo, Operation as NotificationOperation,
48};
49use risingwave_pb::meta::table_fragments::fragment::{
50    FragmentDistributionType, PbFragmentDistributionType,
51};
52use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
53use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
54use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
55use risingwave_pb::stream_plan;
56use risingwave_pb::stream_plan::stream_node::NodeBody;
57use risingwave_pb::stream_plan::{
58    PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, SinkLogStoreType,
59    StreamScanType,
60};
61use sea_orm::ActiveValue::Set;
62use sea_orm::sea_query::Expr;
63use sea_orm::{
64    ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, FromQueryResult, JoinType,
65    PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
66};
67use serde::{Deserialize, Serialize};
68
69use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
70use crate::controller::catalog::CatalogController;
71use crate::controller::scale::{
72    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph,
73    find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
74    render_actor_assignments, resolve_streaming_job_definition,
75};
76use crate::controller::utils::{
77    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
78    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
79    resolve_no_shuffle_actor_mapping,
80};
81use crate::error::MetaError;
82use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
83use crate::model::{
84    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
85    StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
/// Some information of running (inflight) actors.
#[derive(Debug)]
pub struct InflightActorInfo {
    /// Id of the worker associated with this actor.
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor; `None` when the actor has no bitmap.
    pub vnode_bitmap: Option<Bitmap>,
    /// Connector splits attached to this actor (may be empty).
    pub splits: Vec<SplitImpl>,
}
98
/// Per-actor data consumed by `CatalogController::compose_fragment`, which turns each entry
/// into a `StreamActor`, a `PbActorStatus` and a `PbConnectorSplits`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    /// Id of the actor.
    pub actor_id: ActorId,
    /// Id of the owning fragment; `compose_fragment` rejects actors whose value differs from
    /// the fragment being composed.
    pub fragment_id: FragmentId,
    /// Connector splits of the actor, converted to protobuf when composing.
    pub splits: ConnectorSplits,
    /// Worker used to build the actor's `PbActorLocation`.
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor; `None` when the actor has no bitmap.
    pub vnode_bitmap: Option<VnodeBitmap>,
    /// Expression context, converted to protobuf and attached to the `StreamActor`.
    pub expr_context: ExprContext,
    /// Passed through unchanged to `StreamActor::config_override`.
    pub config_override: Arc<str>,
}
109
/// Flat row holding the fragment fields needed to describe a fragment.
///
/// NOTE(review): the field names mirror the `fragment` table columns; the query that fills this
/// row is not in this part of the file, so confirm the column mapping there.
#[derive(Debug)]
struct FragmentDescRow {
    fragment_id: FragmentId,
    job_id: JobId,
    /// Raw fragment type bitmask as an `i32`.
    fragment_type_mask: i32,
    distribution_type: DistributionType,
    state_table_ids: TableIdArray,
    vnode_count: i32,
    /// Presumably the fragment-level parallelism override; `None` when absent — confirm at the
    /// construction site.
    parallelism_override: Option<StreamingParallelism>,
    /// Optional stream node; `None` when it was not loaded or is absent.
    stream_node: Option<StreamNode>,
}
121
/// Description of a fragment together with its inflight actors.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Distribution type of the fragment.
    pub distribution_type: DistributionType,
    /// Bitmask of the fragment's type flags.
    pub fragment_type_mask: FragmentTypeMask,
    /// Vnode count of the fragment.
    pub vnode_count: usize,
    /// Root of the fragment's stream node tree, in protobuf form.
    pub nodes: PbStreamNode,
    /// Inflight actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    /// Ids of the state tables associated with the fragment.
    pub state_table_ids: HashSet<TableId>,
}
132
/// Distribution type and vnode count of a fragment, as returned by
/// `CatalogController::fragment_parallelisms`.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    /// Protobuf distribution type converted from the persisted `DistributionType`.
    pub distribution_type: FragmentDistributionType,
    /// Vnode count read from the fragment table.
    pub vnode_count: usize,
}
138
139#[easy_ext::ext(FragmentTypeMaskExt)]
140pub impl FragmentTypeMask {
141    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
142        Expr::col(fragment::Column::FragmentTypeMask)
143            .bit_and(Expr::value(flag as i32))
144            .ne(0)
145    }
146
147    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
148        Expr::col(fragment::Column::FragmentTypeMask)
149            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
150            .ne(0)
151    }
152
153    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
154        Expr::col(fragment::Column::FragmentTypeMask)
155            .bit_and(Expr::value(flag as i32))
156            .eq(0)
157    }
158}
159
/// Summary row describing a streaming job.
///
/// Populated from a query result (`FromQueryResult`) and serialized with camelCase field names.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    /// Id of the streaming job.
    pub job_id: JobId,
    /// Type of the catalog object backing the job.
    pub obj_type: ObjectType,
    /// Name of the job.
    pub name: String,
    /// Status of the job.
    pub job_status: JobStatus,
    /// Parallelism setting of the job.
    pub parallelism: StreamingParallelism,
    /// Adaptive parallelism strategy in its textual form, if one is set.
    pub adaptive_parallelism_strategy: Option<String>,
    // These backfill fields are only used to derive the effective parallelism shown for
    // in-progress jobs. They are skipped in JSON responses so existing dashboard payloads keep
    // the same schema.
    #[serde(skip)]
    pub backfill_parallelism: Option<StreamingParallelism>,
    #[serde(skip)]
    pub backfill_adaptive_parallelism_strategy: Option<String>,
    /// Max parallelism of the job.
    pub max_parallelism: i32,
    /// Resource group of the job.
    pub resource_group: String,
    /// Config override of the job, as text.
    pub config_override: String,
    /// Database the job belongs to.
    pub database_id: DatabaseId,
    /// Schema the job belongs to.
    pub schema_id: SchemaId,
}
182
183impl NotificationManager {
184    /// Notify frontend about streaming worker slot mapping changes.
185    ///
186    /// This only sends streaming mapping updates to frontends. Serving mapping
187    /// notifications are decoupled and driven by fragment model changes (insert/delete)
188    /// instead of barrier-driven actor set changes.
189    pub(crate) fn notify_streaming_fragment_mapping(
190        &self,
191        operation: NotificationOperation,
192        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
193    ) {
194        if fragment_mappings.is_empty() {
195            return;
196        }
197        // notify all fragment mappings to frontend.
198        for fragment_mapping in fragment_mappings {
199            self.notify_frontend_without_version(
200                operation,
201                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
202            );
203        }
204    }
205
206    /// Notify the serving module about fragment mapping changes.
207    ///
208    /// This should be called when fragments are inserted into or deleted from the meta store,
209    /// so that serving vnode mappings are kept in sync with the fragment model.
210    pub(crate) fn notify_serving_fragment_mapping_update(
211        &self,
212        fragment_ids: Vec<crate::model::FragmentId>,
213    ) {
214        if fragment_ids.is_empty() {
215            return;
216        }
217        self.notify_local_subscribers(LocalNotification::ServingFragmentMappingsUpsert(
218            fragment_ids,
219        ));
220    }
221
222    /// Notify the serving module about fragment mapping deletions.
223    ///
224    /// This should be called when fragments are deleted from the meta store,
225    /// so that serving vnode mappings are cleaned up.
226    pub(crate) fn notify_serving_fragment_mapping_delete(
227        &self,
228        fragment_ids: Vec<crate::model::FragmentId>,
229    ) {
230        if fragment_ids.is_empty() {
231            return;
232        }
233        self.notify_local_subscribers(LocalNotification::ServingFragmentMappingsDelete(
234            fragment_ids,
235        ));
236    }
237}
238
239impl CatalogController {
240    pub fn prepare_fragment_models_from_fragments(
241        job_id: JobId,
242        fragments: impl Iterator<Item = &Fragment>,
243    ) -> MetaResult<Vec<fragment::Model>> {
244        fragments
245            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
246            .try_collect()
247    }
248
249    pub fn prepare_fragment_model_for_new_job(
250        job_id: JobId,
251        fragment: &Fragment,
252    ) -> MetaResult<fragment::Model> {
253        let vnode_count = fragment.vnode_count();
254        let Fragment {
255            fragment_id: pb_fragment_id,
256            fragment_type_mask: pb_fragment_type_mask,
257            distribution_type: pb_distribution_type,
258            state_table_ids: pb_state_table_ids,
259            nodes,
260            ..
261        } = fragment;
262
263        let state_table_ids = pb_state_table_ids.clone().into();
264
265        let fragment_parallelism = nodes
266            .node_body
267            .as_ref()
268            .and_then(|body| match body {
269                NodeBody::StreamCdcScan(node) => Some(node),
270                _ => None,
271            })
272            .and_then(|node| node.options.as_ref())
273            .map(CdcScanOptions::from_proto)
274            .filter(|opts| opts.is_parallelized_backfill())
275            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));
276
277        let stream_node = StreamNode::from(nodes);
278
279        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
280            .unwrap()
281            .into();
282
283        #[expect(deprecated)]
284        let fragment = fragment::Model {
285            fragment_id: *pb_fragment_id as _,
286            job_id,
287            fragment_type_mask: (*pb_fragment_type_mask).into(),
288            distribution_type,
289            stream_node,
290            state_table_ids,
291            upstream_fragment_id: Default::default(),
292            vnode_count: vnode_count as _,
293            parallelism: fragment_parallelism,
294        };
295
296        Ok(fragment)
297    }
298
299    #[expect(clippy::type_complexity)]
300    fn compose_table_fragments(
301        job_id: JobId,
302        state: PbState,
303        ctx: StreamContext,
304        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
305        max_parallelism: usize,
306        job_definition: Option<String>,
307    ) -> MetaResult<(
308        StreamJobFragments,
309        HashMap<FragmentId, Vec<StreamActor>>,
310        HashMap<crate::model::ActorId, PbActorStatus>,
311    )> {
312        let mut pb_fragments = BTreeMap::new();
313        let mut fragment_actors = HashMap::new();
314        let mut all_actor_status = HashMap::new();
315
316        for (fragment, actors) in fragments {
317            let (fragment, actors, actor_status, _) =
318                Self::compose_fragment(fragment, actors, job_definition.clone())?;
319            let fragment_id = fragment.fragment_id;
320            fragment_actors.insert(fragment_id, actors);
321            all_actor_status.extend(actor_status);
322
323            pb_fragments.insert(fragment_id, fragment);
324        }
325
326        let table_fragments = StreamJobFragments {
327            stream_job_id: job_id,
328            state: state as _,
329            fragments: pb_fragments,
330            ctx,
331            max_parallelism,
332        };
333
334        Ok((table_fragments, fragment_actors, all_actor_status))
335    }
336
337    #[expect(clippy::type_complexity)]
338    fn compose_fragment(
339        fragment: fragment::Model,
340        actors: Vec<ActorInfo>,
341        job_definition: Option<String>,
342    ) -> MetaResult<(
343        Fragment,
344        Vec<StreamActor>,
345        HashMap<crate::model::ActorId, PbActorStatus>,
346        HashMap<crate::model::ActorId, PbConnectorSplits>,
347    )> {
348        let fragment::Model {
349            fragment_id,
350            fragment_type_mask,
351            distribution_type,
352            stream_node,
353            state_table_ids,
354            vnode_count,
355            ..
356        } = fragment;
357
358        let stream_node = stream_node.to_protobuf();
359        let mut upstream_fragments = HashSet::new();
360        visit_stream_node_body(&stream_node, |body| {
361            if let NodeBody::Merge(m) = body {
362                assert!(
363                    upstream_fragments.insert(m.upstream_fragment_id),
364                    "non-duplicate upstream fragment"
365                );
366            }
367        });
368
369        let mut pb_actors = vec![];
370
371        let mut pb_actor_status = HashMap::new();
372        let mut pb_actor_splits = HashMap::new();
373
374        for actor in actors {
375            if actor.fragment_id != fragment_id {
376                bail!(
377                    "fragment id {} from actor {} is different from fragment {}",
378                    actor.fragment_id,
379                    actor.actor_id,
380                    fragment_id
381                )
382            }
383
384            let ActorInfo {
385                actor_id,
386                fragment_id,
387                worker_id,
388                splits,
389                vnode_bitmap,
390                expr_context,
391                config_override,
392                ..
393            } = actor;
394
395            let vnode_bitmap =
396                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
397            let pb_expr_context = Some(expr_context.to_protobuf());
398
399            pb_actor_status.insert(
400                actor_id as _,
401                PbActorStatus {
402                    location: PbActorLocation::from_worker(worker_id),
403                },
404            );
405
406            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
407
408            pb_actors.push(StreamActor {
409                actor_id: actor_id as _,
410                fragment_id: fragment_id as _,
411                vnode_bitmap,
412                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
413                expr_context: pb_expr_context,
414                config_override,
415            })
416        }
417
418        let pb_state_table_ids = state_table_ids.0;
419        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
420        let pb_fragment = Fragment {
421            fragment_id: fragment_id as _,
422            fragment_type_mask: fragment_type_mask.into(),
423            distribution_type: pb_distribution_type,
424            state_table_ids: pb_state_table_ids,
425            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
426            nodes: stream_node,
427        };
428
429        Ok((pb_fragment, pb_actors, pb_actor_status, pb_actor_splits))
430    }
431
432    /// Returns distribution type and vnode count for all (or filtered) fragments.
433    ///
434    /// Reads directly from the persistent catalog (fragment table) rather than
435    /// from the in-memory `shared_actor_infos`.  This is critical because the
436    /// serving vnode mapping must be available even before the barrier manager's
437    /// recovery has completed and populated `shared_actor_infos`.
438    pub async fn fragment_parallelisms(
439        &self,
440    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
441        let inner = self.inner.read().await;
442        let query = FragmentModel::find().select_only().columns([
443            fragment::Column::FragmentId,
444            fragment::Column::DistributionType,
445            fragment::Column::VnodeCount,
446        ]);
447        let fragments: Vec<(FragmentId, DistributionType, i32)> =
448            query.into_tuple().all(&inner.db).await?;
449
450        Ok(fragments
451            .into_iter()
452            .map(|(fragment_id, distribution_type, vnode_count)| {
453                (
454                    fragment_id,
455                    FragmentParallelismInfo {
456                        distribution_type: PbFragmentDistributionType::from(distribution_type),
457                        vnode_count: vnode_count as usize,
458                    },
459                )
460            })
461            .collect())
462    }
463
464    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
465        let inner = self.inner.read().await;
466        let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
467            .select_only()
468            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
469            .into_tuple()
470            .all(&inner.db)
471            .await?;
472        Ok(fragment_jobs.into_iter().collect())
473    }
474
475    pub async fn get_fragment_job_id(
476        &self,
477        fragment_ids: Vec<FragmentId>,
478    ) -> MetaResult<Vec<ObjectId>> {
479        let inner = self.inner.read().await;
480        self.get_fragment_job_id_in_txn(&inner.db, fragment_ids)
481            .await
482    }
483
484    pub async fn get_fragment_job_id_in_txn<C>(
485        &self,
486        txn: &C,
487        fragment_ids: Vec<FragmentId>,
488    ) -> MetaResult<Vec<ObjectId>>
489    where
490        C: ConnectionTrait + Send,
491    {
492        let object_ids: Vec<ObjectId> = FragmentModel::find()
493            .select_only()
494            .column(fragment::Column::JobId)
495            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
496            .into_tuple()
497            .all(txn)
498            .await?;
499
500        Ok(object_ids)
501    }
502
503    pub async fn get_fragment_desc_by_id(
504        &self,
505        fragment_id: FragmentId,
506    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
507        let inner = self.inner.read().await;
508
509        let fragment_model = match FragmentModel::find_by_id(fragment_id)
510            .one(&inner.db)
511            .await?
512        {
513            Some(fragment) => fragment,
514            None => return Ok(None),
515        };
516
517        let job_parallelism: Option<(StreamingParallelism, Option<String>)> =
518            StreamingJob::find_by_id(fragment_model.job_id)
519                .select_only()
520                .columns([
521                    streaming_job::Column::Parallelism,
522                    streaming_job::Column::AdaptiveParallelismStrategy,
523                ])
524                .into_tuple::<(StreamingParallelism, Option<String>)>()
525                .one(&inner.db)
526                .await?;
527
528        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
529            .select_only()
530            .columns([
531                fragment_relation::Column::SourceFragmentId,
532                fragment_relation::Column::DispatcherType,
533            ])
534            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
535            .into_tuple()
536            .all(&inner.db)
537            .await?;
538
539        let upstreams: Vec<_> = upstream_entries
540            .into_iter()
541            .map(|(source_id, _)| source_id)
542            .collect();
543
544        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
545            .await
546            .map(Self::collect_root_fragment_mapping)?;
547        let root_fragments = root_fragment_map
548            .get(&fragment_id)
549            .cloned()
550            .unwrap_or_default();
551
552        let info = self.env.shared_actor_infos().read_guard();
553        let SharedFragmentInfo { actors, .. } = info
554            .get_fragment(fragment_model.fragment_id as _)
555            .unwrap_or_else(|| {
556                panic!(
557                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
558                    fragment_model.fragment_id,
559                    fragment_model.job_id
560                )
561            });
562
563        let parallelism_policy = Self::format_fragment_parallelism_policy(
564            fragment_model.distribution_type,
565            fragment_model.parallelism.as_ref(),
566            job_parallelism.as_ref().map(|(parallelism, _)| parallelism),
567            job_parallelism
568                .as_ref()
569                .and_then(|(_, strategy)| strategy.as_deref()),
570            &root_fragments,
571        );
572
573        let fragment = FragmentDesc {
574            fragment_id: fragment_model.fragment_id,
575            job_id: fragment_model.job_id,
576            fragment_type_mask: fragment_model.fragment_type_mask,
577            distribution_type: fragment_model.distribution_type,
578            state_table_ids: fragment_model.state_table_ids.clone(),
579            parallelism: actors.len() as _,
580            vnode_count: fragment_model.vnode_count,
581            stream_node: fragment_model.stream_node.clone(),
582            parallelism_policy,
583        };
584
585        Ok(Some((fragment, upstreams)))
586    }
587
588    pub async fn list_fragment_database_ids(
589        &self,
590        select_fragment_ids: Option<Vec<FragmentId>>,
591    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
592        let inner = self.inner.read().await;
593        let select = FragmentModel::find()
594            .select_only()
595            .column(fragment::Column::FragmentId)
596            .column(object::Column::DatabaseId)
597            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
598        let select = if let Some(select_fragment_ids) = select_fragment_ids {
599            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
600        } else {
601            select
602        };
603        Ok(select.into_tuple().all(&inner.db).await?)
604    }
605
606    pub async fn get_job_fragments_by_id(
607        &self,
608        job_id: JobId,
609    ) -> MetaResult<(
610        StreamJobFragments,
611        HashMap<FragmentId, Vec<StreamActor>>,
612        HashMap<ActorId, PbActorStatus>,
613    )> {
614        let inner = self.inner.read().await;
615
616        // Load fragments matching the job from the database
617        let fragments: Vec<_> = FragmentModel::find()
618            .filter(fragment::Column::JobId.eq(job_id))
619            .all(&inner.db)
620            .await?;
621
622        let job_info = StreamingJob::find_by_id(job_id)
623            .one(&inner.db)
624            .await?
625            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
626
627        let fragment_actors =
628            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;
629
630        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
631            .await?
632            .remove(&job_id);
633
634        Self::compose_table_fragments(
635            job_id,
636            job_info.job_status.into(),
637            job_info.stream_context(),
638            fragment_actors,
639            job_info.max_parallelism as _,
640            job_definition,
641        )
642    }
643
644    pub async fn get_fragment_actor_dispatchers(
645        &self,
646        fragment_ids: Vec<FragmentId>,
647    ) -> MetaResult<FragmentActorDispatchers> {
648        let inner = self.inner.read().await;
649
650        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
651            .await
652    }
653
654    pub async fn get_fragment_actor_dispatchers_txn(
655        &self,
656        c: &impl ConnectionTrait,
657        fragment_ids: Vec<FragmentId>,
658    ) -> MetaResult<FragmentActorDispatchers> {
659        let fragment_relations = FragmentRelation::find()
660            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
661            .all(c)
662            .await?;
663
664        type FragmentActorInfo = (
665            DistributionType,
666            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
667        );
668
669        let shared_info = self.env.shared_actor_infos();
670        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
671        let get_fragment_actors = |fragment_id: FragmentId| async move {
672            let result: MetaResult<FragmentActorInfo> = try {
673                let read_guard = shared_info.read_guard();
674
675                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();
676
677                (
678                    fragment.distribution_type,
679                    Arc::new(
680                        fragment
681                            .actors
682                            .iter()
683                            .map(|(actor_id, actor_info)| {
684                                (
685                                    *actor_id,
686                                    actor_info
687                                        .vnode_bitmap
688                                        .as_ref()
689                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
690                                )
691                            })
692                            .collect(),
693                    ),
694                )
695            };
696            result
697        };
698
699        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
700        for fragment_relation::Model {
701            source_fragment_id,
702            target_fragment_id,
703            dispatcher_type,
704            dist_key_indices,
705            output_indices,
706            output_type_mapping,
707        } in fragment_relations
708        {
709            let (source_fragment_distribution, source_fragment_actors) = {
710                let (distribution, actors) = {
711                    match fragment_actor_cache.entry(source_fragment_id) {
712                        Entry::Occupied(entry) => entry.into_mut(),
713                        Entry::Vacant(entry) => {
714                            entry.insert(get_fragment_actors(source_fragment_id).await?)
715                        }
716                    }
717                };
718                (*distribution, actors.clone())
719            };
720            let (target_fragment_distribution, target_fragment_actors) = {
721                let (distribution, actors) = {
722                    match fragment_actor_cache.entry(target_fragment_id) {
723                        Entry::Occupied(entry) => entry.into_mut(),
724                        Entry::Vacant(entry) => {
725                            entry.insert(get_fragment_actors(target_fragment_id).await?)
726                        }
727                    }
728                };
729                (*distribution, actors.clone())
730            };
731            let output_mapping = PbDispatchOutputMapping {
732                indices: output_indices.into_u32_array(),
733                types: output_type_mapping.unwrap_or_default().to_protobuf(),
734            };
735            let (dispatchers, _) = compose_dispatchers(
736                source_fragment_distribution,
737                &source_fragment_actors,
738                target_fragment_id as _,
739                target_fragment_distribution,
740                &target_fragment_actors,
741                dispatcher_type,
742                dist_key_indices.into_u32_array(),
743                output_mapping,
744            );
745            let actor_dispatchers_map = actor_dispatchers_map
746                .entry(source_fragment_id as _)
747                .or_default();
748            for (actor_id, dispatchers) in dispatchers {
749                actor_dispatchers_map
750                    .entry(actor_id as _)
751                    .or_default()
752                    .push(dispatchers);
753            }
754        }
755        Ok(actor_dispatchers_map)
756    }
757
758    pub async fn get_fragment_downstream_relations(
759        &self,
760        fragment_ids: Vec<FragmentId>,
761    ) -> MetaResult<FragmentDownstreamRelation> {
762        let inner = self.inner.read().await;
763        self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
764            .await
765    }
766
767    pub async fn get_fragment_downstream_relations_in_txn<C>(
768        &self,
769        txn: &C,
770        fragment_ids: Vec<FragmentId>,
771    ) -> MetaResult<FragmentDownstreamRelation>
772    where
773        C: ConnectionTrait + StreamTrait + Send,
774    {
775        let mut stream = FragmentRelation::find()
776            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
777            .stream(txn)
778            .await?;
779        let mut relations = FragmentDownstreamRelation::new();
780        while let Some(relation) = stream.try_next().await? {
781            relations
782                .entry(relation.source_fragment_id as _)
783                .or_default()
784                .push(DownstreamFragmentRelation {
785                    downstream_fragment_id: relation.target_fragment_id as _,
786                    dispatcher_type: relation.dispatcher_type,
787                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
788                    output_mapping: PbDispatchOutputMapping {
789                        indices: relation.output_indices.into_u32_array(),
790                        types: relation
791                            .output_type_mapping
792                            .unwrap_or_default()
793                            .to_protobuf(),
794                    },
795                });
796        }
797        Ok(relations)
798    }
799
800    pub async fn get_job_fragment_backfill_scan_type(
801        &self,
802        job_id: JobId,
803    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
804        let inner = self.inner.read().await;
805        self.get_job_fragment_backfill_scan_type_in_txn(&inner.db, job_id)
806            .await
807    }
808
809    pub async fn get_job_fragment_backfill_scan_type_in_txn<C>(
810        &self,
811        txn: &C,
812        job_id: JobId,
813    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>>
814    where
815        C: ConnectionTrait + Send,
816    {
817        let fragments: Vec<_> = FragmentModel::find()
818            .filter(fragment::Column::JobId.eq(job_id))
819            .all(txn)
820            .await?;
821
822        let mut result = HashMap::new();
823
824        for fragment::Model {
825            fragment_id,
826            stream_node,
827            ..
828        } in fragments
829        {
830            let stream_node = stream_node.to_protobuf();
831            visit_stream_node_body(&stream_node, |body| {
832                if let NodeBody::StreamScan(node) = body {
833                    match node.stream_scan_type() {
834                        StreamScanType::Unspecified => {}
835                        scan_type => {
836                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
837                        }
838                    }
839                }
840            });
841        }
842
843        Ok(result)
844    }
845
846    pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
847        let inner = self.inner.read().await;
848        let count = StreamingJob::find().count(&inner.db).await?;
849        Ok(usize::try_from(count).context("streaming job count overflow")?)
850    }
851
852    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
853        let inner = self.inner.read().await;
854        let job_states = StreamingJob::find()
855            .select_only()
856            .column(streaming_job::Column::JobId)
857            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
858            .join(JoinType::InnerJoin, object::Relation::Database2.def())
859            .column(object::Column::ObjType)
860            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
861            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
862            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
863            .column_as(
864                Expr::if_null(
865                    Expr::col((table::Entity, table::Column::Name)),
866                    Expr::if_null(
867                        Expr::col((source::Entity, source::Column::Name)),
868                        Expr::if_null(
869                            Expr::col((sink::Entity, sink::Column::Name)),
870                            Expr::val("<unknown>"),
871                        ),
872                    ),
873                ),
874                "name",
875            )
876            .columns([
877                streaming_job::Column::JobStatus,
878                streaming_job::Column::Parallelism,
879                streaming_job::Column::AdaptiveParallelismStrategy,
880                streaming_job::Column::BackfillParallelism,
881                streaming_job::Column::BackfillAdaptiveParallelismStrategy,
882                streaming_job::Column::MaxParallelism,
883            ])
884            .column_as(
885                Expr::if_null(
886                    Expr::col((
887                        streaming_job::Entity,
888                        streaming_job::Column::SpecificResourceGroup,
889                    )),
890                    Expr::col((database::Entity, database::Column::ResourceGroup)),
891                ),
892                "resource_group",
893            )
894            .column_as(
895                Expr::if_null(
896                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
897                    Expr::val(""),
898                ),
899                "config_override",
900            )
901            .column(object::Column::DatabaseId)
902            .column(object::Column::SchemaId)
903            .into_model()
904            .all(&inner.db)
905            .await?;
906        Ok(job_states)
907    }
908
909    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
910        let inner = self.inner.read().await;
911        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
912            .select_only()
913            .column(streaming_job::Column::MaxParallelism)
914            .into_tuple()
915            .one(&inner.db)
916            .await?
917            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
918        Ok(max_parallelism as usize)
919    }
920
921    /// Try to get internal table ids of each streaming job, used by metrics collection.
922    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
923        if let Ok(inner) = self.inner.try_read()
924            && let Ok(job_state_tables) = FragmentModel::find()
925                .select_only()
926                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
927                .into_tuple::<(JobId, I32Array)>()
928                .all(&inner.db)
929                .await
930        {
931            let mut job_internal_table_ids = HashMap::new();
932            for (job_id, state_table_ids) in job_state_tables {
933                job_internal_table_ids
934                    .entry(job_id)
935                    .or_insert_with(Vec::new)
936                    .extend(
937                        state_table_ids
938                            .into_inner()
939                            .into_iter()
940                            .map(|table_id| TableId::new(table_id as _)),
941                    );
942            }
943            return Some(job_internal_table_ids.into_iter().collect());
944        }
945        None
946    }
947
948    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
949        let inner = self.inner.read().await;
950        let count = FragmentModel::find().count(&inner.db).await?;
951        Ok(count > 0)
952    }
953
954    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
955        let read_guard = self.env.shared_actor_infos().read_guard();
956        let actor_cnt: HashMap<WorkerId, _> = read_guard
957            .iter_over_fragments()
958            .flat_map(|(_, fragment)| {
959                fragment
960                    .actors
961                    .iter()
962                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
963            })
964            .into_group_map()
965            .into_iter()
966            .map(|(k, v)| (k, v.len()))
967            .collect();
968
969        Ok(actor_cnt)
970    }
971
972    fn collect_fragment_actor_map(
973        &self,
974        fragment_ids: &[FragmentId],
975        stream_context: StreamContext,
976    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
977        let guard = self.env.shared_actor_infos().read_guard();
978        let pb_expr_context = stream_context.to_expr_context();
979        let expr_context: ExprContext = (&pb_expr_context).into();
980
981        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
982        for fragment_id in fragment_ids {
983            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
984                anyhow!("fragment {} not found in shared actor info", fragment_id)
985            })?;
986
987            let actors = fragment_info
988                .actors
989                .iter()
990                .map(|(actor_id, actor_info)| ActorInfo {
991                    actor_id: *actor_id as _,
992                    fragment_id: *fragment_id,
993                    splits: ConnectorSplits::from(&PbConnectorSplits {
994                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
995                    }),
996                    worker_id: actor_info.worker_id as _,
997                    vnode_bitmap: actor_info
998                        .vnode_bitmap
999                        .as_ref()
1000                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
1001                    expr_context: expr_context.clone(),
1002                    config_override: stream_context.config_override.clone(),
1003                })
1004                .collect();
1005
1006            actor_map.insert(*fragment_id, actors);
1007        }
1008
1009        Ok(actor_map)
1010    }
1011
1012    fn collect_fragment_actor_pairs(
1013        &self,
1014        fragments: Vec<fragment::Model>,
1015        stream_context: StreamContext,
1016    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
1017        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
1018        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
1019        fragments
1020            .into_iter()
1021            .map(|fragment| {
1022                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
1023                    anyhow!(
1024                        "fragment {} missing in shared actor info map",
1025                        fragment.fragment_id
1026                    )
1027                })?;
1028                Ok((fragment, actors))
1029            })
1030            .collect()
1031    }
1032
1033    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
1034    pub async fn table_fragments(
1035        &self,
1036    ) -> MetaResult<
1037        BTreeMap<
1038            JobId,
1039            (
1040                StreamJobFragments,
1041                HashMap<FragmentId, Vec<StreamActor>>,
1042                HashMap<crate::model::ActorId, PbActorStatus>,
1043            ),
1044        >,
1045    > {
1046        let inner = self.inner.read().await;
1047        let jobs = StreamingJob::find().all(&inner.db).await?;
1048
1049        let mut job_definition = resolve_streaming_job_definition(
1050            &inner.db,
1051            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
1052        )
1053        .await?;
1054
1055        let mut table_fragments = BTreeMap::new();
1056        for job in jobs {
1057            let fragments = FragmentModel::find()
1058                .filter(fragment::Column::JobId.eq(job.job_id))
1059                .all(&inner.db)
1060                .await?;
1061
1062            let fragment_actors =
1063                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;
1064
1065            table_fragments.insert(
1066                job.job_id,
1067                Self::compose_table_fragments(
1068                    job.job_id,
1069                    job.job_status.into(),
1070                    job.stream_context(),
1071                    fragment_actors,
1072                    job.max_parallelism as _,
1073                    job_definition.remove(&job.job_id),
1074                )?,
1075            );
1076        }
1077
1078        Ok(table_fragments)
1079    }
1080
1081    pub async fn upstream_fragments(
1082        &self,
1083        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1084    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
1085        let inner = self.inner.read().await;
1086        self.upstream_fragments_in_txn(&inner.db, fragment_ids)
1087            .await
1088    }
1089
1090    pub async fn upstream_fragments_in_txn<C>(
1091        &self,
1092        txn: &C,
1093        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1094    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>>
1095    where
1096        C: ConnectionTrait + StreamTrait + Send,
1097    {
1098        let mut stream = FragmentRelation::find()
1099            .select_only()
1100            .columns([
1101                fragment_relation::Column::SourceFragmentId,
1102                fragment_relation::Column::TargetFragmentId,
1103            ])
1104            .filter(
1105                fragment_relation::Column::TargetFragmentId
1106                    .is_in(fragment_ids.map(|id| id as FragmentId)),
1107            )
1108            .into_tuple::<(FragmentId, FragmentId)>()
1109            .stream(txn)
1110            .await?;
1111        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
1112        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
1113            upstream_fragments
1114                .entry(downstream_fragment_id as crate::model::FragmentId)
1115                .or_default()
1116                .insert(upstream_fragment_id as crate::model::FragmentId);
1117        }
1118        Ok(upstream_fragments)
1119    }
1120
1121    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1122        let info = self.env.shared_actor_infos().read_guard();
1123
1124        let actor_locations = info
1125            .iter_over_fragments()
1126            .flat_map(|(fragment_id, fragment)| {
1127                fragment
1128                    .actors
1129                    .iter()
1130                    .map(|(actor_id, actor)| PartialActorLocation {
1131                        actor_id: *actor_id as _,
1132                        fragment_id: *fragment_id as _,
1133                        worker_id: actor.worker_id,
1134                    })
1135            })
1136            .collect_vec();
1137
1138        Ok(actor_locations)
1139    }
1140
1141    pub async fn list_actor_info(
1142        &self,
1143    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
1144        let inner = self.inner.read().await;
1145
1146        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
1147            FragmentModel::find()
1148                .select_only()
1149                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1150                .column(fragment::Column::FragmentId)
1151                .column_as(object::Column::Oid, "job_id")
1152                .column_as(object::Column::SchemaId, "schema_id")
1153                .column_as(object::Column::ObjType, "type")
1154                .into_tuple()
1155                .all(&inner.db)
1156                .await?;
1157
1158        let actor_infos = {
1159            let info = self.env.shared_actor_infos().read_guard();
1160
1161            let mut result = Vec::new();
1162
1163            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
1164                let Some(fragment) = info.get_fragment(fragment_id as _) else {
1165                    return Err(MetaError::unavailable(format!(
1166                        "shared actor info missing for fragment {fragment_id} while listing actors"
1167                    )));
1168                };
1169
1170                for actor_id in fragment.actors.keys() {
1171                    result.push((
1172                        *actor_id as _,
1173                        fragment.fragment_id as _,
1174                        object_id,
1175                        schema_id,
1176                        object_type,
1177                    ));
1178                }
1179            }
1180
1181            result
1182        };
1183
1184        Ok(actor_infos)
1185    }
1186
1187    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1188        let guard = self.env.shared_actor_info.read_guard();
1189        guard
1190            .iter_over_fragments()
1191            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1192            .collect_vec()
1193    }
1194
1195    pub async fn list_fragment_descs_with_node(
1196        &self,
1197        is_creating: bool,
1198    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1199        let inner = self.inner.read().await;
1200        let txn = inner.db.begin().await?;
1201
1202        let fragments_query = Self::build_fragment_query(is_creating);
1203        let fragments: Vec<fragment::Model> = fragments_query.all(&txn).await?;
1204
1205        let rows = fragments
1206            .into_iter()
1207            .map(|fragment| FragmentDescRow {
1208                fragment_id: fragment.fragment_id,
1209                job_id: fragment.job_id,
1210                fragment_type_mask: fragment.fragment_type_mask,
1211                distribution_type: fragment.distribution_type,
1212                state_table_ids: fragment.state_table_ids,
1213                vnode_count: fragment.vnode_count,
1214                parallelism_override: fragment.parallelism,
1215                stream_node: Some(fragment.stream_node),
1216            })
1217            .collect_vec();
1218
1219        self.build_fragment_distributions(&txn, rows).await
1220    }
1221
1222    pub async fn list_fragment_descs_without_node(
1223        &self,
1224        is_creating: bool,
1225    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1226        let inner = self.inner.read().await;
1227        let txn = inner.db.begin().await?;
1228
1229        let fragments_query = Self::build_fragment_query(is_creating);
1230        #[expect(clippy::type_complexity)]
1231        let fragments: Vec<(
1232            FragmentId,
1233            JobId,
1234            i32,
1235            DistributionType,
1236            TableIdArray,
1237            i32,
1238            Option<StreamingParallelism>,
1239        )> = fragments_query
1240            .select_only()
1241            .columns([
1242                fragment::Column::FragmentId,
1243                fragment::Column::JobId,
1244                fragment::Column::FragmentTypeMask,
1245                fragment::Column::DistributionType,
1246                fragment::Column::StateTableIds,
1247                fragment::Column::VnodeCount,
1248                fragment::Column::Parallelism,
1249            ])
1250            .into_tuple()
1251            .all(&txn)
1252            .await?;
1253
1254        let rows = fragments
1255            .into_iter()
1256            .map(
1257                |(
1258                    fragment_id,
1259                    job_id,
1260                    fragment_type_mask,
1261                    distribution_type,
1262                    state_table_ids,
1263                    vnode_count,
1264                    parallelism_override,
1265                )| FragmentDescRow {
1266                    fragment_id,
1267                    job_id,
1268                    fragment_type_mask,
1269                    distribution_type,
1270                    state_table_ids,
1271                    vnode_count,
1272                    parallelism_override,
1273                    stream_node: None,
1274                },
1275            )
1276            .collect_vec();
1277
1278        self.build_fragment_distributions(&txn, rows).await
1279    }
1280
1281    fn build_fragment_query(is_creating: bool) -> sea_orm::Select<fragment::Entity> {
1282        if is_creating {
1283            FragmentModel::find()
1284                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1285                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1286                .filter(
1287                    streaming_job::Column::JobStatus
1288                        .eq(JobStatus::Initial)
1289                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1290                )
1291        } else {
1292            FragmentModel::find()
1293        }
1294    }
1295
1296    async fn build_fragment_distributions(
1297        &self,
1298        txn: &DatabaseTransaction,
1299        rows: Vec<FragmentDescRow>,
1300    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1301        let fragment_ids = rows.iter().map(|row| row.fragment_id).collect_vec();
1302        let job_ids = rows.iter().map(|row| row.job_id).unique().collect_vec();
1303
1304        let job_parallelisms: HashMap<JobId, (StreamingParallelism, Option<String>)> =
1305            if fragment_ids.is_empty() {
1306                HashMap::new()
1307            } else {
1308                StreamingJob::find()
1309                    .select_only()
1310                    .columns([
1311                        streaming_job::Column::JobId,
1312                        streaming_job::Column::Parallelism,
1313                        streaming_job::Column::AdaptiveParallelismStrategy,
1314                    ])
1315                    .filter(streaming_job::Column::JobId.is_in(job_ids))
1316                    .into_tuple::<(JobId, StreamingParallelism, Option<String>)>()
1317                    .all(txn)
1318                    .await?
1319                    .into_iter()
1320                    .map(|(job_id, parallelism, strategy)| (job_id, (parallelism, strategy)))
1321                    .collect()
1322            };
1323
1324        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
1325            if fragment_ids.is_empty() {
1326                Vec::new()
1327            } else {
1328                FragmentRelation::find()
1329                    .select_only()
1330                    .columns([
1331                        fragment_relation::Column::TargetFragmentId,
1332                        fragment_relation::Column::SourceFragmentId,
1333                        fragment_relation::Column::DispatcherType,
1334                    ])
1335                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
1336                    .into_tuple()
1337                    .all(txn)
1338                    .await?
1339            };
1340
1341        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1342        for (target_id, source_id, _) in upstream_entries {
1343            all_upstreams.entry(target_id).or_default().push(source_id);
1344        }
1345
1346        let root_fragment_map = if fragment_ids.is_empty() {
1347            HashMap::new()
1348        } else {
1349            let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_ids).await?;
1350            Self::collect_root_fragment_mapping(ensembles)
1351        };
1352
1353        let guard = self.env.shared_actor_info.read_guard();
1354        let mut result = Vec::with_capacity(rows.len());
1355
1356        for row in rows {
1357            let parallelism = guard
1358                .get_fragment(row.fragment_id as _)
1359                .map(|fragment| fragment.actors.len())
1360                .unwrap_or_default();
1361
1362            let root_fragments = root_fragment_map
1363                .get(&row.fragment_id)
1364                .cloned()
1365                .unwrap_or_default();
1366
1367            let upstreams = all_upstreams.remove(&row.fragment_id).unwrap_or_default();
1368
1369            let parallelism_policy = Self::format_fragment_parallelism_policy(
1370                row.distribution_type,
1371                row.parallelism_override.as_ref(),
1372                job_parallelisms
1373                    .get(&row.job_id)
1374                    .map(|(parallelism, _)| parallelism),
1375                job_parallelisms
1376                    .get(&row.job_id)
1377                    .and_then(|(_, strategy)| strategy.as_deref()),
1378                &root_fragments,
1379            );
1380
1381            let fragment = FragmentDistribution {
1382                fragment_id: row.fragment_id,
1383                table_id: row.job_id,
1384                distribution_type: PbFragmentDistributionType::from(row.distribution_type) as _,
1385                state_table_ids: row.state_table_ids.0,
1386                upstream_fragment_ids: upstreams.clone(),
1387                fragment_type_mask: row.fragment_type_mask as _,
1388                parallelism: parallelism as _,
1389                vnode_count: row.vnode_count as _,
1390                node: row.stream_node.map(|node| node.to_protobuf()),
1391                parallelism_policy,
1392            };
1393
1394            result.push((fragment, upstreams));
1395        }
1396
1397        Ok(result)
1398    }
1399
1400    pub async fn list_sink_log_store_tables(&self) -> MetaResult<Vec<(SinkId, TableId)>> {
1401        let inner = self.inner.read().await;
1402        let txn = inner.db.begin().await?;
1403
1404        let fragments: Vec<(JobId, StreamNode)> = FragmentModel::find()
1405            .select_only()
1406            .columns([fragment::Column::JobId, fragment::Column::StreamNode])
1407            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Sink))
1408            .into_tuple()
1409            .all(&txn)
1410            .await?;
1411
1412        let mut mapping: HashMap<SinkId, TableId> = HashMap::new();
1413
1414        for (job_id, stream_node) in fragments {
1415            let sink_id = SinkId::new(job_id.as_raw_id());
1416            let stream_node = stream_node.to_protobuf();
1417            let mut internal_table_id: Option<TableId> = None;
1418
1419            visit_stream_node_body(&stream_node, |body| {
1420                if let NodeBody::Sink(node) = body
1421                    && node.log_store_type == SinkLogStoreType::KvLogStore as i32
1422                    && let Some(table) = &node.table
1423                {
1424                    internal_table_id = Some(TableId::new(table.id.as_raw_id()));
1425                }
1426            });
1427
1428            if let Some(table_id) = internal_table_id
1429                && let Some(existing) = mapping.insert(sink_id, table_id)
1430                && existing != table_id
1431            {
1432                tracing::warn!(
1433                    "sink {sink_id:?} has multiple log store tables: {existing:?} vs {table_id:?}"
1434                );
1435            }
1436        }
1437
1438        Ok(mapping.into_iter().collect())
1439    }
1440
1441    /// Build a fragment-to-root lookup for all reported root fragment ensembles.
1442    fn collect_root_fragment_mapping(
1443        ensembles: Vec<NoShuffleEnsemble>,
1444    ) -> HashMap<FragmentId, Vec<FragmentId>> {
1445        let mut mapping = HashMap::new();
1446
1447        for ensemble in ensembles {
1448            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1449            roots.sort_unstable();
1450            roots.dedup();
1451
1452            if roots.is_empty() {
1453                continue;
1454            }
1455
1456            let root_set: HashSet<_> = roots.iter().copied().collect();
1457
1458            for fragment_id in ensemble.component_fragments() {
1459                if root_set.contains(&fragment_id) {
1460                    mapping.insert(fragment_id, Vec::new());
1461                } else {
1462                    mapping.insert(fragment_id, roots.clone());
1463                }
1464            }
1465        }
1466
1467        mapping
1468    }
1469
1470    fn format_fragment_parallelism_policy(
1471        distribution_type: DistributionType,
1472        fragment_parallelism: Option<&StreamingParallelism>,
1473        job_parallelism: Option<&StreamingParallelism>,
1474        job_adaptive_parallelism_strategy: Option<&str>,
1475        root_fragments: &[FragmentId],
1476    ) -> String {
1477        if distribution_type == DistributionType::Single {
1478            return "single".to_owned();
1479        }
1480
1481        if let Some(parallelism) = fragment_parallelism {
1482            return format!(
1483                "override({})",
1484                Self::format_streaming_parallelism(parallelism, job_adaptive_parallelism_strategy)
1485            );
1486        }
1487
1488        if !root_fragments.is_empty() {
1489            let mut upstreams = root_fragments.to_vec();
1490            upstreams.sort_unstable();
1491            upstreams.dedup();
1492
1493            return format!("upstream_fragment({upstreams:?})");
1494        }
1495
1496        let inherited = job_parallelism
1497            .map(|parallelism| {
1498                Self::format_streaming_parallelism(parallelism, job_adaptive_parallelism_strategy)
1499            })
1500            .unwrap_or_else(|| "unknown".to_owned());
1501        format!("inherit({inherited})")
1502    }
1503
1504    fn format_streaming_parallelism(
1505        parallelism: &StreamingParallelism,
1506        adaptive_parallelism_strategy: Option<&str>,
1507    ) -> String {
1508        match parallelism {
1509            StreamingParallelism::Adaptive => adaptive_parallelism_strategy
1510                .and_then(Self::format_adaptive_parallelism_strategy)
1511                .unwrap_or_else(|| "adaptive".to_owned()),
1512            StreamingParallelism::Fixed(n) => n.to_string(),
1513            StreamingParallelism::Custom => adaptive_parallelism_strategy
1514                .and_then(Self::format_adaptive_parallelism_strategy)
1515                .unwrap_or_else(|| "custom".to_owned()),
1516        }
1517    }
1518
1519    fn format_adaptive_parallelism_strategy(strategy: &str) -> Option<String> {
1520        parse_strategy(strategy)
1521            .ok()
1522            .map(|strategy| match strategy {
1523                AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full => {
1524                    "adaptive".to_owned()
1525                }
1526                AdaptiveParallelismStrategy::Bounded(n) => format!("bounded({n})"),
1527                AdaptiveParallelismStrategy::Ratio(r) => format!("ratio({r})"),
1528            })
1529    }
1530
1531    pub async fn list_sink_actor_mapping(
1532        &self,
1533    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1534        let inner = self.inner.read().await;
1535        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1536            .select_only()
1537            .columns([sink::Column::SinkId, sink::Column::Name])
1538            .into_tuple()
1539            .all(&inner.db)
1540            .await?;
1541        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1542
1543        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1544
1545        let actor_with_type: Vec<(ActorId, SinkId)> = {
1546            let info = self.env.shared_actor_infos().read_guard();
1547
1548            info.iter_over_fragments()
1549                .filter(|(_, fragment)| {
1550                    sink_ids.contains(&fragment.job_id.as_sink_id())
1551                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1552                })
1553                .flat_map(|(_, fragment)| {
1554                    fragment
1555                        .actors
1556                        .keys()
1557                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1558                })
1559                .collect()
1560        };
1561
1562        let mut sink_actor_mapping = HashMap::new();
1563        for (actor_id, sink_id) in actor_with_type {
1564            sink_actor_mapping
1565                .entry(sink_id)
1566                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1567                .1
1568                .push(actor_id);
1569        }
1570
1571        Ok(sink_actor_mapping)
1572    }
1573
1574    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1575        let inner = self.inner.read().await;
1576        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1577            .select_only()
1578            .columns([
1579                fragment::Column::FragmentId,
1580                fragment::Column::JobId,
1581                fragment::Column::StateTableIds,
1582            ])
1583            .into_partial_model()
1584            .all(&inner.db)
1585            .await?;
1586        Ok(fragment_state_tables)
1587    }
1588
1589    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
1590    /// collected
1591    pub async fn load_all_actors_dynamic(
1592        &self,
1593        database_id: Option<DatabaseId>,
1594        worker_nodes: &ActiveStreamingWorkerNodes,
1595    ) -> MetaResult<FragmentRenderMap> {
1596        let loaded = self.load_fragment_context(database_id).await?;
1597
1598        if loaded.is_empty() {
1599            return Ok(HashMap::new());
1600        }
1601
1602        let RenderedGraph { fragments, .. } = render_actor_assignments(
1603            self.env.actor_id_generator(),
1604            worker_nodes.current(),
1605            &loaded,
1606        )?;
1607
1608        tracing::trace!(?fragments, "reload all actors");
1609
1610        Ok(fragments)
1611    }
1612
1613    /// Async load stage: collects all metadata required for rendering actor assignments.
1614    pub async fn load_fragment_context(
1615        &self,
1616        database_id: Option<DatabaseId>,
1617    ) -> MetaResult<LoadedFragmentContext> {
1618        let inner = self.inner.read().await;
1619        let txn = inner.db.begin().await?;
1620
1621        self.load_fragment_context_in_txn(&txn, database_id).await
1622    }
1623
1624    pub async fn load_fragment_context_in_txn<C>(
1625        &self,
1626        txn: &C,
1627        database_id: Option<DatabaseId>,
1628    ) -> MetaResult<LoadedFragmentContext>
1629    where
1630        C: ConnectionTrait,
1631    {
1632        let mut query = StreamingJob::find()
1633            .select_only()
1634            .column(streaming_job::Column::JobId);
1635
1636        if let Some(database_id) = database_id {
1637            query = query
1638                .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1639                .filter(object::Column::DatabaseId.eq(database_id));
1640        }
1641
1642        let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
1643
1644        let jobs: HashSet<JobId> = jobs.into_iter().collect();
1645
1646        if jobs.is_empty() {
1647            return Ok(LoadedFragmentContext::default());
1648        }
1649
1650        load_fragment_context_for_jobs(txn, jobs).await
1651    }
1652
1653    #[await_tree::instrument]
1654    pub async fn fill_snapshot_backfill_epoch(
1655        &self,
1656        fragment_ids: impl Iterator<Item = FragmentId>,
1657        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1658        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1659    ) -> MetaResult<()> {
1660        let inner = self.inner.write().await;
1661        let txn = inner.db.begin().await?;
1662        for fragment_id in fragment_ids {
1663            let fragment = FragmentModel::find_by_id(fragment_id)
1664                .one(&txn)
1665                .await?
1666                .context(format!("fragment {} not found", fragment_id))?;
1667            let mut node = fragment.stream_node.to_protobuf();
1668            if crate::stream::fill_snapshot_backfill_epoch(
1669                &mut node,
1670                snapshot_backfill_info,
1671                cross_db_snapshot_backfill_info,
1672            )? {
1673                let node = StreamNode::from(&node);
1674                FragmentModel::update(fragment::ActiveModel {
1675                    fragment_id: Set(fragment_id),
1676                    stream_node: Set(node),
1677                    ..Default::default()
1678                })
1679                .exec(&txn)
1680                .await?;
1681            }
1682        }
1683        txn.commit().await?;
1684        Ok(())
1685    }
1686
1687    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1688    pub fn get_running_actors_of_fragment(
1689        &self,
1690        fragment_id: FragmentId,
1691    ) -> MetaResult<HashSet<model::ActorId>> {
1692        let info = self.env.shared_actor_infos().read_guard();
1693
1694        let actors = info
1695            .get_fragment(fragment_id as _)
1696            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1697            .unwrap_or_default();
1698
1699        Ok(actors)
1700    }
1701
    /// Get the actor ids of the source backfill fragment together with each actor's upstream
    /// source actor id, as `(backfill_actor_id, upstream_source_actor_id)` pairs.
    ///
    /// The dispatcher type and the distribution types are read from the meta store, while the
    /// actors and their vnode bitmaps are read from the shared actor infos of the environment.
    ///
    /// # Errors
    ///
    /// Fails if there is no fragment relation from `source_fragment_id` to
    /// `source_backfill_fragment_id`, if that relation is not `NoShuffle`, or if either fragment
    /// cannot be found in the meta store.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Look up the dispatcher type of the edge source -> source backfill.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // The actor pairing below is only resolved for `NoShuffle` edges.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Reads the distribution type of a single fragment from the meta store.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await
                    .map_err(MetaError::from)?
                    .ok_or_else(|| {
                        MetaError::from(anyhow!("failed to find fragment: {}", fragment_id))
                    })?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Collects `actor id -> optional vnode bitmap` for a fragment from the shared actor
        // infos; a fragment that is not present there yields an empty map.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // The resolver is fed the source side first and yields (source, backfill) pairs; swap
        // them so that the backfill actor comes first in the returned tuples.
        Ok(resolve_no_shuffle_actor_mapping(
            source_distribution_type,
            source_actors.iter().map(|(&id, bitmap)| (id, bitmap)),
            source_backfill_distribution_type,
            source_backfill_actors
                .iter()
                .map(|(&id, bitmap)| (id, bitmap)),
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1802
1803    /// Get and filter the "**root**" fragments of the specified jobs.
1804    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1805    ///
1806    /// Root fragment connects to downstream jobs.
1807    ///
1808    /// ## What can be the root fragment
1809    /// - For sink, it should have one `Sink` fragment.
1810    /// - For MV, it should have one `MView` fragment.
1811    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1812    /// - For source, it should have one `Source` fragment.
1813    ///
1814    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1815    pub async fn get_root_fragments(
1816        &self,
1817        job_ids: Vec<JobId>,
1818    ) -> MetaResult<HashMap<JobId, Fragment>> {
1819        let inner = self.inner.read().await;
1820
1821        let all_fragments = FragmentModel::find()
1822            .filter(fragment::Column::JobId.is_in(job_ids))
1823            .all(&inner.db)
1824            .await?;
1825        // job_id -> fragment
1826        let mut root_fragments = HashMap::<JobId, Fragment>::new();
1827        for fragment in all_fragments {
1828            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1829            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1830                _ = root_fragments.insert(fragment.job_id, fragment.into());
1831            } else if mask.contains(FragmentTypeFlag::Source) {
1832                // look for Source fragment only if there's no MView fragment
1833                // (notice try_insert here vs insert above)
1834                _ = root_fragments.try_insert(fragment.job_id, fragment.into());
1835            }
1836        }
1837
1838        Ok(root_fragments)
1839    }
1840
1841    pub async fn get_root_fragment(&self, job_id: JobId) -> MetaResult<Fragment> {
1842        let mut root_fragments = self.get_root_fragments(vec![job_id]).await?;
1843        let root_fragment = root_fragments
1844            .remove(&job_id)
1845            .context(format!("root fragment for job {} not found", job_id))?;
1846
1847        Ok(root_fragment)
1848    }
1849
1850    /// Get the downstream fragments connected to the specified job.
1851    pub async fn get_downstream_fragments(
1852        &self,
1853        job_id: JobId,
1854    ) -> MetaResult<Vec<(stream_plan::DispatcherType, Fragment)>> {
1855        let root_fragment = self.get_root_fragment(job_id).await?;
1856
1857        let inner = self.inner.read().await;
1858        let txn = inner.db.begin().await?;
1859        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1860            .filter(
1861                fragment_relation::Column::SourceFragmentId
1862                    .eq(root_fragment.fragment_id as FragmentId),
1863            )
1864            .all(&txn)
1865            .await?;
1866
1867        let downstream_fragment_ids = downstream_fragment_relations
1868            .iter()
1869            .map(|model| model.target_fragment_id as FragmentId)
1870            .collect::<HashSet<_>>();
1871
1872        let downstream_fragments: Vec<fragment::Model> = FragmentModel::find()
1873            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1874            .all(&txn)
1875            .await?;
1876
1877        let mut downstream_fragments_map: HashMap<_, _> = downstream_fragments
1878            .into_iter()
1879            .map(|fragment| (fragment.fragment_id, fragment))
1880            .collect();
1881
1882        let mut downstream_fragments = vec![];
1883
1884        let fragment_map: HashMap<_, _> = downstream_fragment_relations
1885            .iter()
1886            .map(|model| (model.target_fragment_id, model.dispatcher_type))
1887            .collect();
1888
1889        for (fragment_id, dispatcher_type) in fragment_map {
1890            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1891
1892            let fragment = downstream_fragments_map
1893                .remove(&fragment_id)
1894                .context(format!(
1895                    "downstream fragment node for id {} not found",
1896                    fragment_id
1897                ))?
1898                .into();
1899
1900            downstream_fragments.push((dispatch_type, fragment));
1901        }
1902        Ok(downstream_fragments)
1903    }
1904
1905    pub async fn load_source_fragment_ids(
1906        &self,
1907    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1908        let inner = self.inner.read().await;
1909        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1910            .select_only()
1911            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1912            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1913            .into_tuple()
1914            .all(&inner.db)
1915            .await?;
1916
1917        let mut source_fragment_ids = HashMap::new();
1918        for (fragment_id, stream_node) in fragments {
1919            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1920                source_fragment_ids
1921                    .entry(source_id)
1922                    .or_insert_with(BTreeSet::new)
1923                    .insert(fragment_id);
1924            }
1925        }
1926        Ok(source_fragment_ids)
1927    }
1928
1929    pub async fn load_backfill_fragment_ids(
1930        &self,
1931    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1932        let inner = self.inner.read().await;
1933        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1934            .select_only()
1935            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1936            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1937            .into_tuple()
1938            .all(&inner.db)
1939            .await?;
1940
1941        let mut source_fragment_ids = HashMap::new();
1942        for (fragment_id, stream_node) in fragments {
1943            if let Some((source_id, upstream_source_fragment_id)) =
1944                stream_node.to_protobuf().find_source_backfill()
1945            {
1946                source_fragment_ids
1947                    .entry(source_id)
1948                    .or_insert_with(BTreeSet::new)
1949                    .insert((fragment_id, upstream_source_fragment_id));
1950            }
1951        }
1952        Ok(source_fragment_ids)
1953    }
1954
1955    pub async fn get_all_upstream_sink_infos(
1956        &self,
1957        target_table: &PbTable,
1958        target_fragment_id: FragmentId,
1959    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1960        let inner = self.inner.read().await;
1961        let txn = inner.db.begin().await?;
1962
1963        self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
1964            .await
1965    }
1966
1967    pub async fn get_all_upstream_sink_infos_in_txn<C>(
1968        &self,
1969        txn: &C,
1970        target_table: &PbTable,
1971        target_fragment_id: FragmentId,
1972    ) -> MetaResult<Vec<UpstreamSinkInfo>>
1973    where
1974        C: ConnectionTrait,
1975    {
1976        let incoming_sinks = Sink::find()
1977            .filter(sink::Column::TargetTable.eq(target_table.id))
1978            .all(txn)
1979            .await?;
1980
1981        let sink_ids = incoming_sinks.iter().map(|s| s.sink_id).collect_vec();
1982        let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1983
1984        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1985        for sink in &incoming_sinks {
1986            let sink_fragment_id =
1987                sink_fragment_ids
1988                    .get(&sink.sink_id)
1989                    .cloned()
1990                    .ok_or(anyhow::anyhow!(
1991                        "sink fragment not found for sink id {}",
1992                        sink.sink_id
1993                    ))?;
1994            let upstream_info = build_upstream_sink_info(
1995                sink.sink_id,
1996                sink.original_target_columns
1997                    .as_ref()
1998                    .map(|cols| cols.to_protobuf())
1999                    .unwrap_or_default(),
2000                sink_fragment_id,
2001                target_table,
2002                target_fragment_id,
2003            )?;
2004            upstream_sink_infos.push(upstream_info);
2005        }
2006
2007        Ok(upstream_sink_infos)
2008    }
2009
2010    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
2011        let inner = self.inner.read().await;
2012        let txn = inner.db.begin().await?;
2013
2014        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
2015            .select_only()
2016            .column(fragment::Column::FragmentId)
2017            .filter(
2018                fragment::Column::JobId
2019                    .eq(job_id)
2020                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2021            )
2022            .into_tuple()
2023            .all(&txn)
2024            .await?;
2025
2026        if mview_fragment.len() != 1 {
2027            return Err(anyhow::anyhow!(
2028                "expected exactly one mview fragment for job {}, found {}",
2029                job_id,
2030                mview_fragment.len()
2031            )
2032            .into());
2033        }
2034
2035        Ok(mview_fragment.into_iter().next().unwrap())
2036    }
2037
2038    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
2039        let inner = self.inner.read().await;
2040        let txn = inner.db.begin().await?;
2041        has_table_been_migrated(&txn, table_id).await
2042    }
2043
2044    pub async fn update_fragment_splits<C>(
2045        &self,
2046        txn: &C,
2047        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
2048    ) -> MetaResult<()>
2049    where
2050        C: ConnectionTrait,
2051    {
2052        if fragment_splits.is_empty() {
2053            return Ok(());
2054        }
2055
2056        let existing_fragment_ids: HashSet<FragmentId> = FragmentModel::find()
2057            .select_only()
2058            .column(fragment::Column::FragmentId)
2059            .filter(fragment::Column::FragmentId.is_in(fragment_splits.keys().copied()))
2060            .into_tuple()
2061            .all(txn)
2062            .await?
2063            .into_iter()
2064            .collect();
2065
2066        // Filter out stale fragment ids to avoid FK violations when split updates race with drop.
2067        let (models, skipped_fragment_ids): (Vec<_>, Vec<_>) = fragment_splits
2068            .iter()
2069            .partition_map(|(fragment_id, splits)| {
2070                if existing_fragment_ids.contains(fragment_id) {
2071                    Either::Left(fragment_splits::ActiveModel {
2072                        fragment_id: Set(*fragment_id as _),
2073                        splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
2074                            splits: splits.iter().map(Into::into).collect_vec(),
2075                        }))),
2076                    })
2077                } else {
2078                    Either::Right(*fragment_id)
2079                }
2080            });
2081
2082        if !skipped_fragment_ids.is_empty() {
2083            tracing::warn!(
2084                skipped_fragment_ids = ?skipped_fragment_ids,
2085                total_fragment_ids = fragment_splits.len(),
2086                "skipping stale fragment split updates for missing fragments"
2087            );
2088        }
2089
2090        if models.is_empty() {
2091            return Ok(());
2092        }
2093
2094        FragmentSplits::insert_many(models)
2095            .on_conflict(
2096                OnConflict::column(fragment_splits::Column::FragmentId)
2097                    .update_column(fragment_splits::Column::Splits)
2098                    .to_owned(),
2099            )
2100            .exec(txn)
2101            .await?;
2102
2103        Ok(())
2104    }
2105}
2106
2107#[cfg(test)]
2108mod tests {
2109    use std::collections::{BTreeMap, HashMap, HashSet};
2110
2111    use itertools::Itertools;
2112    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
2113    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
2114    use risingwave_common::id::JobId;
2115    use risingwave_common::util::iter_util::ZipEqDebug;
2116    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
2117    use risingwave_meta_model::fragment::DistributionType;
2118    use risingwave_meta_model::*;
2119    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
2120    use risingwave_pb::plan_common::PbExprContext;
2121    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
2122    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
2123    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
2124
2125    use super::ActorInfo;
2126    use crate::MetaResult;
2127    use crate::controller::catalog::CatalogController;
2128    use crate::model::{Fragment, StreamActor};
2129
    /// Upstream actors of a single actor, keyed by upstream fragment id.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    /// Upstream layout of a whole fragment: actor id -> that actor's [`ActorUpstreams`].
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    /// Id of the fragment built and checked by the tests in this module.
    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    /// Base id for the upstream fragments referenced by the generated merge nodes.
    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    /// Id of the job that owns the test fragment.
    const TEST_JOB_ID: JobId = JobId::new(1);

    /// Id of the single state table attached to the test fragment.
    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
2141
2142    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
2143        let mut upstream_actor_ids = BTreeMap::new();
2144        upstream_actor_ids.insert(
2145            TEST_UPSTREAM_FRAGMENT_ID,
2146            HashSet::from_iter([(actor_id + 100)]),
2147        );
2148        upstream_actor_ids.insert(
2149            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
2150            HashSet::from_iter([(actor_id + 200)]),
2151        );
2152        upstream_actor_ids
2153    }
2154
2155    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
2156        let mut input = vec![];
2157        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
2158            input.push(PbStreamNode {
2159                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
2160                    upstream_fragment_id,
2161                    ..Default::default()
2162                }))),
2163                ..Default::default()
2164            });
2165        }
2166
2167        PbStreamNode {
2168            input,
2169            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
2170            ..Default::default()
2171        }
2172    }
2173
2174    #[tokio::test]
2175    async fn test_extract_fragment() -> MetaResult<()> {
2176        let actor_count = 3u32;
2177        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
2178            .map(|actor_id| {
2179                (
2180                    actor_id.into(),
2181                    generate_upstream_actor_ids_for_actor(actor_id.into()),
2182                )
2183            })
2184            .collect();
2185
2186        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());
2187
2188        let pb_fragment = Fragment {
2189            fragment_id: TEST_FRAGMENT_ID as _,
2190            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
2191            distribution_type: PbFragmentDistributionType::Hash as _,
2192            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
2193            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
2194            nodes: stream_node,
2195        };
2196
2197        let fragment =
2198            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;
2199
2200        check_fragment(fragment, pb_fragment);
2201
2202        Ok(())
2203    }
2204
2205    #[tokio::test]
2206    async fn test_compose_fragment() -> MetaResult<()> {
2207        let actor_count = 3u32;
2208
2209        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
2210            .map(|actor_id| {
2211                (
2212                    actor_id.into(),
2213                    generate_upstream_actor_ids_for_actor(actor_id.into()),
2214                )
2215            })
2216            .collect();
2217
2218        let mut actor_bitmaps = ActorMapping::new_uniform(
2219            (0..actor_count).map(|i| i.into()),
2220            VirtualNode::COUNT_FOR_TEST,
2221        )
2222        .to_bitmaps();
2223
2224        let actors = (0..actor_count)
2225            .map(|actor_id| {
2226                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
2227                    splits: vec![PbConnectorSplit {
2228                        split_type: "dummy".to_owned(),
2229                        ..Default::default()
2230                    }],
2231                });
2232
2233                ActorInfo {
2234                    actor_id: actor_id.into(),
2235                    fragment_id: TEST_FRAGMENT_ID,
2236                    splits: actor_splits,
2237                    worker_id: 0.into(),
2238                    vnode_bitmap: actor_bitmaps
2239                        .remove(&actor_id)
2240                        .map(|bitmap| bitmap.to_protobuf())
2241                        .as_ref()
2242                        .map(VnodeBitmap::from),
2243                    expr_context: ExprContext::from(&PbExprContext {
2244                        time_zone: String::from("America/New_York"),
2245                        strict_mode: false,
2246                    }),
2247                    config_override: "a.b.c = true".into(),
2248                }
2249            })
2250            .collect_vec();
2251
2252        let stream_node = {
2253            let template_actor = actors.first().cloned().unwrap();
2254
2255            let template_upstream_actor_ids =
2256                upstream_actor_ids.get(&template_actor.actor_id).unwrap();
2257
2258            generate_merger_stream_node(template_upstream_actor_ids)
2259        };
2260
2261        #[expect(deprecated)]
2262        let fragment = fragment::Model {
2263            fragment_id: TEST_FRAGMENT_ID,
2264            job_id: TEST_JOB_ID,
2265            fragment_type_mask: 0,
2266            distribution_type: DistributionType::Hash,
2267            stream_node: StreamNode::from(&stream_node),
2268            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
2269            upstream_fragment_id: Default::default(),
2270            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
2271            parallelism: None,
2272        };
2273
2274        let (pb_fragment, pb_actors, pb_actor_status, pb_actor_splits) =
2275            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();
2276
2277        assert_eq!(pb_actor_status.len(), actor_count as usize);
2278        assert!(
2279            pb_actor_status
2280                .values()
2281                .all(|actor_status| actor_status.location.is_some())
2282        );
2283        assert_eq!(pb_actor_splits.len(), actor_count as usize);
2284
2285        check_fragment(fragment, pb_fragment);
2286        check_actors(
2287            actors,
2288            &upstream_actor_ids,
2289            pb_actors,
2290            pb_actor_splits,
2291            &stream_node,
2292        );
2293
2294        Ok(())
2295    }
2296
2297    fn check_actors(
2298        actors: Vec<ActorInfo>,
2299        actor_upstreams: &FragmentActorUpstreams,
2300        pb_actors: Vec<StreamActor>,
2301        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
2302        stream_node: &PbStreamNode,
2303    ) {
2304        for (
2305            ActorInfo {
2306                actor_id,
2307                fragment_id,
2308                splits,
2309                worker_id: _,
2310                vnode_bitmap,
2311                expr_context,
2312                ..
2313            },
2314            StreamActor {
2315                actor_id: pb_actor_id,
2316                fragment_id: pb_fragment_id,
2317                vnode_bitmap: pb_vnode_bitmap,
2318                mview_definition,
2319                expr_context: pb_expr_context,
2320                ..
2321            },
2322        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
2323        {
2324            assert_eq!(actor_id, pb_actor_id as ActorId);
2325            assert_eq!(fragment_id, pb_fragment_id as FragmentId);
2326
2327            assert_eq!(
2328                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
2329                pb_vnode_bitmap,
2330            );
2331
2332            assert_eq!(mview_definition, "");
2333
2334            visit_stream_node_body(stream_node, |body| {
2335                if let PbNodeBody::Merge(m) = body {
2336                    assert!(
2337                        actor_upstreams
2338                            .get(&actor_id)
2339                            .unwrap()
2340                            .contains_key(&m.upstream_fragment_id)
2341                    );
2342                }
2343            });
2344
2345            assert_eq!(
2346                splits,
2347                pb_actor_splits
2348                    .get(&pb_actor_id)
2349                    .map(ConnectorSplits::from)
2350                    .unwrap_or_default()
2351            );
2352
2353            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
2354        }
2355    }
2356
2357    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2358        let Fragment {
2359            fragment_id,
2360            fragment_type_mask,
2361            distribution_type: pb_distribution_type,
2362            state_table_ids: pb_state_table_ids,
2363            maybe_vnode_count: _,
2364            nodes,
2365        } = pb_fragment;
2366
2367        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
2368        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2369        assert_eq!(
2370            pb_distribution_type,
2371            PbFragmentDistributionType::from(fragment.distribution_type)
2372        );
2373
2374        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
2375        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2376    }
2377
2378    #[test]
2379    fn test_parallelism_policy_with_root_fragments() {
2380        #[expect(deprecated)]
2381        let fragment = fragment::Model {
2382            fragment_id: 3.into(),
2383            job_id: TEST_JOB_ID,
2384            fragment_type_mask: 0,
2385            distribution_type: DistributionType::Hash,
2386            stream_node: StreamNode::from(&PbStreamNode::default()),
2387            state_table_ids: TableIdArray::default(),
2388            upstream_fragment_id: Default::default(),
2389            vnode_count: 0,
2390            parallelism: None,
2391        };
2392
2393        let job_parallelism = StreamingParallelism::Fixed(4);
2394
2395        let policy = super::CatalogController::format_fragment_parallelism_policy(
2396            fragment.distribution_type,
2397            fragment.parallelism.as_ref(),
2398            Some(&job_parallelism),
2399            None,
2400            &[],
2401        );
2402
2403        assert_eq!(policy, "inherit(4)");
2404    }
2405
2406    #[test]
2407    fn test_parallelism_policy_with_adaptive_strategy() {
2408        #[expect(deprecated)]
2409        let fragment = fragment::Model {
2410            fragment_id: 4.into(),
2411            job_id: TEST_JOB_ID,
2412            fragment_type_mask: 0,
2413            distribution_type: DistributionType::Hash,
2414            stream_node: StreamNode::from(&PbStreamNode::default()),
2415            state_table_ids: TableIdArray::default(),
2416            upstream_fragment_id: Default::default(),
2417            vnode_count: 0,
2418            parallelism: None,
2419        };
2420
2421        let job_parallelism = StreamingParallelism::Adaptive;
2422
2423        let policy = super::CatalogController::format_fragment_parallelism_policy(
2424            fragment.distribution_type,
2425            fragment.parallelism.as_ref(),
2426            Some(&job_parallelism),
2427            Some("RATIO(0.5)"),
2428            &[],
2429        );
2430
2431        assert_eq!(policy, "inherit(ratio(0.5))");
2432    }
2433
2434    #[test]
2435    fn test_parallelism_policy_with_custom_strategy() {
2436        #[expect(deprecated)]
2437        let fragment = fragment::Model {
2438            fragment_id: 6.into(),
2439            job_id: TEST_JOB_ID,
2440            fragment_type_mask: 0,
2441            distribution_type: DistributionType::Hash,
2442            stream_node: StreamNode::from(&PbStreamNode::default()),
2443            state_table_ids: TableIdArray::default(),
2444            upstream_fragment_id: Default::default(),
2445            vnode_count: 0,
2446            parallelism: None,
2447        };
2448
2449        let job_parallelism = StreamingParallelism::Custom;
2450
2451        let policy = super::CatalogController::format_fragment_parallelism_policy(
2452            fragment.distribution_type,
2453            fragment.parallelism.as_ref(),
2454            Some(&job_parallelism),
2455            Some("BOUNDED(8)"),
2456            &[],
2457        );
2458
2459        assert_eq!(policy, "inherit(bounded(8))");
2460    }
2461
2462    #[test]
2463    fn test_parallelism_policy_with_invalid_adaptive_strategy_falls_back() {
2464        #[expect(deprecated)]
2465        let fragment = fragment::Model {
2466            fragment_id: 7.into(),
2467            job_id: TEST_JOB_ID,
2468            fragment_type_mask: 0,
2469            distribution_type: DistributionType::Hash,
2470            stream_node: StreamNode::from(&PbStreamNode::default()),
2471            state_table_ids: TableIdArray::default(),
2472            upstream_fragment_id: Default::default(),
2473            vnode_count: 0,
2474            parallelism: None,
2475        };
2476
2477        let job_parallelism = StreamingParallelism::Adaptive;
2478
2479        let policy = super::CatalogController::format_fragment_parallelism_policy(
2480            fragment.distribution_type,
2481            fragment.parallelism.as_ref(),
2482            Some(&job_parallelism),
2483            Some("NOT_A_STRATEGY"),
2484            &[],
2485        );
2486
2487        assert_eq!(policy, "inherit(adaptive)");
2488    }
2489
2490    #[test]
2491    fn test_parallelism_policy_with_upstream_roots() {
2492        #[expect(deprecated)]
2493        let fragment = fragment::Model {
2494            fragment_id: 5.into(),
2495            job_id: TEST_JOB_ID,
2496            fragment_type_mask: 0,
2497            distribution_type: DistributionType::Hash,
2498            stream_node: StreamNode::from(&PbStreamNode::default()),
2499            state_table_ids: TableIdArray::default(),
2500            upstream_fragment_id: Default::default(),
2501            vnode_count: 0,
2502            parallelism: None,
2503        };
2504
2505        let policy = super::CatalogController::format_fragment_parallelism_policy(
2506            fragment.distribution_type,
2507            fragment.parallelism.as_ref(),
2508            None,
2509            None,
2510            &[3.into(), 1.into(), 2.into(), 1.into()],
2511        );
2512
2513        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
2514    }
2515}