// risingwave_meta/controller/fragment.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::{Either, Itertools};
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::CdcScanOptions;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    TableIdArray, VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits,
40    object, sink, source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::fragment::{
49    FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan;
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57    PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, SinkLogStoreType,
58    StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63    ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, FromQueryResult, JoinType,
64    PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
65};
66use serde::{Deserialize, Serialize};
67
68use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
69use crate::controller::catalog::CatalogController;
70use crate::controller::scale::{
71    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph,
72    find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
73    render_actor_assignments, resolve_streaming_job_definition,
74};
75use crate::controller::utils::{
76    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
77    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
78    resolve_no_shuffle_actor_mapping,
79};
80use crate::error::MetaError;
81use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
82use crate::model::{
83    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
84    StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
85    TableParallelism,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
/// Some information of running (inflight) actors.
#[derive(Debug)]
pub struct InflightActorInfo {
    /// Worker node the actor is currently located on.
    pub worker_id: WorkerId,
    /// Vnode ownership bitmap of the actor; `None` when the actor carries no
    /// bitmap (presumably singleton distribution — TODO confirm).
    pub vnode_bitmap: Option<Bitmap>,
    /// Connector splits currently assigned to the actor.
    pub splits: Vec<SplitImpl>,
}
98
/// Per-actor row data used when composing fragments back into their
/// protobuf/in-memory representation (see `compose_fragment`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    /// Fragment the actor belongs to; validated against the fragment model
    /// during composition.
    pub fragment_id: FragmentId,
    /// Serialized connector splits assigned to this actor.
    pub splits: ConnectorSplits,
    /// Worker node the actor is scheduled on.
    pub worker_id: WorkerId,
    /// Vnode bitmap, if any, in its storage (model) representation.
    pub vnode_bitmap: Option<VnodeBitmap>,
    /// Expression evaluation context carried by the actor.
    pub expr_context: ExprContext,
    /// Shared config-override string (cheap to clone via `Arc`).
    pub config_override: Arc<str>,
}
109
/// Raw query-result row describing a fragment, before being turned into a
/// richer description type (fields mirror columns of the fragment model plus
/// the owning job's parallelism override).
#[derive(Debug)]
struct FragmentDescRow {
    fragment_id: FragmentId,
    job_id: JobId,
    /// Raw bitmask of `FragmentTypeFlag`s as stored in the catalog.
    fragment_type_mask: i32,
    distribution_type: DistributionType,
    /// State table ids owned by this fragment.
    state_table_ids: TableIdArray,
    vnode_count: i32,
    /// Fragment-level parallelism override, if any.
    parallelism_override: Option<StreamingParallelism>,
    /// Stream node plan; optional, presumably omitted by queries that do not
    /// need the plan — TODO confirm against the queries that build this row.
    stream_node: Option<StreamNode>,
}
121
/// Information about a running (inflight) fragment and its actors.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    /// Total number of vnodes of the fragment.
    pub vnode_count: usize,
    /// The fragment's stream plan.
    pub nodes: PbStreamNode,
    /// Running actors of this fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    /// Ids of state tables owned by this fragment.
    pub state_table_ids: HashSet<TableId>,
}
132
/// Distribution type and vnode count of a fragment, as read from the catalog
/// (see `fragment_parallelisms`).
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub vnode_count: usize,
}
138
139#[easy_ext::ext(FragmentTypeMaskExt)]
140pub impl FragmentTypeMask {
141    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
142        Expr::col(fragment::Column::FragmentTypeMask)
143            .bit_and(Expr::value(flag as i32))
144            .ne(0)
145    }
146
147    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
148        Expr::col(fragment::Column::FragmentTypeMask)
149            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
150            .ne(0)
151    }
152
153    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
154        Expr::col(fragment::Column::FragmentTypeMask)
155            .bit_and(Expr::value(flag as i32))
156            .eq(0)
157    }
158}
159
/// Summary information of a streaming job, selected directly from a joined
/// query (`FromQueryResult`); serialized in camelCase for the dashboard.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    pub job_id: JobId,
    /// Catalog object type of the job (table / source / sink / ...).
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    /// Configured parallelism of the job.
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    pub resource_group: String,
    pub config_override: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
