risingwave_meta/controller/
fragment.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::{Either, Itertools};
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::CdcScanOptions;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    TableIdArray, VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits,
40    object, sink, source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::fragment::{
49    FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan;
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57    PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, SinkLogStoreType,
58    StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63    ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, FromQueryResult, JoinType,
64    PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
65};
66use serde::{Deserialize, Serialize};
67
68use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
69use crate::controller::catalog::CatalogController;
70use crate::controller::scale::{
71    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph,
72    find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
73    render_actor_assignments, resolve_streaming_job_definition,
74};
75use crate::controller::utils::{
76    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
77    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
78    resolve_no_shuffle_actor_dispatcher,
79};
80use crate::error::MetaError;
81use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
82use crate::model::{
83    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
84    StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
85    TableParallelism,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
/// Some information of running (inflight) actors.
#[derive(Debug)]
pub struct InflightActorInfo {
    /// Worker node this actor is currently scheduled on.
    pub worker_id: WorkerId,
    /// Vnode bitmap owned by this actor, if any.
    pub vnode_bitmap: Option<Bitmap>,
    /// Connector splits currently assigned to this actor.
    pub splits: Vec<SplitImpl>,
}
98
/// Internal per-actor record consumed by `compose_fragment` when assembling
/// a fragment's actors, statuses and splits.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    // Must match the fragment being composed; `compose_fragment` bails otherwise.
    pub fragment_id: FragmentId,
    pub splits: ConnectorSplits,
    pub worker_id: WorkerId,
    // `None` when the actor carries no vnode bitmap.
    pub vnode_bitmap: Option<VnodeBitmap>,
    pub expr_context: ExprContext,
    // Shared to make clones cheap.
    pub config_override: Arc<str>,
}
109
/// Row shape for fragment-description queries.
/// NOTE(review): not referenced in this part of the file — presumably filled
/// from a custom SELECT elsewhere; confirm against its usage sites.
#[derive(Debug)]
struct FragmentDescRow {
    fragment_id: FragmentId,
    job_id: JobId,
    fragment_type_mask: i32,
    distribution_type: DistributionType,
    state_table_ids: TableIdArray,
    vnode_count: i32,
    // Fragment-level parallelism override, if any.
    parallelism_override: Option<StreamingParallelism>,
    // `None` when the stream node was not selected by the query.
    stream_node: Option<StreamNode>,
}
121
/// Information of a running (inflight) fragment, including all of its actors.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    pub vnode_count: usize,
    /// The fragment's stream node graph.
    pub nodes: PbStreamNode,
    /// All actors of this fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    /// State tables owned by this fragment.
    pub state_table_ids: HashSet<TableId>,
}
132
/// Parallelism summary of a fragment, derived from the in-memory shared actor
/// state (see `running_fragment_parallelisms`).
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    /// Number of live actors — the effective parallelism.
    pub actor_count: usize,
    pub vnode_count: usize,
}
139
/// SQL expression helpers for filtering rows on the `fragment_type_mask`
/// bit-flag column.
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    /// `fragment_type_mask & flag != 0` — the fragment has `flag` set.
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    /// `fragment_type_mask & (flags combined) != 0` — the fragment has at
    /// least one of `flags` set. An empty iterator yields a mask of 0, so the
    /// expression is never satisfied.
    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    /// `fragment_type_mask & flag == 0` — the fragment does not have `flag` set.
    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}
160
/// Aggregated streaming-job information, selected by `list_streaming_job_infos`.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    pub job_id: JobId,
    pub obj_type: ObjectType,
    /// Resolved from the table/source/sink catalog; `"<unknown>"` if none match.
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Job-specific resource group, falling back to the database's.
    pub resource_group: String,
    /// Empty string when the job has no config override.
    pub config_override: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
175
176impl NotificationManager {
177    pub(crate) fn notify_fragment_mapping(
178        &self,
179        operation: NotificationOperation,
180        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
181    ) {
182        let fragment_ids = fragment_mappings
183            .iter()
184            .map(|mapping| mapping.fragment_id)
185            .collect_vec();
186        if fragment_ids.is_empty() {
187            return;
188        }
189        // notify all fragment mappings to frontend.
190        for fragment_mapping in fragment_mappings {
191            self.notify_frontend_without_version(
192                operation,
193                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
194            );
195        }
196
197        // update serving vnode mappings.
198        match operation {
199            NotificationOperation::Add | NotificationOperation::Update => {
200                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
201                    fragment_ids,
202                ));
203            }
204            NotificationOperation::Delete => {
205                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
206                    fragment_ids,
207                ));
208            }
209            op => {
210                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
211            }
212        }
213    }
214}
215
216impl CatalogController {
217    pub fn prepare_fragment_models_from_fragments(
218        job_id: JobId,
219        fragments: impl Iterator<Item = &Fragment>,
220    ) -> MetaResult<Vec<fragment::Model>> {
221        fragments
222            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
223            .try_collect()
224    }
225
    /// Convert an in-memory [`Fragment`] into a `fragment::Model` row for a
    /// new streaming job.
    ///
    /// If the fragment is a CDC table scan configured with parallelized
    /// backfill, its parallelism is pinned to the configured backfill
    /// parallelism; otherwise the fragment-level parallelism is left unset.
    ///
    /// # Panics
    /// Panics if the fragment has no actors or carries an invalid
    /// distribution type.
    pub fn prepare_fragment_model_for_new_job(
        job_id: JobId,
        fragment: &Fragment,
    ) -> MetaResult<fragment::Model> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        // A fragment must contain at least one actor.
        assert!(!pb_actors.is_empty());

        // Only a `StreamCdcScan` root with parallelized backfill enabled pins
        // a fixed fragment-level parallelism.
        let fragment_parallelism = nodes
            .node_body
            .as_ref()
            .and_then(|body| match body {
                NodeBody::StreamCdcScan(node) => Some(node),
                _ => None,
            })
            .and_then(|node| node.options.as_ref())
            .map(CdcScanOptions::from_proto)
            .filter(|opts| opts.is_parallelized_backfill())
            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));

        let stream_node = StreamNode::from(nodes);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        // `upstream_fragment_id` is a deprecated column; keep it at its
        // default value.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
            parallelism: fragment_parallelism,
        };

        Ok(fragment)
    }