174
175impl NotificationManager {
176    /// Notify frontend about streaming worker slot mapping changes.
177    ///
178    /// This only sends streaming mapping updates to frontends. Serving mapping
179    /// notifications are decoupled and driven by fragment model changes (insert/delete)
180    /// instead of barrier-driven actor set changes.
181    pub(crate) fn notify_streaming_fragment_mapping(
182        &self,
183        operation: NotificationOperation,
184        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
185    ) {
186        if fragment_mappings.is_empty() {
187            return;
188        }
189        // notify all fragment mappings to frontend.
190        for fragment_mapping in fragment_mappings {
191            self.notify_frontend_without_version(
192                operation,
193                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
194            );
195        }
196    }
197
198    /// Notify the serving module about fragment mapping changes.
199    ///
200    /// This should be called when fragments are inserted into or deleted from the meta store,
201    /// so that serving vnode mappings are kept in sync with the fragment model.
202    pub(crate) fn notify_serving_fragment_mapping_update(
203        &self,
204        fragment_ids: Vec<crate::model::FragmentId>,
205    ) {
206        if fragment_ids.is_empty() {
207            return;
208        }
209        self.notify_local_subscribers(LocalNotification::ServingFragmentMappingsUpsert(
210            fragment_ids,
211        ));
212    }
213
214    /// Notify the serving module about fragment mapping deletions.
215    ///
216    /// This should be called when fragments are deleted from the meta store,
217    /// so that serving vnode mappings are cleaned up.
218    pub(crate) fn notify_serving_fragment_mapping_delete(
219        &self,
220        fragment_ids: Vec<crate::model::FragmentId>,
221    ) {
222        if fragment_ids.is_empty() {
223            return;
224        }
225        self.notify_local_subscribers(LocalNotification::ServingFragmentMappingsDelete(
226            fragment_ids,
227        ));
228    }
229}
230
231impl CatalogController {
232    pub fn prepare_fragment_models_from_fragments(
233        job_id: JobId,
234        fragments: impl Iterator<Item = &Fragment>,
235    ) -> MetaResult<Vec<fragment::Model>> {
236        fragments
237            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
238            .try_collect()
239    }
240
241    pub fn prepare_fragment_model_for_new_job(
242        job_id: JobId,
243        fragment: &Fragment,
244    ) -> MetaResult<fragment::Model> {
245        let vnode_count = fragment.vnode_count();
246        let Fragment {
247            fragment_id: pb_fragment_id,
248            fragment_type_mask: pb_fragment_type_mask,
249            distribution_type: pb_distribution_type,
250            state_table_ids: pb_state_table_ids,
251            nodes,
252            ..
253        } = fragment;
254
255        let state_table_ids = pb_state_table_ids.clone().into();
256
257        let fragment_parallelism = nodes
258            .node_body
259            .as_ref()
260            .and_then(|body| match body {
261                NodeBody::StreamCdcScan(node) => Some(node),
262                _ => None,
263            })
264            .and_then(|node| node.options.as_ref())
265            .map(CdcScanOptions::from_proto)
266            .filter(|opts| opts.is_parallelized_backfill())
267            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));
268
269        let stream_node = StreamNode::from(nodes);
270
271        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
272            .unwrap()
273            .into();
274
275        #[expect(deprecated)]
276        let fragment = fragment::Model {
277            fragment_id: *pb_fragment_id as _,
278            job_id,
279            fragment_type_mask: (*pb_fragment_type_mask).into(),
280            distribution_type,
281            stream_node,
282            state_table_ids,
283            upstream_fragment_id: Default::default(),
284            vnode_count: vnode_count as _,
285            parallelism: fragment_parallelism,
286        };
287
288        Ok(fragment)
289    }
290
291    #[expect(clippy::type_complexity)]
292    fn compose_table_fragments(
293        job_id: JobId,
294        state: PbState,
295        ctx: StreamContext,
296        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
297        parallelism: StreamingParallelism,
298        max_parallelism: usize,
299        job_definition: Option<String>,
300    ) -> MetaResult<(
301        StreamJobFragments,
302        HashMap<FragmentId, Vec<StreamActor>>,
303        HashMap<crate::model::ActorId, PbActorStatus>,
304    )> {
305        let mut pb_fragments = BTreeMap::new();
306        let mut fragment_actors = HashMap::new();
307        let mut all_actor_status = HashMap::new();
308
309        for (fragment, actors) in fragments {
310            let (fragment, actors, actor_status, _) =
311                Self::compose_fragment(fragment, actors, job_definition.clone())?;
312            let fragment_id = fragment.fragment_id;
313            fragment_actors.insert(fragment_id, actors);
314            all_actor_status.extend(actor_status);
315
316            pb_fragments.insert(fragment_id, fragment);
317        }
318
319        let table_fragments = StreamJobFragments {
320            stream_job_id: job_id,
321            state: state as _,
322            fragments: pb_fragments,
323            ctx,
324            assigned_parallelism: match parallelism {
325                StreamingParallelism::Custom => TableParallelism::Custom,
326                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
327                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
328            },
329            max_parallelism,
330        };
331
332        Ok((table_fragments, fragment_actors, all_actor_status))
333    }
334
    /// Convert a `fragment::Model` row plus its `ActorInfo` rows back into the
    /// in-memory [`Fragment`] and [`StreamActor`] representations.
    ///
    /// Returns the composed fragment, its actors, a map of actor id to status
    /// (worker location), and a map of actor id to connector splits.
    ///
    /// Returns an error if any actor row references a fragment id different
    /// from the model's. Panics (via `assert!`) if the stream plan contains
    /// duplicate `Merge` upstream fragment ids.
    #[expect(clippy::type_complexity)]
    fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<ActorInfo>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        Vec<StreamActor>,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity-check the plan: each Merge node must reference a distinct
        // upstream fragment; a duplicate indicates a corrupted plan.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            // Each actor row must belong to the fragment being composed.
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            // NOTE: `fragment_id` here shadows the outer one with the actor's
            // own (already validated to be equal above).
            let ActorInfo {
                actor_id,
                fragment_id,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                config_override,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id),
                },
            );

            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                // Actors of jobs without a definition get an empty string.
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
                config_override,
            })
        }

        let pb_state_table_ids = state_table_ids.0;
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actors, pb_actor_status, pb_actor_splits))
    }
429
430    /// Returns distribution type and vnode count for all (or filtered) fragments.
431    ///
432    /// Reads directly from the persistent catalog (fragment table) rather than
433    /// from the in-memory `shared_actor_infos`.  This is critical because the
434    /// serving vnode mapping must be available even before the barrier manager's
435    /// recovery has completed and populated `shared_actor_infos`.
436    pub async fn fragment_parallelisms(
437        &self,
438    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
439        let inner = self.inner.read().await;
440        let query = FragmentModel::find().select_only().columns([
441            fragment::Column::FragmentId,
442            fragment::Column::DistributionType,
443            fragment::Column::VnodeCount,
444        ]);
445        let fragments: Vec<(FragmentId, DistributionType, i32)> =
446            query.into_tuple().all(&inner.db).await?;
447
448        Ok(fragments
449            .into_iter()
450            .map(|(fragment_id, distribution_type, vnode_count)| {
451                (
452                    fragment_id,
453                    FragmentParallelismInfo {
454                        distribution_type: PbFragmentDistributionType::from(distribution_type),
455                        vnode_count: vnode_count as usize,
456                    },
457                )
458            })
459            .collect())
460    }
461
462    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
463        let inner = self.inner.read().await;
464        let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
465            .select_only()
466            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
467            .into_tuple()
468            .all(&inner.db)
469            .await?;
470        Ok(fragment_jobs.into_iter().collect())
471    }
472
473    pub async fn get_fragment_job_id(
474        &self,
475        fragment_ids: Vec<FragmentId>,
476    ) -> MetaResult<Vec<ObjectId>> {
477        let inner = self.inner.read().await;
478        self.get_fragment_job_id_in_txn(&inner.db, fragment_ids)
479            .await
480    }
481
482    pub async fn get_fragment_job_id_in_txn<C>(
483        &self,
484        txn: &C,
485        fragment_ids: Vec<FragmentId>,
486    ) -> MetaResult<Vec<ObjectId>>
487    where
488        C: ConnectionTrait + Send,
489    {
490        let object_ids: Vec<ObjectId> = FragmentModel::find()
491            .select_only()
492            .column(fragment::Column::JobId)
493            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
494            .into_tuple()
495            .all(txn)
496            .await?;
497
498        Ok(object_ids)
499    }
500
501    pub async fn get_fragment_desc_by_id(
502        &self,
503        fragment_id: FragmentId,
504    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
505        let inner = self.inner.read().await;
506
507        let fragment_model = match FragmentModel::find_by_id(fragment_id)
508            .one(&inner.db)
509            .await?
510        {
511            Some(fragment) => fragment,
512            None => return Ok(None),
513        };
514
515        let job_parallelism: Option<StreamingParallelism> =
516            StreamingJob::find_by_id(fragment_model.job_id)
517                .select_only()
518                .column(streaming_job::Column::Parallelism)
519                .into_tuple()
520                .one(&inner.db)
521                .await?;
522
523        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
524            .select_only()
525            .columns([
526                fragment_relation::Column::SourceFragmentId,
527                fragment_relation::Column::DispatcherType,
528            ])
529            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
530            .into_tuple()
531            .all(&inner.db)
532            .await?;
533
534        let upstreams: Vec<_> = upstream_entries
535            .into_iter()
536            .map(|(source_id, _)| source_id)
537            .collect();
538
539        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
540            .await
541            .map(Self::collect_root_fragment_mapping)?;
542        let root_fragments = root_fragment_map
543            .get(&fragment_id)
544            .cloned()
545            .unwrap_or_default();
546
547        let info = self.env.shared_actor_infos().read_guard();
548        let SharedFragmentInfo { actors, .. } = info
549            .get_fragment(fragment_model.fragment_id as _)
550            .unwrap_or_else(|| {
551                panic!(
552                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
553                    fragment_model.fragment_id,
554                    fragment_model.job_id
555                )
556            });
557
558        let parallelism_policy = Self::format_fragment_parallelism_policy(
559            fragment_model.distribution_type,
560            fragment_model.parallelism.as_ref(),
561            job_parallelism.as_ref(),
562            &root_fragments,
563        );
564
565        let fragment = FragmentDesc {
566            fragment_id: fragment_model.fragment_id,
567            job_id: fragment_model.job_id,
568            fragment_type_mask: fragment_model.fragment_type_mask,
569            distribution_type: fragment_model.distribution_type,
570            state_table_ids: fragment_model.state_table_ids.clone(),
571            parallelism: actors.len() as _,
572            vnode_count: fragment_model.vnode_count,
573            stream_node: fragment_model.stream_node.clone(),
574            parallelism_policy,
575        };
576
577        Ok(Some((fragment, upstreams)))
578    }
579
580    pub async fn list_fragment_database_ids(
581        &self,
582        select_fragment_ids: Option<Vec<FragmentId>>,
583    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
584        let inner = self.inner.read().await;
585        let select = FragmentModel::find()
586            .select_only()
587            .column(fragment::Column::FragmentId)
588            .column(object::Column::DatabaseId)
589            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
590        let select = if let Some(select_fragment_ids) = select_fragment_ids {
591            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
592        } else {
593            select
594        };
595        Ok(select.into_tuple().all(&inner.db).await?)
596    }
597
598    pub async fn get_job_fragments_by_id(
599        &self,
600        job_id: JobId,
601    ) -> MetaResult<(
602        StreamJobFragments,
603        HashMap<FragmentId, Vec<StreamActor>>,
604        HashMap<ActorId, PbActorStatus>,
605    )> {
606        let inner = self.inner.read().await;
607
608        // Load fragments matching the job from the database
609        let fragments: Vec<_> = FragmentModel::find()
610            .filter(fragment::Column::JobId.eq(job_id))
611            .all(&inner.db)
612            .await?;
613
614        let job_info = StreamingJob::find_by_id(job_id)
615            .one(&inner.db)
616            .await?
617            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
618
619        let fragment_actors =
620            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;
621
622        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
623            .await?
624            .remove(&job_id);
625
626        Self::compose_table_fragments(
627            job_id,
628            job_info.job_status.into(),
629            job_info.stream_context(),
630            fragment_actors,
631            job_info.parallelism,
632            job_info.max_parallelism as _,
633            job_definition,
634        )
635    }
636
637    pub async fn get_fragment_actor_dispatchers(
638        &self,
639        fragment_ids: Vec<FragmentId>,
640    ) -> MetaResult<FragmentActorDispatchers> {
641        let inner = self.inner.read().await;
642
643        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
644            .await
645    }
646
    /// Compute, for each source fragment in `fragment_ids`, the dispatchers of
    /// every one of its actors toward each downstream fragment.
    ///
    /// Fragment relations are read from the database; the actor sets and vnode
    /// bitmaps of both sides are read from the in-memory shared actor info.
    ///
    /// NOTE(review): `read_guard.get_fragment(..).unwrap()` panics if a
    /// fragment referenced by a relation is missing from the shared actor
    /// info — presumably an invariant here; confirm against callers.
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        // Distribution type plus per-actor vnode bitmaps of one fragment.
        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        // Cache per-fragment actor info so each fragment is resolved once even
        // when it appears in multiple relations.
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        // source fragment id -> actor id -> dispatchers of that actor.
        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            // Resolve (or fetch and cache) actor info for both endpoints.
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let (dispatchers, _) = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }
750
751    pub async fn get_fragment_downstream_relations(
752        &self,
753        fragment_ids: Vec<FragmentId>,
754    ) -> MetaResult<FragmentDownstreamRelation> {
755        let inner = self.inner.read().await;
756        self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
757            .await
758    }
759
760    pub async fn get_fragment_downstream_relations_in_txn<C>(
761        &self,
762        txn: &C,
763        fragment_ids: Vec<FragmentId>,
764    ) -> MetaResult<FragmentDownstreamRelation>
765    where
766        C: ConnectionTrait + StreamTrait + Send,
767    {
768        let mut stream = FragmentRelation::find()
769            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
770            .stream(txn)
771            .await?;
772        let mut relations = FragmentDownstreamRelation::new();
773        while let Some(relation) = stream.try_next().await? {
774            relations
775                .entry(relation.source_fragment_id as _)
776                .or_default()
777                .push(DownstreamFragmentRelation {
778                    downstream_fragment_id: relation.target_fragment_id as _,
779                    dispatcher_type: relation.dispatcher_type,
780                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
781                    output_mapping: PbDispatchOutputMapping {
782                        indices: relation.output_indices.into_u32_array(),
783                        types: relation
784                            .output_type_mapping
785                            .unwrap_or_default()
786                            .to_protobuf(),
787                    },
788                });
789        }
790        Ok(relations)
791    }
792
793    pub async fn get_job_fragment_backfill_scan_type(
794        &self,
795        job_id: JobId,
796    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
797        let inner = self.inner.read().await;
798        self.get_job_fragment_backfill_scan_type_in_txn(&inner.db, job_id)
799            .await
800    }
801
802    pub async fn get_job_fragment_backfill_scan_type_in_txn<C>(
803        &self,
804        txn: &C,
805        job_id: JobId,
806    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>>
807    where
808        C: ConnectionTrait + Send,
809    {
810        let fragments: Vec<_> = FragmentModel::find()
811            .filter(fragment::Column::JobId.eq(job_id))
812            .all(txn)
813            .await?;
814
815        let mut result = HashMap::new();
816
817        for fragment::Model {
818            fragment_id,
819            stream_node,
820            ..
821        } in fragments
822        {
823            let stream_node = stream_node.to_protobuf();
824            visit_stream_node_body(&stream_node, |body| {
825                if let NodeBody::StreamScan(node) = body {
826                    match node.stream_scan_type() {
827                        StreamScanType::Unspecified => {}
828                        scan_type => {
829                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
830                        }
831                    }
832                }
833            });
834        }
835
836        Ok(result)
837    }
838
839    pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
840        let inner = self.inner.read().await;
841        let count = StreamingJob::find().count(&inner.db).await?;
842        Ok(usize::try_from(count).context("streaming job count overflow")?)
843    }
844
    /// List summary info for every streaming job, joining each job with its
    /// object row, owning database, and (depending on object type) the
    /// matching table/source/sink catalog row.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // Left-join all three possible catalog kinds; at most one is
            // expected to match for any given object.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // Pick the first non-NULL name among table/source/sink, falling
            // back to "<unknown>" when none matched.
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // A job-specific resource group overrides the database default.
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            // NULL config override is surfaced as an empty string.
            .column_as(
                Expr::if_null(
                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
                    Expr::val(""),
                ),
                "config_override",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
898
899    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
900        let inner = self.inner.read().await;
901        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
902            .select_only()
903            .column(streaming_job::Column::MaxParallelism)
904            .into_tuple()
905            .one(&inner.db)
906            .await?
907            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
908        Ok(max_parallelism as usize)
909    }
910
911    /// Try to get internal table ids of each streaming job, used by metrics collection.
912    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
913        if let Ok(inner) = self.inner.try_read()
914            && let Ok(job_state_tables) = FragmentModel::find()
915                .select_only()
916                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
917                .into_tuple::<(JobId, I32Array)>()
918                .all(&inner.db)
919                .await
920        {
921            let mut job_internal_table_ids = HashMap::new();
922            for (job_id, state_table_ids) in job_state_tables {
923                job_internal_table_ids
924                    .entry(job_id)
925                    .or_insert_with(Vec::new)
926                    .extend(
927                        state_table_ids
928                            .into_inner()
929                            .into_iter()
930                            .map(|table_id| TableId::new(table_id as _)),
931                    );
932            }
933            return Some(job_internal_table_ids.into_iter().collect());
934        }
935        None
936    }
937
938    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
939        let inner = self.inner.read().await;
940        let count = FragmentModel::find().count(&inner.db).await?;
941        Ok(count > 0)
942    }
943
944    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
945        let read_guard = self.env.shared_actor_infos().read_guard();
946        let actor_cnt: HashMap<WorkerId, _> = read_guard
947            .iter_over_fragments()
948            .flat_map(|(_, fragment)| {
949                fragment
950                    .actors
951                    .iter()
952                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
953            })
954            .into_group_map()
955            .into_iter()
956            .map(|(k, v)| (k, v.len()))
957            .collect();
958
959        Ok(actor_cnt)
960    }
961
    /// Resolve the actors of each given fragment from the in-memory shared
    /// actor info, converting them into persistence-ready [`ActorInfo`]s.
    ///
    /// The expression context and config override derived from
    /// `stream_context` are attached to every actor.
    ///
    /// # Errors
    /// Fails if any fragment id is missing from the shared actor info.
    fn collect_fragment_actor_map(
        &self,
        fragment_ids: &[FragmentId],
        stream_context: StreamContext,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
        let guard = self.env.shared_actor_infos().read_guard();
        let pb_expr_context = stream_context.to_expr_context();
        let expr_context: ExprContext = (&pb_expr_context).into();

        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
        for fragment_id in fragment_ids {
            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
                anyhow!("fragment {} not found in shared actor info", fragment_id)
            })?;

            let actors = fragment_info
                .actors
                .iter()
                .map(|(actor_id, actor_info)| ActorInfo {
                    actor_id: *actor_id as _,
                    fragment_id: *fragment_id,
                    // Convert the in-memory split assignment into its protobuf
                    // form for the model layer.
                    splits: ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
                    }),
                    worker_id: actor_info.worker_id as _,
                    vnode_bitmap: actor_info
                        .vnode_bitmap
                        .as_ref()
                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                    // Shared per-job context, cloned onto every actor.
                    expr_context: expr_context.clone(),
                    config_override: stream_context.config_override.clone(),
                })
                .collect();

            actor_map.insert(*fragment_id, actors);
        }

        Ok(actor_map)
    }