278
279    fn compose_table_fragments(
280        job_id: JobId,
281        state: PbState,
282        ctx: StreamContext,
283        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
284        parallelism: StreamingParallelism,
285        max_parallelism: usize,
286        job_definition: Option<String>,
287    ) -> MetaResult<StreamJobFragments> {
288        let mut pb_fragments = BTreeMap::new();
289        let mut pb_actor_status = BTreeMap::new();
290
291        for (fragment, actors) in fragments {
292            let (fragment, fragment_actor_status, _) =
293                Self::compose_fragment(fragment, actors, job_definition.clone())?;
294
295            pb_fragments.insert(fragment.fragment_id, fragment);
296            pb_actor_status.extend(fragment_actor_status.into_iter());
297        }
298
299        let table_fragments = StreamJobFragments {
300            stream_job_id: job_id,
301            state: state as _,
302            fragments: pb_fragments,
303            actor_status: pb_actor_status,
304            ctx,
305            assigned_parallelism: match parallelism {
306                StreamingParallelism::Custom => TableParallelism::Custom,
307                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
308                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
309            },
310            max_parallelism,
311        };
312
313        Ok(table_fragments)
314    }
315
    /// Convert a `fragment::Model` and its actor rows into an in-memory
    /// [`Fragment`], together with per-actor status (worker location) and
    /// per-actor connector splits, each keyed by actor id.
    ///
    /// # Errors
    /// Fails if any actor row carries a fragment id different from the model
    /// being composed.
    #[allow(clippy::type_complexity)]
    fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<ActorInfo>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity check only: every `Merge` node in the stream graph must refer
        // to a distinct upstream fragment. The collected set is not used
        // beyond this assertion.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            // Guard against mismatched rows: every actor must belong to the
            // fragment being composed.
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let ActorInfo {
                actor_id,
                fragment_id,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                config_override,
                ..
            } = actor;

            // Convert the persisted vnode bitmap (if any) into the in-memory
            // representation.
            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id),
                },
            );

            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                // Fall back to an empty definition when none was resolved.
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
                config_override,
            })
        }

        let pb_state_table_ids = state_table_ids.0;
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }
410
411    pub fn running_fragment_parallelisms(
412        &self,
413        id_filter: Option<HashSet<FragmentId>>,
414    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
415        let info = self.env.shared_actor_infos().read_guard();
416
417        let mut result = HashMap::new();
418        for (fragment_id, fragment) in info.iter_over_fragments() {
419            if let Some(id_filter) = &id_filter
420                && !id_filter.contains(fragment_id)
421            {
422                continue; // Skip fragments not in the filter
423            }
424
425            result.insert(
426                *fragment_id as _,
427                FragmentParallelismInfo {
428                    distribution_type: fragment.distribution_type.into(),
429                    actor_count: fragment.actors.len() as _,
430                    vnode_count: fragment.vnode_count,
431                },
432            );
433        }
434
435        Ok(result)
436    }
437
438    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
439        let inner = self.inner.read().await;
440        let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
441            .select_only()
442            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
443            .into_tuple()
444            .all(&inner.db)
445            .await?;
446        Ok(fragment_jobs.into_iter().collect())
447    }
448
449    pub async fn get_fragment_job_id(
450        &self,
451        fragment_ids: Vec<FragmentId>,
452    ) -> MetaResult<Vec<ObjectId>> {
453        let inner = self.inner.read().await;
454        self.get_fragment_job_id_in_txn(&inner.db, fragment_ids)
455            .await
456    }
457
458    pub async fn get_fragment_job_id_in_txn<C>(
459        &self,
460        txn: &C,
461        fragment_ids: Vec<FragmentId>,
462    ) -> MetaResult<Vec<ObjectId>>
463    where
464        C: ConnectionTrait + Send,
465    {
466        let object_ids: Vec<ObjectId> = FragmentModel::find()
467            .select_only()
468            .column(fragment::Column::JobId)
469            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
470            .into_tuple()
471            .all(txn)
472            .await?;
473
474        Ok(object_ids)
475    }
476
    /// Build a [`FragmentDesc`] for a single fragment, together with the ids
    /// of its direct upstream fragments.
    ///
    /// Returns `Ok(None)` when the fragment does not exist in the catalog.
    ///
    /// # Panics
    /// Panics if the fragment exists in the database but is missing from the
    /// in-memory shared actor info.
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model = match FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?
        {
            Some(fragment) => fragment,
            None => return Ok(None),
        };

        // Parallelism configured on the owning streaming job, if the job row
        // still exists.
        let job_parallelism: Option<StreamingParallelism> =
            StreamingJob::find_by_id(fragment_model.job_id)
                .select_only()
                .column(streaming_job::Column::Parallelism)
                .into_tuple()
                .one(&inner.db)
                .await?;

        // Direct upstreams: all relations targeting this fragment. The
        // dispatcher type is selected but only the source ids are kept.
        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::DispatcherType,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let upstreams: Vec<_> = upstream_entries
            .into_iter()
            .map(|(source_id, _)| source_id)
            .collect();

        // Root fragments of the no-shuffle DAG this fragment belongs to, fed
        // into the parallelism-policy rendering below.
        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
            .await
            .map(Self::collect_root_fragment_mapping)?;
        let root_fragments = root_fragment_map
            .get(&fragment_id)
            .cloned()
            .unwrap_or_default();

        // The live actor count comes from the in-memory shared state, not
        // from the database.
        let info = self.env.shared_actor_infos().read_guard();
        let SharedFragmentInfo { actors, .. } = info
            .get_fragment(fragment_model.fragment_id as _)
            .unwrap_or_else(|| {
                panic!(
                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                    fragment_model.fragment_id,
                    fragment_model.job_id
                )
            });

        let parallelism_policy = Self::format_fragment_parallelism_policy(
            fragment_model.distribution_type,
            fragment_model.parallelism.as_ref(),
            job_parallelism.as_ref(),
            &root_fragments,
        );

        let fragment = FragmentDesc {
            fragment_id: fragment_model.fragment_id,
            job_id: fragment_model.job_id,
            fragment_type_mask: fragment_model.fragment_type_mask,
            distribution_type: fragment_model.distribution_type,
            state_table_ids: fragment_model.state_table_ids.clone(),
            parallelism: actors.len() as _,
            vnode_count: fragment_model.vnode_count,
            stream_node: fragment_model.stream_node.clone(),
            parallelism_policy,
        };

        Ok(Some((fragment, upstreams)))
    }
555
556    pub async fn list_fragment_database_ids(
557        &self,
558        select_fragment_ids: Option<Vec<FragmentId>>,
559    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
560        let inner = self.inner.read().await;
561        let select = FragmentModel::find()
562            .select_only()
563            .column(fragment::Column::FragmentId)
564            .column(object::Column::DatabaseId)
565            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
566        let select = if let Some(select_fragment_ids) = select_fragment_ids {
567            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
568        } else {
569            select
570        };
571        Ok(select.into_tuple().all(&inner.db).await?)
572    }
573
    /// Load all fragments of a streaming job from the catalog and compose them
    /// into a [`StreamJobFragments`] snapshot.
    ///
    /// # Errors
    /// Fails if the job does not exist in the database.
    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;

        // Load fragments matching the job from the database
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        // Pair each fragment with its actor infos (see
        // `collect_fragment_actor_pairs`).
        let fragment_actors =
            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;

        // The job's definition, used as `mview_definition` on actors; `None`
        // when it cannot be resolved.
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id,
            job_info.job_status.into(),
            job_info.stream_context(),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }
605
606    pub async fn get_fragment_actor_dispatchers(
607        &self,
608        fragment_ids: Vec<FragmentId>,
609    ) -> MetaResult<FragmentActorDispatchers> {
610        let inner = self.inner.read().await;
611
612        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
613            .await
614    }
615
    /// Build the dispatchers of all actors in the given source fragments, by
    /// joining the persisted `fragment_relation` rows with the in-memory
    /// shared actor state.
    ///
    /// Returns a map of source fragment id -> (actor id -> dispatchers).
    ///
    /// # Panics
    /// Panics if a fragment referenced by a relation is missing from the
    /// shared actor info.
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        // Distribution type plus per-actor vnode bitmaps of one fragment.
        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        // Cache per-fragment actor info so each fragment is loaded from the
        // shared state at most once, even when it appears in many relations.
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            // Resolve (or lazily load) actor info for both endpoints of the
            // relation.
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            // Derive the concrete per-actor dispatchers for this relation.
            let dispatchers = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }
719
720    pub async fn get_fragment_downstream_relations(
721        &self,
722        fragment_ids: Vec<FragmentId>,
723    ) -> MetaResult<FragmentDownstreamRelation> {
724        let inner = self.inner.read().await;
725        self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
726            .await
727    }
728
729    pub async fn get_fragment_downstream_relations_in_txn<C>(
730        &self,
731        txn: &C,
732        fragment_ids: Vec<FragmentId>,
733    ) -> MetaResult<FragmentDownstreamRelation>
734    where
735        C: ConnectionTrait + StreamTrait + Send,
736    {
737        let mut stream = FragmentRelation::find()
738            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
739            .stream(txn)
740            .await?;
741        let mut relations = FragmentDownstreamRelation::new();
742        while let Some(relation) = stream.try_next().await? {
743            relations
744                .entry(relation.source_fragment_id as _)
745                .or_default()
746                .push(DownstreamFragmentRelation {
747                    downstream_fragment_id: relation.target_fragment_id as _,
748                    dispatcher_type: relation.dispatcher_type,
749                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
750                    output_mapping: PbDispatchOutputMapping {
751                        indices: relation.output_indices.into_u32_array(),
752                        types: relation
753                            .output_type_mapping
754                            .unwrap_or_default()
755                            .to_protobuf(),
756                    },
757                });
758        }
759        Ok(relations)
760    }
761
762    pub async fn get_job_fragment_backfill_scan_type(
763        &self,
764        job_id: JobId,
765    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
766        let inner = self.inner.read().await;
767        self.get_job_fragment_backfill_scan_type_in_txn(&inner.db, job_id)
768            .await
769    }
770
771    pub async fn get_job_fragment_backfill_scan_type_in_txn<C>(
772        &self,
773        txn: &C,
774        job_id: JobId,
775    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>>
776    where
777        C: ConnectionTrait + Send,
778    {
779        let fragments: Vec<_> = FragmentModel::find()
780            .filter(fragment::Column::JobId.eq(job_id))
781            .all(txn)
782            .await?;
783
784        let mut result = HashMap::new();
785
786        for fragment::Model {
787            fragment_id,
788            stream_node,
789            ..
790        } in fragments
791        {
792            let stream_node = stream_node.to_protobuf();
793            visit_stream_node_body(&stream_node, |body| {
794                if let NodeBody::StreamScan(node) = body {
795                    match node.stream_scan_type() {
796                        StreamScanType::Unspecified => {}
797                        scan_type => {
798                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
799                        }
800                    }
801                }
802            });
803        }
804
805        Ok(result)
806    }
807
808    pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
809        let inner = self.inner.read().await;
810        let count = StreamingJob::find().count(&inner.db).await?;
811        Ok(usize::try_from(count).context("streaming job count overflow")?)
812    }
813
    /// List aggregated info for all streaming jobs (see [`StreamingJobInfo`]).
    ///
    /// The job name is resolved by coalescing across the table, source and
    /// sink catalogs; the resource group falls back from the job-specific one
    /// to the database's.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // A job is exactly one of table / source / sink, hence the left
            // joins plus the COALESCE cascade below.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            .column_as(
                // COALESCE(table.name, source.name, sink.name, '<unknown>')
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            .column_as(
                // Job-specific resource group, or the database's by default.
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column_as(
                // Normalize a missing config override to the empty string.
                Expr::if_null(
                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
                    Expr::val(""),
                ),
                "config_override",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
867
868    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
869        let inner = self.inner.read().await;
870        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
871            .select_only()
872            .column(streaming_job::Column::MaxParallelism)
873            .into_tuple()
874            .one(&inner.db)
875            .await?
876            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
877        Ok(max_parallelism as usize)
878    }
879
880    /// Try to get internal table ids of each streaming job, used by metrics collection.
881    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
882        if let Ok(inner) = self.inner.try_read()
883            && let Ok(job_state_tables) = FragmentModel::find()
884                .select_only()
885                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
886                .into_tuple::<(JobId, I32Array)>()
887                .all(&inner.db)
888                .await
889        {
890            let mut job_internal_table_ids = HashMap::new();
891            for (job_id, state_table_ids) in job_state_tables {
892                job_internal_table_ids
893                    .entry(job_id)
894                    .or_insert_with(Vec::new)
895                    .extend(
896                        state_table_ids
897                            .into_inner()
898                            .into_iter()
899                            .map(|table_id| TableId::new(table_id as _)),
900                    );
901            }
902            return Some(job_internal_table_ids.into_iter().collect());
903        }
904        None
905    }
906
907    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
908        let inner = self.inner.read().await;
909        let count = FragmentModel::find().count(&inner.db).await?;
910        Ok(count > 0)
911    }
912
913    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
914        let read_guard = self.env.shared_actor_infos().read_guard();
915        let actor_cnt: HashMap<WorkerId, _> = read_guard
916            .iter_over_fragments()
917            .flat_map(|(_, fragment)| {
918                fragment
919                    .actors
920                    .iter()
921                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
922            })
923            .into_group_map()
924            .into_iter()
925            .map(|(k, v)| (k, v.len()))
926            .collect();
927
928        Ok(actor_cnt)
929    }
930
931    fn collect_fragment_actor_map(
932        &self,
933        fragment_ids: &[FragmentId],
934        stream_context: StreamContext,
935    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
936        let guard = self.env.shared_actor_infos().read_guard();
937        let pb_expr_context = stream_context.to_expr_context();
938        let expr_context: ExprContext = (&pb_expr_context).into();
939
940        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
941        for fragment_id in fragment_ids {
942            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
943                anyhow!("fragment {} not found in shared actor info", fragment_id)
944            })?;
945
946            let actors = fragment_info
947                .actors
948                .iter()
949                .map(|(actor_id, actor_info)| ActorInfo {
950                    actor_id: *actor_id as _,
951                    fragment_id: *fragment_id,
952                    splits: ConnectorSplits::from(&PbConnectorSplits {
953                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
954                    }),
955                    worker_id: actor_info.worker_id as _,
956                    vnode_bitmap: actor_info
957                        .vnode_bitmap
958                        .as_ref()
959                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
960                    expr_context: expr_context.clone(),
961                    config_override: stream_context.config_override.clone(),
962                })
963                .collect();
964
965            actor_map.insert(*fragment_id, actors);
966        }
967
968        Ok(actor_map)
969    }
970
971    fn collect_fragment_actor_pairs(
972        &self,
973        fragments: Vec<fragment::Model>,
974        stream_context: StreamContext,
975    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
976        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
977        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
978        fragments
979            .into_iter()
980            .map(|fragment| {
981                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
982                    anyhow!(
983                        "fragment {} missing in shared actor info map",
984                        fragment.fragment_id
985                    )
986                })?;
987                Ok((fragment, actors))
988            })
989            .collect()
990    }
991
992    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
993    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
994        let inner = self.inner.read().await;
995        let jobs = StreamingJob::find().all(&inner.db).await?;
996
997        let mut job_definition = resolve_streaming_job_definition(
998            &inner.db,
999            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
1000        )
1001        .await?;
1002
1003        let mut table_fragments = BTreeMap::new();
1004        for job in jobs {
1005            let fragments = FragmentModel::find()
1006                .filter(fragment::Column::JobId.eq(job.job_id))
1007                .all(&inner.db)
1008                .await?;
1009
1010            let fragment_actors =
1011                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;
1012
1013            table_fragments.insert(
1014                job.job_id,
1015                Self::compose_table_fragments(
1016                    job.job_id,
1017                    job.job_status.into(),
1018                    job.stream_context(),
1019                    fragment_actors,
1020                    job.parallelism.clone(),
1021                    job.max_parallelism as _,
1022                    job_definition.remove(&job.job_id),
1023                )?,
1024            );
1025        }
1026
1027        Ok(table_fragments)
1028    }
1029
1030    pub async fn upstream_fragments(
1031        &self,
1032        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1033    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
1034        let inner = self.inner.read().await;
1035        self.upstream_fragments_in_txn(&inner.db, fragment_ids)
1036            .await
1037    }
1038
1039    pub async fn upstream_fragments_in_txn<C>(
1040        &self,
1041        txn: &C,
1042        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1043    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>>
1044    where
1045        C: ConnectionTrait + StreamTrait + Send,
1046    {
1047        let mut stream = FragmentRelation::find()
1048            .select_only()
1049            .columns([
1050                fragment_relation::Column::SourceFragmentId,
1051                fragment_relation::Column::TargetFragmentId,
1052            ])
1053            .filter(
1054                fragment_relation::Column::TargetFragmentId
1055                    .is_in(fragment_ids.map(|id| id as FragmentId)),
1056            )
1057            .into_tuple::<(FragmentId, FragmentId)>()
1058            .stream(txn)
1059            .await?;
1060        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
1061        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
1062            upstream_fragments
1063                .entry(downstream_fragment_id as crate::model::FragmentId)
1064                .or_default()
1065                .insert(upstream_fragment_id as crate::model::FragmentId);
1066        }
1067        Ok(upstream_fragments)
1068    }
1069
1070    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1071        let info = self.env.shared_actor_infos().read_guard();
1072
1073        let actor_locations = info
1074            .iter_over_fragments()
1075            .flat_map(|(fragment_id, fragment)| {
1076                fragment
1077                    .actors
1078                    .iter()
1079                    .map(|(actor_id, actor)| PartialActorLocation {
1080                        actor_id: *actor_id as _,
1081                        fragment_id: *fragment_id as _,
1082                        worker_id: actor.worker_id,
1083                    })
1084            })
1085            .collect_vec();
1086
1087        Ok(actor_locations)
1088    }
1089
1090    pub async fn list_actor_info(
1091        &self,
1092    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
1093        let inner = self.inner.read().await;
1094
1095        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
1096            FragmentModel::find()
1097                .select_only()
1098                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1099                .column(fragment::Column::FragmentId)
1100                .column_as(object::Column::Oid, "job_id")
1101                .column_as(object::Column::SchemaId, "schema_id")
1102                .column_as(object::Column::ObjType, "type")
1103                .into_tuple()
1104                .all(&inner.db)
1105                .await?;
1106
1107        let actor_infos = {
1108            let info = self.env.shared_actor_infos().read_guard();
1109
1110            let mut result = Vec::new();
1111
1112            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
1113                let Some(fragment) = info.get_fragment(fragment_id as _) else {
1114                    return Err(MetaError::unavailable(format!(
1115                        "shared actor info missing for fragment {fragment_id} while listing actors"
1116                    )));
1117                };
1118
1119                for actor_id in fragment.actors.keys() {
1120                    result.push((
1121                        *actor_id as _,
1122                        fragment.fragment_id as _,
1123                        object_id,
1124                        schema_id,
1125                        object_type,
1126                    ));
1127                }
1128            }
1129
1130            result
1131        };
1132
1133        Ok(actor_infos)
1134    }
1135
1136    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1137        let guard = self.env.shared_actor_info.read_guard();
1138        guard
1139            .iter_over_fragments()
1140            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1141            .collect_vec()
1142    }
1143
1144    pub async fn list_fragment_descs_with_node(
1145        &self,
1146        is_creating: bool,
1147    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1148        let inner = self.inner.read().await;
1149        let txn = inner.db.begin().await?;
1150
1151        let fragments_query = Self::build_fragment_query(is_creating);
1152        let fragments: Vec<fragment::Model> = fragments_query.all(&txn).await?;
1153
1154        let rows = fragments
1155            .into_iter()
1156            .map(|fragment| FragmentDescRow {
1157                fragment_id: fragment.fragment_id,
1158                job_id: fragment.job_id,
1159                fragment_type_mask: fragment.fragment_type_mask,
1160                distribution_type: fragment.distribution_type,
1161                state_table_ids: fragment.state_table_ids,
1162                vnode_count: fragment.vnode_count,
1163                parallelism_override: fragment.parallelism,
1164                stream_node: Some(fragment.stream_node),
1165            })
1166            .collect_vec();
1167
1168        self.build_fragment_distributions(&txn, rows).await
1169    }
1170
    /// List fragment descriptors *without* their stream-node plans, paired
    /// with each fragment's upstream fragment ids.
    ///
    /// Selects only the scalar columns (skipping the potentially large
    /// `stream_node` blob) and sets `stream_node: None` on every row.
    pub async fn list_fragment_descs_without_node(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments_query = Self::build_fragment_query(is_creating);
        // Tuple positions must match the `columns([...])` list below exactly.
        #[allow(clippy::type_complexity)]
        let fragments: Vec<(
            FragmentId,
            JobId,
            i32,
            DistributionType,
            TableIdArray,
            i32,
            Option<StreamingParallelism>,
        )> = fragments_query
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::Parallelism,
            ])
            .into_tuple()
            .all(&txn)
            .await?;

        let rows = fragments
            .into_iter()
            .map(
                |(
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    state_table_ids,
                    vnode_count,
                    parallelism_override,
                )| FragmentDescRow {
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    state_table_ids,
                    vnode_count,
                    parallelism_override,
                    // The plan was deliberately not fetched.
                    stream_node: None,
                },
            )
            .collect_vec();

        self.build_fragment_distributions(&txn, rows).await
    }