1001
1002    fn collect_fragment_actor_pairs(
1003        &self,
1004        fragments: Vec<fragment::Model>,
1005        stream_context: StreamContext,
1006    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
1007        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
1008        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
1009        fragments
1010            .into_iter()
1011            .map(|fragment| {
1012                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
1013                    anyhow!(
1014                        "fragment {} missing in shared actor info map",
1015                        fragment.fragment_id
1016                    )
1017                })?;
1018                Ok((fragment, actors))
1019            })
1020            .collect()
1021    }
1022
    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
    /// Load the complete fragment graph of *every* streaming job, keyed by job id.
    ///
    /// For each job the tuple carries the composed [`StreamJobFragments`], the
    /// actors grouped per fragment, and the status of every actor.
    ///
    /// NOTE(review): this issues one fragment query per job (N+1 pattern);
    /// acceptable only because this bulk loader is rarely used — see the TODO above.
    pub async fn table_fragments(
        &self,
    ) -> MetaResult<
        BTreeMap<
            JobId,
            (
                StreamJobFragments,
                HashMap<FragmentId, Vec<StreamActor>>,
                HashMap<crate::model::ActorId, PbActorStatus>,
            ),
        >,
    > {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve SQL definitions for all jobs up-front in a single query.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragments = FragmentModel::find()
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            // Pair each fragment with its live actors from shared actor info.
            let fragment_actors =
                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;

            table_fragments.insert(
                job.job_id,
                Self::compose_table_fragments(
                    job.job_id,
                    job.job_status.into(),
                    job.stream_context(),
                    fragment_actors,
                    job.parallelism,
                    job.max_parallelism as _,
                    // `remove` hands over ownership of the definition string.
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
1071
1072    pub async fn upstream_fragments(
1073        &self,
1074        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1075    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
1076        let inner = self.inner.read().await;
1077        self.upstream_fragments_in_txn(&inner.db, fragment_ids)
1078            .await
1079    }
1080
1081    pub async fn upstream_fragments_in_txn<C>(
1082        &self,
1083        txn: &C,
1084        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1085    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>>
1086    where
1087        C: ConnectionTrait + StreamTrait + Send,
1088    {
1089        let mut stream = FragmentRelation::find()
1090            .select_only()
1091            .columns([
1092                fragment_relation::Column::SourceFragmentId,
1093                fragment_relation::Column::TargetFragmentId,
1094            ])
1095            .filter(
1096                fragment_relation::Column::TargetFragmentId
1097                    .is_in(fragment_ids.map(|id| id as FragmentId)),
1098            )
1099            .into_tuple::<(FragmentId, FragmentId)>()
1100            .stream(txn)
1101            .await?;
1102        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
1103        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
1104            upstream_fragments
1105                .entry(downstream_fragment_id as crate::model::FragmentId)
1106                .or_default()
1107                .insert(upstream_fragment_id as crate::model::FragmentId);
1108        }
1109        Ok(upstream_fragments)
1110    }
1111
1112    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1113        let info = self.env.shared_actor_infos().read_guard();
1114
1115        let actor_locations = info
1116            .iter_over_fragments()
1117            .flat_map(|(fragment_id, fragment)| {
1118                fragment
1119                    .actors
1120                    .iter()
1121                    .map(|(actor_id, actor)| PartialActorLocation {
1122                        actor_id: *actor_id as _,
1123                        fragment_id: *fragment_id as _,
1124                        worker_id: actor.worker_id,
1125                    })
1126            })
1127            .collect_vec();
1128
1129        Ok(actor_locations)
1130    }
1131
    /// List every actor as `(actor_id, fragment_id, job object id, schema id,
    /// object type)`, combining fragment rows from the database with live
    /// actors from the shared actor info.
    ///
    /// # Errors
    /// Returns an `unavailable` error if a fragment known to the database has
    /// no entry in the shared actor info yet.
    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;

        // One row per fragment with its owning job object's id/schema/type.
        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
            FragmentModel::find()
                .select_only()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .column(fragment::Column::FragmentId)
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;

        let actor_infos = {
            // Hold the in-memory guard only for the expansion phase.
            let info = self.env.shared_actor_infos().read_guard();

            let mut result = Vec::new();

            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
                let Some(fragment) = info.get_fragment(fragment_id as _) else {
                    return Err(MetaError::unavailable(format!(
                        "shared actor info missing for fragment {fragment_id} while listing actors"
                    )));
                };

                // Expand the fragment row into one tuple per actor.
                for actor_id in fragment.actors.keys() {
                    result.push((
                        *actor_id as _,
                        fragment.fragment_id as _,
                        object_id,
                        schema_id,
                        object_type,
                    ));
                }
            }

            result
        };

        Ok(actor_infos)
    }
1177
1178    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1179        let guard = self.env.shared_actor_info.read_guard();
1180        guard
1181            .iter_over_fragments()
1182            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1183            .collect_vec()
1184    }
1185
1186    pub async fn list_fragment_descs_with_node(
1187        &self,
1188        is_creating: bool,
1189    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1190        let inner = self.inner.read().await;
1191        let txn = inner.db.begin().await?;
1192
1193        let fragments_query = Self::build_fragment_query(is_creating);
1194        let fragments: Vec<fragment::Model> = fragments_query.all(&txn).await?;
1195
1196        let rows = fragments
1197            .into_iter()
1198            .map(|fragment| FragmentDescRow {
1199                fragment_id: fragment.fragment_id,
1200                job_id: fragment.job_id,
1201                fragment_type_mask: fragment.fragment_type_mask,
1202                distribution_type: fragment.distribution_type,
1203                state_table_ids: fragment.state_table_ids,
1204                vnode_count: fragment.vnode_count,
1205                parallelism_override: fragment.parallelism,
1206                stream_node: Some(fragment.stream_node),
1207            })
1208            .collect_vec();
1209
1210        self.build_fragment_distributions(&txn, rows).await
1211    }
1212
    /// List fragment distributions *without* fetching the (potentially large)
    /// stream node column; `stream_node` in the produced rows is `None`.
    ///
    /// When `is_creating` is set, only fragments of jobs still being created
    /// are returned.
    pub async fn list_fragment_descs_without_node(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments_query = Self::build_fragment_query(is_creating);
        #[allow(clippy::type_complexity)]
        let fragments: Vec<(
            FragmentId,
            JobId,
            i32,
            DistributionType,
            TableIdArray,
            i32,
            Option<StreamingParallelism>,
        )> = fragments_query
            // Select only the scalar columns — the stream node is skipped on purpose.
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::Parallelism,
            ])
            .into_tuple()
            .all(&txn)
            .await?;

        let rows = fragments
            .into_iter()
            .map(
                |(
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    state_table_ids,
                    vnode_count,
                    parallelism_override,
                )| FragmentDescRow {
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    state_table_ids,
                    vnode_count,
                    parallelism_override,
                    // Deliberately absent: callers of this variant don't need it.
                    stream_node: None,
                },
            )
            .collect_vec();

        self.build_fragment_distributions(&txn, rows).await
    }
1271
1272    fn build_fragment_query(is_creating: bool) -> sea_orm::Select<fragment::Entity> {
1273        if is_creating {
1274            FragmentModel::find()
1275                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1276                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1277                .filter(
1278                    streaming_job::Column::JobStatus
1279                        .eq(JobStatus::Initial)
1280                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1281                )
1282        } else {
1283            FragmentModel::find()
1284        }
1285    }
1286
    /// Assemble [`FragmentDistribution`]s (paired with their upstream fragment
    /// ids) from pre-fetched fragment rows.
    ///
    /// Performs three auxiliary lookups — job parallelism, upstream relations,
    /// and the no-shuffle root mapping — then merges them with live actor
    /// counts from the shared actor info.
    async fn build_fragment_distributions(
        &self,
        txn: &DatabaseTransaction,
        rows: Vec<FragmentDescRow>,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let fragment_ids = rows.iter().map(|row| row.fragment_id).collect_vec();
        let job_ids = rows.iter().map(|row| row.job_id).unique().collect_vec();

        // Job-level parallelism is the "inherit" fallback when a fragment has
        // no override of its own.
        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(txn)
                .await?
                .into_iter()
                .collect()
        };

        // All (target, source, dispatcher) edges pointing into the listed fragments.
        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(txn)
                    .await?
            };

        // Group upstream fragment ids by downstream fragment.
        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        // Map each fragment to the roots of its no-shuffle ensemble (used by
        // the parallelism-policy rendering below).
        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();
        let mut result = Vec::with_capacity(rows.len());

        for row in rows {
            // Live actor count; 0 when the fragment isn't in shared actor info.
            let parallelism = guard
                .get_fragment(row.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&row.fragment_id)
                .cloned()
                .unwrap_or_default();

            let upstreams = all_upstreams.remove(&row.fragment_id).unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                row.distribution_type,
                row.parallelism_override.as_ref(),
                job_parallelisms.get(&row.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: row.fragment_id,
                table_id: row.job_id,
                distribution_type: PbFragmentDistributionType::from(row.distribution_type) as _,
                state_table_ids: row.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: row.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: row.vnode_count as _,
                node: row.stream_node.map(|node| node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }

        Ok(result)
    }
1382
    /// Map each sink job to the internal table backing its KV log store.
    ///
    /// Scans all `Sink`-typed fragments and inspects their sink nodes; only
    /// sinks configured with `SinkLogStoreType::KvLogStore` contribute an
    /// entry. If a sink unexpectedly reports multiple distinct log store
    /// tables, the last one seen wins and a warning is logged.
    pub async fn list_sink_log_store_tables(&self) -> MetaResult<Vec<(SinkId, TableId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(JobId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::JobId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Sink))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut mapping: HashMap<SinkId, TableId> = HashMap::new();

        for (job_id, stream_node) in fragments {
            // For sink jobs the sink id equals the job id.
            let sink_id = SinkId::new(job_id.as_raw_id());
            let stream_node = stream_node.to_protobuf();
            let mut internal_table_id: Option<TableId> = None;

            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::Sink(node) = body
                    && node.log_store_type == SinkLogStoreType::KvLogStore as i32
                    && let Some(table) = &node.table
                {
                    internal_table_id = Some(TableId::new(table.id.as_raw_id()));
                }
            });

            // `insert` returns the previous value; warn when two fragments of
            // the same sink disagree (the newer value has already replaced it).
            if let Some(table_id) = internal_table_id
                && let Some(existing) = mapping.insert(sink_id, table_id)
                && existing != table_id
            {
                tracing::warn!(
                    "sink {sink_id:?} has multiple log store tables: {existing:?} vs {table_id:?}"
                );
            }
        }

        Ok(mapping.into_iter().collect())
    }