1229
1230    fn build_fragment_query(is_creating: bool) -> sea_orm::Select<fragment::Entity> {
1231        if is_creating {
1232            FragmentModel::find()
1233                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1234                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1235                .filter(
1236                    streaming_job::Column::JobStatus
1237                        .eq(JobStatus::Initial)
1238                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1239                )
1240        } else {
1241            FragmentModel::find()
1242        }
1243    }
1244
    /// Assemble `FragmentDistribution` descriptors (plus upstream fragment id
    /// lists) for the given pre-fetched fragment rows.
    ///
    /// Gathers, in order: per-job parallelism settings, upstream relations,
    /// and no-shuffle root fragments; then joins them with the live actor
    /// counts from the in-memory shared actor info.
    async fn build_fragment_distributions(
        &self,
        txn: &DatabaseTransaction,
        rows: Vec<FragmentDescRow>,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let fragment_ids = rows.iter().map(|row| row.fragment_id).collect_vec();
        let job_ids = rows.iter().map(|row| row.job_id).unique().collect_vec();

        // Skip all queries when there is nothing to describe.
        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(txn)
                .await?
                .into_iter()
                .collect()
        };

        // All incoming edges whose target is one of the requested fragments.
        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(txn)
                    .await?
            };

        // target fragment -> list of source fragments (dispatcher type unused).
        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        // fragment -> root fragments of its no-shuffle ensemble (empty for roots).
        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();
        let mut result = Vec::with_capacity(rows.len());

        for row in rows {
            // Live parallelism = current actor count; 0 if the fragment is
            // not (yet) present in the shared actor info.
            let parallelism = guard
                .get_fragment(row.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&row.fragment_id)
                .cloned()
                .unwrap_or_default();

            // `remove` is safe: each fragment id appears in at most one row.
            let upstreams = all_upstreams.remove(&row.fragment_id).unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                row.distribution_type,
                row.parallelism_override.as_ref(),
                job_parallelisms.get(&row.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: row.fragment_id,
                table_id: row.job_id,
                distribution_type: PbFragmentDistributionType::from(row.distribution_type) as _,
                state_table_ids: row.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: row.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: row.vnode_count as _,
                // Present only for the `with_node` listing variant.
                node: row.stream_node.map(|node| node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }

        Ok(result)
    }
1340
    /// Map each sink that uses a KV log store to the id of its log-store
    /// internal table, by inspecting the stream plans of all sink fragments.
    pub async fn list_sink_log_store_tables(&self) -> MetaResult<Vec<(SinkId, TableId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Only fragments whose type mask includes the Sink flag can contain a
        // sink node.
        let fragments: Vec<(JobId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::JobId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Sink))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut mapping: HashMap<SinkId, TableId> = HashMap::new();

        for (job_id, stream_node) in fragments {
            // For sink jobs the job id doubles as the sink id.
            let sink_id = SinkId::new(job_id.as_raw_id());
            let stream_node = stream_node.to_protobuf();
            let mut internal_table_id: Option<TableId> = None;

            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::Sink(node) = body
                    && node.log_store_type == SinkLogStoreType::KvLogStore as i32
                    && let Some(table) = &node.table
                {
                    // NOTE(review): if a fragment contained several KV-log-store
                    // sink nodes, only the last one visited would be kept here.
                    internal_table_id = Some(TableId::new(table.id.as_raw_id()));
                }
            });

            // Insert-then-compare: on a conflicting duplicate, the mapping
            // keeps the most recently seen table id and only logs a warning.
            if let Some(table_id) = internal_table_id
                && let Some(existing) = mapping.insert(sink_id, table_id)
                && existing != table_id
            {
                tracing::warn!(
                    "sink {sink_id:?} has multiple log store tables: {existing:?} vs {table_id:?}"
                );
            }
        }

        Ok(mapping.into_iter().collect())
    }