1423
1424    /// Build a fragment-to-root lookup for all reported root fragment ensembles.
1425    fn collect_root_fragment_mapping(
1426        ensembles: Vec<NoShuffleEnsemble>,
1427    ) -> HashMap<FragmentId, Vec<FragmentId>> {
1428        let mut mapping = HashMap::new();
1429
1430        for ensemble in ensembles {
1431            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1432            roots.sort_unstable();
1433            roots.dedup();
1434
1435            if roots.is_empty() {
1436                continue;
1437            }
1438
1439            let root_set: HashSet<_> = roots.iter().copied().collect();
1440
1441            for fragment_id in ensemble.component_fragments() {
1442                if root_set.contains(&fragment_id) {
1443                    mapping.insert(fragment_id, Vec::new());
1444                } else {
1445                    mapping.insert(fragment_id, roots.clone());
1446                }
1447            }
1448        }
1449
1450        mapping
1451    }
1452
1453    fn format_fragment_parallelism_policy(
1454        distribution_type: DistributionType,
1455        fragment_parallelism: Option<&StreamingParallelism>,
1456        job_parallelism: Option<&StreamingParallelism>,
1457        root_fragments: &[FragmentId],
1458    ) -> String {
1459        if distribution_type == DistributionType::Single {
1460            return "single".to_owned();
1461        }
1462
1463        if let Some(parallelism) = fragment_parallelism {
1464            return format!(
1465                "override({})",
1466                Self::format_streaming_parallelism(parallelism)
1467            );
1468        }
1469
1470        if !root_fragments.is_empty() {
1471            let mut upstreams = root_fragments.to_vec();
1472            upstreams.sort_unstable();
1473            upstreams.dedup();
1474
1475            return format!("upstream_fragment({upstreams:?})");
1476        }
1477
1478        let inherited = job_parallelism
1479            .map(Self::format_streaming_parallelism)
1480            .unwrap_or_else(|| "unknown".to_owned());
1481        format!("inherit({inherited})")
1482    }
1483
1484    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1485        match parallelism {
1486            StreamingParallelism::Adaptive => "adaptive".to_owned(),
1487            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1488            StreamingParallelism::Custom => "custom".to_owned(),
1489        }
1490    }
1491
1492    pub async fn list_sink_actor_mapping(
1493        &self,
1494    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1495        let inner = self.inner.read().await;
1496        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1497            .select_only()
1498            .columns([sink::Column::SinkId, sink::Column::Name])
1499            .into_tuple()
1500            .all(&inner.db)
1501            .await?;
1502        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1503
1504        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1505
1506        let actor_with_type: Vec<(ActorId, SinkId)> = {
1507            let info = self.env.shared_actor_infos().read_guard();
1508
1509            info.iter_over_fragments()
1510                .filter(|(_, fragment)| {
1511                    sink_ids.contains(&fragment.job_id.as_sink_id())
1512                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1513                })
1514                .flat_map(|(_, fragment)| {
1515                    fragment
1516                        .actors
1517                        .keys()
1518                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1519                })
1520                .collect()
1521        };
1522
1523        let mut sink_actor_mapping = HashMap::new();
1524        for (actor_id, sink_id) in actor_with_type {
1525            sink_actor_mapping
1526                .entry(sink_id)
1527                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1528                .1
1529                .push(actor_id);
1530        }
1531
1532        Ok(sink_actor_mapping)
1533    }
1534
1535    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1536        let inner = self.inner.read().await;
1537        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1538            .select_only()
1539            .columns([
1540                fragment::Column::FragmentId,
1541                fragment::Column::JobId,
1542                fragment::Column::StateTableIds,
1543            ])
1544            .into_partial_model()
1545            .all(&inner.db)
1546            .await?;
1547        Ok(fragment_state_tables)
1548    }
1549
    /// Used in [`crate::barrier::GlobalBarrierManager`]: load all running actors that need to
    /// be sent or collected, rendered onto the currently active worker nodes.
    ///
    /// Returns an empty map when no fragment context is loaded (e.g. the
    /// database has no streaming jobs).
    pub async fn load_all_actors_dynamic(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<FragmentRenderMap> {
        let loaded = self.load_fragment_context(database_id).await?;

        if loaded.is_empty() {
            return Ok(HashMap::new());
        }

        // The strategy decides how many actors adaptive fragments get
        // relative to the available worker parallelism.
        let adaptive_parallelism_strategy = {
            let system_params_reader = self.env.system_params_reader().await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let RenderedGraph { fragments, .. } = render_actor_assignments(
            self.env.actor_id_generator(),
            worker_nodes.current(),
            adaptive_parallelism_strategy,
            &loaded,
        )?;

        tracing::trace!(?fragments, "reload all actors");

        Ok(fragments)
    }
1579
1580    /// Async load stage: collects all metadata required for rendering actor assignments.
1581    pub async fn load_fragment_context(
1582        &self,
1583        database_id: Option<DatabaseId>,
1584    ) -> MetaResult<LoadedFragmentContext> {
1585        let inner = self.inner.read().await;
1586        let txn = inner.db.begin().await?;
1587
1588        self.load_fragment_context_in_txn(&txn, database_id).await
1589    }
1590
1591    pub async fn load_fragment_context_in_txn<C>(
1592        &self,
1593        txn: &C,
1594        database_id: Option<DatabaseId>,
1595    ) -> MetaResult<LoadedFragmentContext>
1596    where
1597        C: ConnectionTrait,
1598    {
1599        let mut query = StreamingJob::find()
1600            .select_only()
1601            .column(streaming_job::Column::JobId);
1602
1603        if let Some(database_id) = database_id {
1604            query = query
1605                .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1606                .filter(object::Column::DatabaseId.eq(database_id));
1607        }
1608
1609        let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
1610
1611        let jobs: HashSet<JobId> = jobs.into_iter().collect();
1612
1613        if jobs.is_empty() {
1614            return Ok(LoadedFragmentContext::default());
1615        }
1616
1617        load_fragment_context_for_jobs(txn, jobs).await
1618    }
1619
    /// Patches the stream node of each given fragment via
    /// `crate::stream::fill_snapshot_backfill_epoch` (which fills in snapshot-backfill
    /// epoch info — see that helper for the exact rewrite) and persists any changed node.
    ///
    /// All reads and writes happen in a single transaction committed at the end; an error
    /// on any fragment aborts the whole batch.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // Only write back when the helper signals (returns `true`) that the node
            // was actually modified.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1653
1654    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1655    pub fn get_running_actors_of_fragment(
1656        &self,
1657        fragment_id: FragmentId,
1658    ) -> MetaResult<HashSet<model::ActorId>> {
1659        let info = self.env.shared_actor_infos().read_guard();
1660
1661        let actors = info
1662            .get_fragment(fragment_id as _)
1663            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1664            .unwrap_or_default();
1665
1666        Ok(actors)
1667    }
1668
    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
    /// (`backfill_actor_id`, `upstream_source_actor_id`)
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // The two fragments must be directly connected; fetch the edge's dispatcher type.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // Only a NoShuffle edge is supported between source and source-backfill here.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: read a fragment's distribution type from the catalog, erroring if the
        // fragment row is missing.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: snapshot the in-memory actor -> vnode-bitmap map of a fragment.
        // A fragment missing from the shared info yields an empty map.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // Resolve the (source_actor, source_backfill_actor) pairs, then flip each tuple
        // to match the documented (backfill, source) return shape.
        Ok(resolve_no_shuffle_actor_mapping(
            source_distribution_type,
            source_actors.iter().map(|(&id, bitmap)| (id, bitmap)),
            source_backfill_distribution_type,
            source_backfill_actors
                .iter()
                .map(|(&id, bitmap)| (id, bitmap)),
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1766
1767    /// Get and filter the "**root**" fragments of the specified jobs.
1768    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1769    ///
1770    /// Root fragment connects to downstream jobs.
1771    ///
1772    /// ## What can be the root fragment
1773    /// - For sink, it should have one `Sink` fragment.
1774    /// - For MV, it should have one `MView` fragment.
1775    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1776    /// - For source, it should have one `Source` fragment.
1777    ///
1778    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1779    pub async fn get_root_fragments(
1780        &self,
1781        job_ids: Vec<JobId>,
1782    ) -> MetaResult<HashMap<JobId, Fragment>> {
1783        let inner = self.inner.read().await;
1784
1785        let all_fragments = FragmentModel::find()
1786            .filter(fragment::Column::JobId.is_in(job_ids))
1787            .all(&inner.db)
1788            .await?;
1789        // job_id -> fragment
1790        let mut root_fragments = HashMap::<JobId, Fragment>::new();
1791        for fragment in all_fragments {
1792            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1793            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1794                _ = root_fragments.insert(fragment.job_id, fragment.into());
1795            } else if mask.contains(FragmentTypeFlag::Source) {
1796                // look for Source fragment only if there's no MView fragment
1797                // (notice try_insert here vs insert above)
1798                _ = root_fragments.try_insert(fragment.job_id, fragment.into());
1799            }
1800        }
1801
1802        Ok(root_fragments)
1803    }
1804
1805    pub async fn get_root_fragment(&self, job_id: JobId) -> MetaResult<Fragment> {
1806        let mut root_fragments = self.get_root_fragments(vec![job_id]).await?;
1807        let root_fragment = root_fragments
1808            .remove(&job_id)
1809            .context(format!("root fragment for job {} not found", job_id))?;
1810
1811        Ok(root_fragment)
1812    }
1813
1814    /// Get the downstream fragments connected to the specified job.
1815    pub async fn get_downstream_fragments(
1816        &self,
1817        job_id: JobId,
1818    ) -> MetaResult<Vec<(stream_plan::DispatcherType, Fragment)>> {
1819        let root_fragment = self.get_root_fragment(job_id).await?;
1820
1821        let inner = self.inner.read().await;
1822        let txn = inner.db.begin().await?;
1823        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1824            .filter(
1825                fragment_relation::Column::SourceFragmentId
1826                    .eq(root_fragment.fragment_id as FragmentId),
1827            )
1828            .all(&txn)
1829            .await?;
1830
1831        let downstream_fragment_ids = downstream_fragment_relations
1832            .iter()
1833            .map(|model| model.target_fragment_id as FragmentId)
1834            .collect::<HashSet<_>>();
1835
1836        let downstream_fragments: Vec<fragment::Model> = FragmentModel::find()
1837            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1838            .all(&txn)
1839            .await?;
1840
1841        let mut downstream_fragments_map: HashMap<_, _> = downstream_fragments
1842            .into_iter()
1843            .map(|fragment| (fragment.fragment_id, fragment))
1844            .collect();
1845
1846        let mut downstream_fragments = vec![];
1847
1848        let fragment_map: HashMap<_, _> = downstream_fragment_relations
1849            .iter()
1850            .map(|model| (model.target_fragment_id, model.dispatcher_type))
1851            .collect();
1852
1853        for (fragment_id, dispatcher_type) in fragment_map {
1854            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1855
1856            let fragment = downstream_fragments_map
1857                .remove(&fragment_id)
1858                .context(format!(
1859                    "downstream fragment node for id {} not found",
1860                    fragment_id
1861                ))?
1862                .into();
1863
1864            downstream_fragments.push((dispatch_type, fragment));
1865        }
1866        Ok(downstream_fragments)
1867    }
1868
1869    pub async fn load_source_fragment_ids(
1870        &self,
1871    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1872        let inner = self.inner.read().await;
1873        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1874            .select_only()
1875            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1876            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1877            .into_tuple()
1878            .all(&inner.db)
1879            .await?;
1880
1881        let mut source_fragment_ids = HashMap::new();
1882        for (fragment_id, stream_node) in fragments {
1883            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1884                source_fragment_ids
1885                    .entry(source_id)
1886                    .or_insert_with(BTreeSet::new)
1887                    .insert(fragment_id);
1888            }
1889        }
1890        Ok(source_fragment_ids)
1891    }
1892
1893    pub async fn load_backfill_fragment_ids(
1894        &self,
1895    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1896        let inner = self.inner.read().await;
1897        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1898            .select_only()
1899            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1900            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1901            .into_tuple()
1902            .all(&inner.db)
1903            .await?;
1904
1905        let mut source_fragment_ids = HashMap::new();
1906        for (fragment_id, stream_node) in fragments {
1907            if let Some((source_id, upstream_source_fragment_id)) =
1908                stream_node.to_protobuf().find_source_backfill()
1909            {
1910                source_fragment_ids
1911                    .entry(source_id)
1912                    .or_insert_with(BTreeSet::new)
1913                    .insert((fragment_id, upstream_source_fragment_id));
1914            }
1915        }
1916        Ok(source_fragment_ids)
1917    }
1918
1919    pub async fn get_all_upstream_sink_infos(
1920        &self,
1921        target_table: &PbTable,
1922        target_fragment_id: FragmentId,
1923    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1924        let inner = self.inner.read().await;
1925        let txn = inner.db.begin().await?;
1926
1927        self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
1928            .await
1929    }
1930
1931    pub async fn get_all_upstream_sink_infos_in_txn<C>(
1932        &self,
1933        txn: &C,
1934        target_table: &PbTable,
1935        target_fragment_id: FragmentId,
1936    ) -> MetaResult<Vec<UpstreamSinkInfo>>
1937    where
1938        C: ConnectionTrait,
1939    {
1940        let incoming_sinks = Sink::find()
1941            .filter(sink::Column::TargetTable.eq(target_table.id))
1942            .all(txn)
1943            .await?;
1944
1945        let sink_ids = incoming_sinks.iter().map(|s| s.sink_id).collect_vec();
1946        let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1947
1948        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1949        for sink in &incoming_sinks {
1950            let sink_fragment_id =
1951                sink_fragment_ids
1952                    .get(&sink.sink_id)
1953                    .cloned()
1954                    .ok_or(anyhow::anyhow!(
1955                        "sink fragment not found for sink id {}",
1956                        sink.sink_id
1957                    ))?;
1958            let upstream_info = build_upstream_sink_info(
1959                sink.sink_id,
1960                sink.original_target_columns
1961                    .as_ref()
1962                    .map(|cols| cols.to_protobuf())
1963                    .unwrap_or_default(),
1964                sink_fragment_id,
1965                target_table,
1966                target_fragment_id,
1967            )?;
1968            upstream_sink_infos.push(upstream_info);
1969        }
1970
1971        Ok(upstream_sink_infos)
1972    }
1973
1974    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1975        let inner = self.inner.read().await;
1976        let txn = inner.db.begin().await?;
1977
1978        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1979            .select_only()
1980            .column(fragment::Column::FragmentId)
1981            .filter(
1982                fragment::Column::JobId
1983                    .eq(job_id)
1984                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1985            )
1986            .into_tuple()
1987            .all(&txn)
1988            .await?;
1989
1990        if mview_fragment.len() != 1 {
1991            return Err(anyhow::anyhow!(
1992                "expected exactly one mview fragment for job {}, found {}",
1993                job_id,
1994                mview_fragment.len()
1995            )
1996            .into());
1997        }
1998
1999        Ok(mview_fragment.into_iter().next().unwrap())
2000    }
2001
2002    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
2003        let inner = self.inner.read().await;
2004        let txn = inner.db.begin().await?;
2005        has_table_been_migrated(&txn, table_id).await
2006    }
2007
2008    pub async fn update_fragment_splits<C>(
2009        &self,
2010        txn: &C,
2011        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
2012    ) -> MetaResult<()>
2013    where
2014        C: ConnectionTrait,
2015    {
2016        if fragment_splits.is_empty() {
2017            return Ok(());
2018        }
2019
2020        let existing_fragment_ids: HashSet<FragmentId> = FragmentModel::find()
2021            .select_only()
2022            .column(fragment::Column::FragmentId)
2023            .filter(fragment::Column::FragmentId.is_in(fragment_splits.keys().copied()))
2024            .into_tuple()
2025            .all(txn)
2026            .await?
2027            .into_iter()
2028            .collect();
2029
2030        // Filter out stale fragment ids to avoid FK violations when split updates race with drop.
2031        let (models, skipped_fragment_ids): (Vec<_>, Vec<_>) = fragment_splits
2032            .iter()
2033            .partition_map(|(fragment_id, splits)| {
2034                if existing_fragment_ids.contains(fragment_id) {
2035                    Either::Left(fragment_splits::ActiveModel {
2036                        fragment_id: Set(*fragment_id as _),
2037                        splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
2038                            splits: splits.iter().map(Into::into).collect_vec(),
2039                        }))),
2040                    })
2041                } else {
2042                    Either::Right(*fragment_id)
2043                }
2044            });
2045
2046        if !skipped_fragment_ids.is_empty() {
2047            tracing::warn!(
2048                skipped_fragment_ids = ?skipped_fragment_ids,
2049                total_fragment_ids = fragment_splits.len(),
2050                "skipping stale fragment split updates for missing fragments"
2051            );
2052        }
2053
2054        if models.is_empty() {
2055            return Ok(());
2056        }
2057
2058        FragmentSplits::insert_many(models)
2059            .on_conflict(
2060                OnConflict::column(fragment_splits::Column::FragmentId)
2061                    .update_column(fragment_splits::Column::Splits)
2062                    .to_owned(),
2063            )
2064            .exec(txn)
2065            .await?;
2066
2067        Ok(())
2068    }
2069}
2070
2071#[cfg(test)]
2072mod tests {
2073    use std::collections::{BTreeMap, HashMap, HashSet};
2074
2075    use itertools::Itertools;
2076    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
2077    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
2078    use risingwave_common::id::JobId;
2079    use risingwave_common::util::iter_util::ZipEqDebug;
2080    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
2081    use risingwave_meta_model::fragment::DistributionType;
2082    use risingwave_meta_model::*;
2083    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
2084    use risingwave_pb::plan_common::PbExprContext;
2085    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
2086    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
2087    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
2088
2089    use super::ActorInfo;
2090    use crate::MetaResult;
2091    use crate::controller::catalog::CatalogController;
2092    use crate::model::{Fragment, StreamActor};
2093
    // Upstream actor ids of a single actor, keyed by the upstream fragment id.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    // Upstream mapping for every actor of a fragment.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    // Fixed ids shared by the tests below.
    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    const TEST_JOB_ID: JobId = JobId::new(1);

    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);
2105
    /// Builds a two-entry upstream map for `actor_id`: one upstream actor
    /// (`actor_id + 100`) under `TEST_UPSTREAM_FRAGMENT_ID`, and another
    /// (`actor_id + 200`) under the next fragment id.
    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }
2118
2119    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
2120        let mut input = vec![];
2121        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
2122            input.push(PbStreamNode {
2123                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
2124                    upstream_fragment_id,
2125                    ..Default::default()
2126                }))),
2127                ..Default::default()
2128            });
2129        }
2130
2131        PbStreamNode {
2132            input,
2133            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
2134            ..Default::default()
2135        }
2136    }
2137
    /// `prepare_fragment_model_for_new_job` should faithfully carry the protobuf
    /// fragment's metadata (type mask, distribution, state tables, stream node) into the
    /// model, as verified by `check_fragment`.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        // One upstream mapping per actor; only used here to build a representative
        // merger stream node.
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }
2168
    /// End-to-end check of `compose_fragment`: a model-side fragment plus its actors
    /// should compose into consistent protobuf fragment/actor/status/split views.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // Uniform hash mapping so each actor gets its own vnode bitmap.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                    config_override: "a.b.c = true".into(),
                }
            })
            .collect_vec();

        // The stream node is shared by all actors; build it from the first actor's
        // upstream mapping.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids =
                upstream_actor_ids.get(&template_actor.actor_id).unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actors, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        // Every actor must get a status carrying a location, and a split assignment.
        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }
2260
    /// Compares each input `ActorInfo` against the composed `StreamActor` at the same
    /// position (order-sensitive zip), plus the actor's split assignment and the merge
    /// nodes' upstream fragments.
    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            // The composed actor carries no mview definition in this fixture.
            assert_eq!(mview_definition, "");

            // Every merge node in the shared stream node must refer to one of this
            // actor's upstream fragments.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&actor_id)
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            // Splits must round-trip; actors without an entry compare against the
            // default (empty) splits.
            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }
2320
2321    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2322        let Fragment {
2323            fragment_id,
2324            fragment_type_mask,
2325            distribution_type: pb_distribution_type,
2326            state_table_ids: pb_state_table_ids,
2327            maybe_vnode_count: _,
2328            nodes,
2329        } = pb_fragment;
2330
2331        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
2332        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2333        assert_eq!(
2334            pb_distribution_type,
2335            PbFragmentDistributionType::from(fragment.distribution_type)
2336        );
2337
2338        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
2339        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2340    }
2341
2342    #[test]
2343    fn test_parallelism_policy_with_root_fragments() {
2344        #[expect(deprecated)]
2345        let fragment = fragment::Model {
2346            fragment_id: 3.into(),
2347            job_id: TEST_JOB_ID,
2348            fragment_type_mask: 0,
2349            distribution_type: DistributionType::Hash,
2350            stream_node: StreamNode::from(&PbStreamNode::default()),
2351            state_table_ids: TableIdArray::default(),
2352            upstream_fragment_id: Default::default(),
2353            vnode_count: 0,
2354            parallelism: None,
2355        };
2356
2357        let job_parallelism = StreamingParallelism::Fixed(4);
2358
2359        let policy = super::CatalogController::format_fragment_parallelism_policy(
2360            fragment.distribution_type,
2361            fragment.parallelism.as_ref(),
2362            Some(&job_parallelism),
2363            &[],
2364        );
2365
2366        assert_eq!(policy, "inherit(fixed(4))");
2367    }
2368
    /// Without a job-level policy, the fragment derives its parallelism from the
    /// upstream root fragments; the rendered id list is sorted and deduplicated.
    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            fragment.distribution_type,
            fragment.parallelism.as_ref(),
            None,
            // Unsorted with a duplicate on purpose: expect sorted, deduped output.
            &[3.into(), 1.into(), 2.into(), 1.into()],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
2393}