1381
1382    /// Build a fragment-to-root lookup for all reported root fragment ensembles.
1383    fn collect_root_fragment_mapping(
1384        ensembles: Vec<NoShuffleEnsemble>,
1385    ) -> HashMap<FragmentId, Vec<FragmentId>> {
1386        let mut mapping = HashMap::new();
1387
1388        for ensemble in ensembles {
1389            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1390            roots.sort_unstable();
1391            roots.dedup();
1392
1393            if roots.is_empty() {
1394                continue;
1395            }
1396
1397            let root_set: HashSet<_> = roots.iter().copied().collect();
1398
1399            for fragment_id in ensemble.component_fragments() {
1400                if root_set.contains(&fragment_id) {
1401                    mapping.insert(fragment_id, Vec::new());
1402                } else {
1403                    mapping.insert(fragment_id, roots.clone());
1404                }
1405            }
1406        }
1407
1408        mapping
1409    }
1410
1411    fn format_fragment_parallelism_policy(
1412        distribution_type: DistributionType,
1413        fragment_parallelism: Option<&StreamingParallelism>,
1414        job_parallelism: Option<&StreamingParallelism>,
1415        root_fragments: &[FragmentId],
1416    ) -> String {
1417        if distribution_type == DistributionType::Single {
1418            return "single".to_owned();
1419        }
1420
1421        if let Some(parallelism) = fragment_parallelism {
1422            return format!(
1423                "override({})",
1424                Self::format_streaming_parallelism(parallelism)
1425            );
1426        }
1427
1428        if !root_fragments.is_empty() {
1429            let mut upstreams = root_fragments.to_vec();
1430            upstreams.sort_unstable();
1431            upstreams.dedup();
1432
1433            return format!("upstream_fragment({upstreams:?})");
1434        }
1435
1436        let inherited = job_parallelism
1437            .map(Self::format_streaming_parallelism)
1438            .unwrap_or_else(|| "unknown".to_owned());
1439        format!("inherit({inherited})")
1440    }
1441
1442    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1443        match parallelism {
1444            StreamingParallelism::Adaptive => "adaptive".to_owned(),
1445            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1446            StreamingParallelism::Custom => "custom".to_owned(),
1447        }
1448    }
1449
1450    pub async fn list_sink_actor_mapping(
1451        &self,
1452    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1453        let inner = self.inner.read().await;
1454        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1455            .select_only()
1456            .columns([sink::Column::SinkId, sink::Column::Name])
1457            .into_tuple()
1458            .all(&inner.db)
1459            .await?;
1460        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1461
1462        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1463
1464        let actor_with_type: Vec<(ActorId, SinkId)> = {
1465            let info = self.env.shared_actor_infos().read_guard();
1466
1467            info.iter_over_fragments()
1468                .filter(|(_, fragment)| {
1469                    sink_ids.contains(&fragment.job_id.as_sink_id())
1470                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1471                })
1472                .flat_map(|(_, fragment)| {
1473                    fragment
1474                        .actors
1475                        .keys()
1476                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1477                })
1478                .collect()
1479        };
1480
1481        let mut sink_actor_mapping = HashMap::new();
1482        for (actor_id, sink_id) in actor_with_type {
1483            sink_actor_mapping
1484                .entry(sink_id)
1485                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1486                .1
1487                .push(actor_id);
1488        }
1489
1490        Ok(sink_actor_mapping)
1491    }
1492
1493    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1494        let inner = self.inner.read().await;
1495        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1496            .select_only()
1497            .columns([
1498                fragment::Column::FragmentId,
1499                fragment::Column::JobId,
1500                fragment::Column::StateTableIds,
1501            ])
1502            .into_partial_model()
1503            .all(&inner.db)
1504            .await?;
1505        Ok(fragment_state_tables)
1506    }
1507
1508    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
1509    /// collected
1510    pub async fn load_all_actors_dynamic(
1511        &self,
1512        database_id: Option<DatabaseId>,
1513        worker_nodes: &ActiveStreamingWorkerNodes,
1514    ) -> MetaResult<FragmentRenderMap> {
1515        let loaded = self.load_fragment_context(database_id).await?;
1516
1517        if loaded.is_empty() {
1518            return Ok(HashMap::new());
1519        }
1520
1521        let adaptive_parallelism_strategy = {
1522            let system_params_reader = self.env.system_params_reader().await;
1523            system_params_reader.adaptive_parallelism_strategy()
1524        };
1525
1526        let RenderedGraph { fragments, .. } = render_actor_assignments(
1527            self.env.actor_id_generator(),
1528            worker_nodes.current(),
1529            adaptive_parallelism_strategy,
1530            &loaded,
1531        )?;
1532
1533        tracing::trace!(?fragments, "reload all actors");
1534
1535        Ok(fragments)
1536    }
1537
1538    /// Async load stage: collects all metadata required for rendering actor assignments.
1539    pub async fn load_fragment_context(
1540        &self,
1541        database_id: Option<DatabaseId>,
1542    ) -> MetaResult<LoadedFragmentContext> {
1543        let inner = self.inner.read().await;
1544        let txn = inner.db.begin().await?;
1545
1546        self.load_fragment_context_in_txn(&txn, database_id).await
1547    }
1548
1549    pub async fn load_fragment_context_in_txn<C>(
1550        &self,
1551        txn: &C,
1552        database_id: Option<DatabaseId>,
1553    ) -> MetaResult<LoadedFragmentContext>
1554    where
1555        C: ConnectionTrait,
1556    {
1557        let mut query = StreamingJob::find()
1558            .select_only()
1559            .column(streaming_job::Column::JobId);
1560
1561        if let Some(database_id) = database_id {
1562            query = query
1563                .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1564                .filter(object::Column::DatabaseId.eq(database_id));
1565        }
1566
1567        let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
1568
1569        let jobs: HashSet<JobId> = jobs.into_iter().collect();
1570
1571        if jobs.is_empty() {
1572            return Ok(LoadedFragmentContext::default());
1573        }
1574
1575        load_fragment_context_for_jobs(txn, jobs).await
1576    }
1577
1578    #[await_tree::instrument]
1579    pub async fn fill_snapshot_backfill_epoch(
1580        &self,
1581        fragment_ids: impl Iterator<Item = FragmentId>,
1582        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1583        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1584    ) -> MetaResult<()> {
1585        let inner = self.inner.write().await;
1586        let txn = inner.db.begin().await?;
1587        for fragment_id in fragment_ids {
1588            let fragment = FragmentModel::find_by_id(fragment_id)
1589                .one(&txn)
1590                .await?
1591                .context(format!("fragment {} not found", fragment_id))?;
1592            let mut node = fragment.stream_node.to_protobuf();
1593            if crate::stream::fill_snapshot_backfill_epoch(
1594                &mut node,
1595                snapshot_backfill_info,
1596                cross_db_snapshot_backfill_info,
1597            )? {
1598                let node = StreamNode::from(&node);
1599                FragmentModel::update(fragment::ActiveModel {
1600                    fragment_id: Set(fragment_id),
1601                    stream_node: Set(node),
1602                    ..Default::default()
1603                })
1604                .exec(&txn)
1605                .await?;
1606            }
1607        }
1608        txn.commit().await?;
1609        Ok(())
1610    }
1611
1612    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1613    pub fn get_running_actors_of_fragment(
1614        &self,
1615        fragment_id: FragmentId,
1616    ) -> MetaResult<HashSet<model::ActorId>> {
1617        let info = self.env.shared_actor_infos().read_guard();
1618
1619        let actors = info
1620            .get_fragment(fragment_id as _)
1621            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1622            .unwrap_or_default();
1623
1624        Ok(actors)
1625    }
1626
    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
    /// (`backfill_actor_id`, `upstream_source_actor_id`)
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Load the dispatcher type of the edge from the source fragment to the
        // source-backfill fragment; error out if no such edge exists.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // The pairing below (`resolve_no_shuffle_actor_dispatcher`) is only
        // meaningful for NoShuffle edges; reject anything else.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: load a fragment's distribution type from the catalog,
        // erroring if the fragment does not exist.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: snapshot actor_id -> optional vnode bitmap for a fragment
        // from the in-memory shared actor info. Yields an empty map when the
        // fragment is unknown to the shared info.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // Pair each source actor with its backfill counterpart, then flip the
        // tuple order to match the documented (backfill, source) output shape.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1722
1723    /// Get and filter the "**root**" fragments of the specified jobs.
1724    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1725    ///
1726    /// Root fragment connects to downstream jobs.
1727    ///
1728    /// ## What can be the root fragment
1729    /// - For sink, it should have one `Sink` fragment.
1730    /// - For MV, it should have one `MView` fragment.
1731    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1732    /// - For source, it should have one `Source` fragment.
1733    ///
1734    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1735    pub async fn get_root_fragments(
1736        &self,
1737        job_ids: Vec<JobId>,
1738    ) -> MetaResult<(
1739        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
1740        HashMap<ActorId, WorkerId>,
1741    )> {
1742        let inner = self.inner.read().await;
1743
1744        let all_fragments = FragmentModel::find()
1745            .filter(fragment::Column::JobId.is_in(job_ids))
1746            .all(&inner.db)
1747            .await?;
1748        // job_id -> fragment
1749        let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
1750        for fragment in all_fragments {
1751            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1752            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1753                _ = root_fragments.insert(fragment.job_id, fragment);
1754            } else if mask.contains(FragmentTypeFlag::Source) {
1755                // look for Source fragment only if there's no MView fragment
1756                // (notice try_insert here vs insert above)
1757                _ = root_fragments.try_insert(fragment.job_id, fragment);
1758            }
1759        }
1760
1761        let mut root_fragments_pb = HashMap::new();
1762
1763        let info = self.env.shared_actor_infos().read_guard();
1764
1765        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
1766            .iter()
1767            .map(|(job_id, fragment)| (fragment.fragment_id, *job_id))
1768            .collect();
1769
1770        for fragment in root_fragment_to_jobs.keys() {
1771            let fragment_info = info.get_fragment(*fragment).context(format!(
1772                "root fragment {} not found in shared actor info",
1773                fragment
1774            ))?;
1775
1776            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
1777            let fragment = root_fragments
1778                .get(&job_id)
1779                .context(format!("root fragment for job {} not found", job_id))?;
1780
1781            root_fragments_pb.insert(
1782                job_id,
1783                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
1784            );
1785        }
1786
1787        let mut all_actor_locations = HashMap::new();
1788
1789        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
1790            for (actor_id, actor_info) in actors {
1791                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
1792            }
1793        }
1794
1795        Ok((root_fragments_pb, all_actor_locations))
1796    }
1797
1798    pub async fn get_root_fragment(
1799        &self,
1800        job_id: JobId,
1801    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
1802        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1803        let (root_fragment, _) = root_fragments
1804            .remove(&job_id)
1805            .context(format!("root fragment for job {} not found", job_id))?;
1806
1807        Ok((root_fragment, actors))
1808    }
1809
1810    /// Get the downstream fragments connected to the specified job.
1811    pub async fn get_downstream_fragments(
1812        &self,
1813        job_id: JobId,
1814    ) -> MetaResult<(
1815        Vec<(
1816            stream_plan::DispatcherType,
1817            SharedFragmentInfo,
1818            PbStreamNode,
1819        )>,
1820        HashMap<ActorId, WorkerId>,
1821    )> {
1822        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;
1823
1824        let inner = self.inner.read().await;
1825        let txn = inner.db.begin().await?;
1826        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1827            .filter(
1828                fragment_relation::Column::SourceFragmentId
1829                    .eq(root_fragment.fragment_id as FragmentId),
1830            )
1831            .all(&txn)
1832            .await?;
1833
1834        let downstream_fragment_ids = downstream_fragment_relations
1835            .iter()
1836            .map(|model| model.target_fragment_id as FragmentId)
1837            .collect::<HashSet<_>>();
1838
1839        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1840            .select_only()
1841            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1842            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1843            .into_tuple()
1844            .all(&txn)
1845            .await?;
1846
1847        let downstream_fragment_nodes: HashMap<_, _> =
1848            downstream_fragment_nodes.into_iter().collect();
1849
1850        let mut downstream_fragments = vec![];
1851
1852        let info = self.env.shared_actor_infos().read_guard();
1853
1854        let fragment_map: HashMap<_, _> = downstream_fragment_relations
1855            .iter()
1856            .map(|model| (model.target_fragment_id, model.dispatcher_type))
1857            .collect();
1858
1859        for fragment_id in fragment_map.keys() {
1860            let fragment_info @ SharedFragmentInfo { actors, .. } =
1861                info.get_fragment(*fragment_id).unwrap();
1862
1863            let dispatcher_type = fragment_map[fragment_id];
1864
1865            if actors.is_empty() {
1866                bail!("No fragment found for fragment id {}", fragment_id);
1867            }
1868
1869            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1870
1871            let nodes = downstream_fragment_nodes
1872                .get(fragment_id)
1873                .context(format!(
1874                    "downstream fragment node for id {} not found",
1875                    fragment_id
1876                ))?
1877                .to_protobuf();
1878
1879            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
1880        }
1881        Ok((downstream_fragments, actor_locations))
1882    }
1883
1884    pub async fn load_source_fragment_ids(
1885        &self,
1886    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1887        let inner = self.inner.read().await;
1888        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1889            .select_only()
1890            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1891            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1892            .into_tuple()
1893            .all(&inner.db)
1894            .await?;
1895
1896        let mut source_fragment_ids = HashMap::new();
1897        for (fragment_id, stream_node) in fragments {
1898            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1899                source_fragment_ids
1900                    .entry(source_id)
1901                    .or_insert_with(BTreeSet::new)
1902                    .insert(fragment_id);
1903            }
1904        }
1905        Ok(source_fragment_ids)
1906    }
1907
1908    pub async fn load_backfill_fragment_ids(
1909        &self,
1910    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1911        let inner = self.inner.read().await;
1912        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1913            .select_only()
1914            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1915            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1916            .into_tuple()
1917            .all(&inner.db)
1918            .await?;
1919
1920        let mut source_fragment_ids = HashMap::new();
1921        for (fragment_id, stream_node) in fragments {
1922            if let Some((source_id, upstream_source_fragment_id)) =
1923                stream_node.to_protobuf().find_source_backfill()
1924            {
1925                source_fragment_ids
1926                    .entry(source_id)
1927                    .or_insert_with(BTreeSet::new)
1928                    .insert((fragment_id, upstream_source_fragment_id));
1929            }
1930        }
1931        Ok(source_fragment_ids)
1932    }
1933
1934    pub async fn get_all_upstream_sink_infos(
1935        &self,
1936        target_table: &PbTable,
1937        target_fragment_id: FragmentId,
1938    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1939        let inner = self.inner.read().await;
1940        let txn = inner.db.begin().await?;
1941
1942        self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
1943            .await
1944    }
1945
1946    pub async fn get_all_upstream_sink_infos_in_txn<C>(
1947        &self,
1948        txn: &C,
1949        target_table: &PbTable,
1950        target_fragment_id: FragmentId,
1951    ) -> MetaResult<Vec<UpstreamSinkInfo>>
1952    where
1953        C: ConnectionTrait,
1954    {
1955        let incoming_sinks = Sink::find()
1956            .filter(sink::Column::TargetTable.eq(target_table.id))
1957            .all(txn)
1958            .await?;
1959
1960        let sink_ids = incoming_sinks.iter().map(|s| s.sink_id).collect_vec();
1961        let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1962
1963        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1964        for sink in &incoming_sinks {
1965            let sink_fragment_id =
1966                sink_fragment_ids
1967                    .get(&sink.sink_id)
1968                    .cloned()
1969                    .ok_or(anyhow::anyhow!(
1970                        "sink fragment not found for sink id {}",
1971                        sink.sink_id
1972                    ))?;
1973            let upstream_info = build_upstream_sink_info(
1974                sink.sink_id,
1975                sink.original_target_columns
1976                    .as_ref()
1977                    .map(|cols| cols.to_protobuf())
1978                    .unwrap_or_default(),
1979                sink_fragment_id,
1980                target_table,
1981                target_fragment_id,
1982            )?;
1983            upstream_sink_infos.push(upstream_info);
1984        }
1985
1986        Ok(upstream_sink_infos)
1987    }
1988
1989    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1990        let inner = self.inner.read().await;
1991        let txn = inner.db.begin().await?;
1992
1993        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1994            .select_only()
1995            .column(fragment::Column::FragmentId)
1996            .filter(
1997                fragment::Column::JobId
1998                    .eq(job_id)
1999                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
2000            )
2001            .into_tuple()
2002            .all(&txn)
2003            .await?;
2004
2005        if mview_fragment.len() != 1 {
2006            return Err(anyhow::anyhow!(
2007                "expected exactly one mview fragment for job {}, found {}",
2008                job_id,
2009                mview_fragment.len()
2010            )
2011            .into());
2012        }
2013
2014        Ok(mview_fragment.into_iter().next().unwrap())
2015    }
2016
2017    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
2018        let inner = self.inner.read().await;
2019        let txn = inner.db.begin().await?;
2020        has_table_been_migrated(&txn, table_id).await
2021    }
2022
2023    pub async fn update_fragment_splits<C>(
2024        &self,
2025        txn: &C,
2026        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
2027    ) -> MetaResult<()>
2028    where
2029        C: ConnectionTrait,
2030    {
2031        if fragment_splits.is_empty() {
2032            return Ok(());
2033        }
2034
2035        let existing_fragment_ids: HashSet<FragmentId> = FragmentModel::find()
2036            .select_only()
2037            .column(fragment::Column::FragmentId)
2038            .filter(fragment::Column::FragmentId.is_in(fragment_splits.keys().copied()))
2039            .into_tuple()
2040            .all(txn)
2041            .await?
2042            .into_iter()
2043            .collect();
2044
2045        // Filter out stale fragment ids to avoid FK violations when split updates race with drop.
2046        let (models, skipped_fragment_ids): (Vec<_>, Vec<_>) = fragment_splits
2047            .iter()
2048            .partition_map(|(fragment_id, splits)| {
2049                if existing_fragment_ids.contains(fragment_id) {
2050                    Either::Left(fragment_splits::ActiveModel {
2051                        fragment_id: Set(*fragment_id as _),
2052                        splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
2053                            splits: splits.iter().map(Into::into).collect_vec(),
2054                        }))),
2055                    })
2056                } else {
2057                    Either::Right(*fragment_id)
2058                }
2059            });
2060
2061        if !skipped_fragment_ids.is_empty() {
2062            tracing::warn!(
2063                skipped_fragment_ids = ?skipped_fragment_ids,
2064                total_fragment_ids = fragment_splits.len(),
2065                "skipping stale fragment split updates for missing fragments"
2066            );
2067        }
2068
2069        if models.is_empty() {
2070            return Ok(());
2071        }
2072
2073        FragmentSplits::insert_many(models)
2074            .on_conflict(
2075                OnConflict::column(fragment_splits::Column::FragmentId)
2076                    .update_column(fragment_splits::Column::Splits)
2077                    .to_owned(),
2078            )
2079            .exec(txn)
2080            .await?;
2081
2082        Ok(())
2083    }
2084}
2085
// Unit tests for fragment model <-> protobuf (de)composition and for the
// fragment parallelism-policy formatting helpers of `CatalogController`.
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::id::JobId;
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::*;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};

    use super::ActorInfo;
    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    /// Per-actor map: upstream fragment id -> upstream actor ids.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    /// Per-fragment map: actor id -> that actor's upstreams.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    const TEST_JOB_ID: JobId = JobId::new(1);

    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);

    /// Build a fixture with two upstream fragments; upstream actor ids are the
    /// actor id offset by 100 and 200 respectively.
    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }

    /// Build a `Union` stream node with one `Merge` input per upstream fragment.
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    /// Model-extraction direction: a protobuf-side `Fragment` is converted into
    /// a fragment model and checked against the original.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // Uniform vnode-to-actor mapping over the test vnode count.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id.into(),
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
                config_override: "".into(),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }

    /// Model-composition direction: a fragment model plus `ActorInfo`s are
    /// composed back into protobuf structures and checked field by field.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Each actor carries a single dummy connector split.
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                    config_override: "a.b.c = true".into(),
                }
            })
            .collect_vec();

        // Use the first actor's upstreams as the template for the merger node.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids =
                upstream_actor_ids.get(&template_actor.actor_id).unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        // Every actor must come back with a status that carries a location.
        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    /// Compare composed protobuf actors against the original `ActorInfo`s,
    /// field by field (ids, bitmaps, splits, expr context, merge upstreams).
    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // Every Merge node in the stream graph must reference one of the
            // actor's declared upstream fragments.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&actor_id)
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    /// Compare a fragment model against its protobuf counterpart.
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }

    /// A fragment with no own parallelism inherits the job's parallelism.
    #[test]
    fn test_parallelism_policy_with_root_fragments() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 3.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let job_parallelism = StreamingParallelism::Fixed(4);

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            fragment.distribution_type,
            fragment.parallelism.as_ref(),
            Some(&job_parallelism),
            &[],
        );

        assert_eq!(policy, "inherit(fixed(4))");
    }

    /// With no job parallelism but upstream root fragments, the policy lists
    /// the upstream fragment ids sorted and deduplicated.
    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            fragment.distribution_type,
            fragment.parallelism.as_ref(),
            None,
            &[3.into(), 1.into(), 2.into(), 1.into()],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
}