risingwave_meta/controller/fragment.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::{Either, Itertools};
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::CdcScanOptions;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    TableIdArray, VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits,
40    object, sink, source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::fragment::{
49    FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan;
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57    PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, SinkLogStoreType,
58    StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63    ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, FromQueryResult, JoinType,
64    PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
65};
66use serde::{Deserialize, Serialize};
67
68use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
69use crate::controller::catalog::CatalogController;
70use crate::controller::scale::{
71    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph,
72    find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
73    render_actor_assignments, resolve_streaming_job_definition,
74};
75use crate::controller::utils::{
76    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
77    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
78    resolve_no_shuffle_actor_mapping,
79};
80use crate::error::MetaError;
81use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
82use crate::model::{
83    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
84    StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
85    TableParallelism,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
/// Some information of running (inflight) actors.
#[derive(Debug)]
pub struct InflightActorInfo {
    /// Worker node the actor is currently scheduled on.
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor; `None` presumably for singleton-distribution
    /// fragments — TODO confirm against producers.
    pub vnode_bitmap: Option<Bitmap>,
    /// Connector splits currently assigned to the actor.
    pub splits: Vec<SplitImpl>,
}
98
/// Per-actor row loaded from the metadata store; intermediate representation
/// consumed by `compose_fragment` when rebuilding in-memory fragment state.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    /// Fragment the actor belongs to; validated against the enclosing
    /// fragment in `compose_fragment`.
    pub fragment_id: FragmentId,
    /// Connector splits assigned to this actor.
    pub splits: ConnectorSplits,
    /// Worker node the actor is placed on.
    pub worker_id: WorkerId,
    /// Vnode bitmap; `None` presumably for singleton fragments — TODO confirm.
    pub vnode_bitmap: Option<VnodeBitmap>,
    pub expr_context: ExprContext,
    /// Per-actor config override; `Arc<str>` so clones are refcount bumps.
    pub config_override: Arc<str>,
}
109
/// Raw query-result row used when assembling fragment descriptions
/// (mirrors columns selected from the `fragment` table plus job metadata).
#[derive(Debug)]
struct FragmentDescRow {
    fragment_id: FragmentId,
    job_id: JobId,
    /// Raw bitmask as stored in the database (see `FragmentTypeFlag`).
    fragment_type_mask: i32,
    distribution_type: DistributionType,
    state_table_ids: TableIdArray,
    vnode_count: i32,
    /// Fragment-level parallelism override, if any.
    parallelism_override: Option<StreamingParallelism>,
    /// Stream plan of the fragment; `None` presumably when the column was not
    /// selected by the query — TODO confirm against call sites.
    stream_node: Option<StreamNode>,
}
121
/// Information of a running (inflight) fragment, including all of its actors.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    /// Total number of vnodes of the fragment.
    pub vnode_count: usize,
    /// The stream plan of the fragment.
    pub nodes: PbStreamNode,
    /// All running actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    /// Ids of all state tables owned by this fragment.
    pub state_table_ids: HashSet<TableId>,
}
132
/// Distribution type and vnode count of a fragment, as read from the
/// persistent catalog (see `fragment_parallelisms`).
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    pub vnode_count: usize,
}
138
/// SQL predicate helpers for filtering rows by the `fragment_type_mask`
/// bitmask column of the `fragment` table.
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    /// Predicate: `fragment_type_mask & flag != 0` — the flag is set.
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    /// Predicate: the mask contains at least one of `flags`.
    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    /// Predicate: `fragment_type_mask & flag == 0` — the flag is not set.
    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}
159
/// Summary of a streaming job joined with its object metadata and resolved
/// display name; serialized in camelCase for the dashboard API.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    pub job_id: JobId,
    pub obj_type: ObjectType,
    /// Display name resolved from the table/source/sink catalog;
    /// `"<unknown>"` when none matches (see `list_streaming_job_infos`).
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    pub resource_group: String,
    pub config_override: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
174
175impl NotificationManager {
176    pub(crate) fn notify_fragment_mapping(
177        &self,
178        operation: NotificationOperation,
179        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
180    ) {
181        let fragment_ids = fragment_mappings
182            .iter()
183            .map(|mapping| mapping.fragment_id)
184            .collect_vec();
185        if fragment_ids.is_empty() {
186            return;
187        }
188        // notify all fragment mappings to frontend.
189        for fragment_mapping in fragment_mappings {
190            self.notify_frontend_without_version(
191                operation,
192                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
193            );
194        }
195
196        // update serving vnode mappings.
197        match operation {
198            NotificationOperation::Add | NotificationOperation::Update => {
199                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
200                    fragment_ids,
201                ));
202            }
203            NotificationOperation::Delete => {
204                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
205                    fragment_ids,
206                ));
207            }
208            op => {
209                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
210            }
211        }
212    }
213}
214
215impl CatalogController {
216    pub fn prepare_fragment_models_from_fragments(
217        job_id: JobId,
218        fragments: impl Iterator<Item = &Fragment>,
219    ) -> MetaResult<Vec<fragment::Model>> {
220        fragments
221            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
222            .try_collect()
223    }
224
225    pub fn prepare_fragment_model_for_new_job(
226        job_id: JobId,
227        fragment: &Fragment,
228    ) -> MetaResult<fragment::Model> {
229        let vnode_count = fragment.vnode_count();
230        let Fragment {
231            fragment_id: pb_fragment_id,
232            fragment_type_mask: pb_fragment_type_mask,
233            distribution_type: pb_distribution_type,
234            state_table_ids: pb_state_table_ids,
235            nodes,
236            ..
237        } = fragment;
238
239        let state_table_ids = pb_state_table_ids.clone().into();
240
241        let fragment_parallelism = nodes
242            .node_body
243            .as_ref()
244            .and_then(|body| match body {
245                NodeBody::StreamCdcScan(node) => Some(node),
246                _ => None,
247            })
248            .and_then(|node| node.options.as_ref())
249            .map(CdcScanOptions::from_proto)
250            .filter(|opts| opts.is_parallelized_backfill())
251            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));
252
253        let stream_node = StreamNode::from(nodes);
254
255        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
256            .unwrap()
257            .into();
258
259        #[expect(deprecated)]
260        let fragment = fragment::Model {
261            fragment_id: *pb_fragment_id as _,
262            job_id,
263            fragment_type_mask: (*pb_fragment_type_mask).into(),
264            distribution_type,
265            stream_node,
266            state_table_ids,
267            upstream_fragment_id: Default::default(),
268            vnode_count: vnode_count as _,
269            parallelism: fragment_parallelism,
270        };
271
272        Ok(fragment)
273    }
274
275    #[expect(clippy::type_complexity)]
276    fn compose_table_fragments(
277        job_id: JobId,
278        state: PbState,
279        ctx: StreamContext,
280        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
281        parallelism: StreamingParallelism,
282        max_parallelism: usize,
283        job_definition: Option<String>,
284    ) -> MetaResult<(
285        StreamJobFragments,
286        HashMap<FragmentId, Vec<StreamActor>>,
287        HashMap<crate::model::ActorId, PbActorStatus>,
288    )> {
289        let mut pb_fragments = BTreeMap::new();
290        let mut fragment_actors = HashMap::new();
291        let mut all_actor_status = HashMap::new();
292
293        for (fragment, actors) in fragments {
294            let (fragment, actors, actor_status, _) =
295                Self::compose_fragment(fragment, actors, job_definition.clone())?;
296            let fragment_id = fragment.fragment_id;
297            fragment_actors.insert(fragment_id, actors);
298            all_actor_status.extend(actor_status);
299
300            pb_fragments.insert(fragment_id, fragment);
301        }
302
303        let table_fragments = StreamJobFragments {
304            stream_job_id: job_id,
305            state: state as _,
306            fragments: pb_fragments,
307            ctx,
308            assigned_parallelism: match parallelism {
309                StreamingParallelism::Custom => TableParallelism::Custom,
310                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
311                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
312            },
313            max_parallelism,
314        };
315
316        Ok((table_fragments, fragment_actors, all_actor_status))
317    }
318
    /// Convert a `fragment::Model` row plus its actor rows into the in-memory
    /// [`Fragment`] representation, together with the actor list, per-actor
    /// status (worker location), and per-actor connector splits.
    ///
    /// Returns an error if any actor row references a different fragment id.
    #[expect(clippy::type_complexity)]
    fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<ActorInfo>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        Vec<StreamActor>,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Collect upstream fragment ids from Merge nodes; the assertion fires
        // if the same upstream appears in more than one Merge node. The set is
        // only used for this sanity check.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            // Each actor row must belong to the fragment being composed.
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let ActorInfo {
                actor_id,
                fragment_id,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                config_override,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id),
                },
            );

            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                // Falls back to an empty definition when the job definition
                // could not be resolved.
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
                config_override,
            })
        }

        let pb_state_table_ids = state_table_ids.0;
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actors, pb_actor_status, pb_actor_splits))
    }
413
414    /// Returns distribution type and vnode count for all (or filtered) fragments.
415    ///
416    /// Reads directly from the persistent catalog (fragment table) rather than
417    /// from the in-memory `shared_actor_infos`.  This is critical because the
418    /// serving vnode mapping must be available even before the barrier manager's
419    /// recovery has completed and populated `shared_actor_infos`.
420    pub async fn fragment_parallelisms(
421        &self,
422    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
423        let inner = self.inner.read().await;
424        let query = FragmentModel::find().select_only().columns([
425            fragment::Column::FragmentId,
426            fragment::Column::DistributionType,
427            fragment::Column::VnodeCount,
428        ]);
429        let fragments: Vec<(FragmentId, DistributionType, i32)> =
430            query.into_tuple().all(&inner.db).await?;
431
432        Ok(fragments
433            .into_iter()
434            .map(|(fragment_id, distribution_type, vnode_count)| {
435                (
436                    fragment_id,
437                    FragmentParallelismInfo {
438                        distribution_type: PbFragmentDistributionType::from(distribution_type),
439                        vnode_count: vnode_count as usize,
440                    },
441                )
442            })
443            .collect())
444    }
445
446    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
447        let inner = self.inner.read().await;
448        let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
449            .select_only()
450            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
451            .into_tuple()
452            .all(&inner.db)
453            .await?;
454        Ok(fragment_jobs.into_iter().collect())
455    }
456
457    pub async fn get_fragment_job_id(
458        &self,
459        fragment_ids: Vec<FragmentId>,
460    ) -> MetaResult<Vec<ObjectId>> {
461        let inner = self.inner.read().await;
462        self.get_fragment_job_id_in_txn(&inner.db, fragment_ids)
463            .await
464    }
465
466    pub async fn get_fragment_job_id_in_txn<C>(
467        &self,
468        txn: &C,
469        fragment_ids: Vec<FragmentId>,
470    ) -> MetaResult<Vec<ObjectId>>
471    where
472        C: ConnectionTrait + Send,
473    {
474        let object_ids: Vec<ObjectId> = FragmentModel::find()
475            .select_only()
476            .column(fragment::Column::JobId)
477            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
478            .into_tuple()
479            .all(txn)
480            .await?;
481
482        Ok(object_ids)
483    }
484
    /// Fetch a [`FragmentDesc`] plus the ids of its direct upstream fragments.
    ///
    /// Returns `Ok(None)` if the fragment does not exist in the catalog.
    ///
    /// # Panics
    /// Panics if the fragment exists in the catalog but is missing from the
    /// in-memory shared actor info (i.e. it is not currently running).
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model = match FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?
        {
            Some(fragment) => fragment,
            None => return Ok(None),
        };

        // Parallelism configured on the owning streaming job, if the job row exists.
        let job_parallelism: Option<StreamingParallelism> =
            StreamingJob::find_by_id(fragment_model.job_id)
                .select_only()
                .column(streaming_job::Column::Parallelism)
                .into_tuple()
                .one(&inner.db)
                .await?;

        // Direct upstreams: all relations where this fragment is the target.
        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::DispatcherType,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let upstreams: Vec<_> = upstream_entries
            .into_iter()
            .map(|(source_id, _)| source_id)
            .collect();

        // Root fragments of the no-shuffle DAG containing this fragment; used
        // when formatting the parallelism policy below.
        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
            .await
            .map(Self::collect_root_fragment_mapping)?;
        let root_fragments = root_fragment_map
            .get(&fragment_id)
            .cloned()
            .unwrap_or_default();

        // Actor count comes from the live shared actor info, not the catalog.
        let info = self.env.shared_actor_infos().read_guard();
        let SharedFragmentInfo { actors, .. } = info
            .get_fragment(fragment_model.fragment_id as _)
            .unwrap_or_else(|| {
                panic!(
                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                    fragment_model.fragment_id,
                    fragment_model.job_id
                )
            });

        let parallelism_policy = Self::format_fragment_parallelism_policy(
            fragment_model.distribution_type,
            fragment_model.parallelism.as_ref(),
            job_parallelism.as_ref(),
            &root_fragments,
        );

        let fragment = FragmentDesc {
            fragment_id: fragment_model.fragment_id,
            job_id: fragment_model.job_id,
            fragment_type_mask: fragment_model.fragment_type_mask,
            distribution_type: fragment_model.distribution_type,
            state_table_ids: fragment_model.state_table_ids.clone(),
            // Parallelism is reported as the number of live actors.
            parallelism: actors.len() as _,
            vnode_count: fragment_model.vnode_count,
            stream_node: fragment_model.stream_node.clone(),
            parallelism_policy,
        };

        Ok(Some((fragment, upstreams)))
    }
563
564    pub async fn list_fragment_database_ids(
565        &self,
566        select_fragment_ids: Option<Vec<FragmentId>>,
567    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
568        let inner = self.inner.read().await;
569        let select = FragmentModel::find()
570            .select_only()
571            .column(fragment::Column::FragmentId)
572            .column(object::Column::DatabaseId)
573            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
574        let select = if let Some(select_fragment_ids) = select_fragment_ids {
575            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
576        } else {
577            select
578        };
579        Ok(select.into_tuple().all(&inner.db).await?)
580    }
581
582    pub async fn get_job_fragments_by_id(
583        &self,
584        job_id: JobId,
585    ) -> MetaResult<(
586        StreamJobFragments,
587        HashMap<FragmentId, Vec<StreamActor>>,
588        HashMap<ActorId, PbActorStatus>,
589    )> {
590        let inner = self.inner.read().await;
591
592        // Load fragments matching the job from the database
593        let fragments: Vec<_> = FragmentModel::find()
594            .filter(fragment::Column::JobId.eq(job_id))
595            .all(&inner.db)
596            .await?;
597
598        let job_info = StreamingJob::find_by_id(job_id)
599            .one(&inner.db)
600            .await?
601            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
602
603        let fragment_actors =
604            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;
605
606        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
607            .await?
608            .remove(&job_id);
609
610        Self::compose_table_fragments(
611            job_id,
612            job_info.job_status.into(),
613            job_info.stream_context(),
614            fragment_actors,
615            job_info.parallelism,
616            job_info.max_parallelism as _,
617            job_definition,
618        )
619    }
620
621    pub async fn get_fragment_actor_dispatchers(
622        &self,
623        fragment_ids: Vec<FragmentId>,
624    ) -> MetaResult<FragmentActorDispatchers> {
625        let inner = self.inner.read().await;
626
627        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
628            .await
629    }
630
    /// Compute, for every given source fragment, the dispatchers of each of its
    /// actors towards all downstream fragments.
    ///
    /// Fragment relations come from the database; live actor distribution
    /// (vnode bitmaps per actor) comes from the in-memory shared actor info.
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        // Distribution type plus per-actor optional vnode bitmap of one fragment.
        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        // Cache so each fragment's actor info is resolved at most once below.
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                // NOTE(review): unwrap panics if the fragment is absent from the
                // shared actor info — assumes only running fragments are queried.
                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            // Resolve (and cache) actor info for both endpoints of the relation.
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let (dispatchers, _) = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            // Group the composed dispatchers by source fragment, then by actor.
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }
734
735    pub async fn get_fragment_downstream_relations(
736        &self,
737        fragment_ids: Vec<FragmentId>,
738    ) -> MetaResult<FragmentDownstreamRelation> {
739        let inner = self.inner.read().await;
740        self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
741            .await
742    }
743
744    pub async fn get_fragment_downstream_relations_in_txn<C>(
745        &self,
746        txn: &C,
747        fragment_ids: Vec<FragmentId>,
748    ) -> MetaResult<FragmentDownstreamRelation>
749    where
750        C: ConnectionTrait + StreamTrait + Send,
751    {
752        let mut stream = FragmentRelation::find()
753            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
754            .stream(txn)
755            .await?;
756        let mut relations = FragmentDownstreamRelation::new();
757        while let Some(relation) = stream.try_next().await? {
758            relations
759                .entry(relation.source_fragment_id as _)
760                .or_default()
761                .push(DownstreamFragmentRelation {
762                    downstream_fragment_id: relation.target_fragment_id as _,
763                    dispatcher_type: relation.dispatcher_type,
764                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
765                    output_mapping: PbDispatchOutputMapping {
766                        indices: relation.output_indices.into_u32_array(),
767                        types: relation
768                            .output_type_mapping
769                            .unwrap_or_default()
770                            .to_protobuf(),
771                    },
772                });
773        }
774        Ok(relations)
775    }
776
777    pub async fn get_job_fragment_backfill_scan_type(
778        &self,
779        job_id: JobId,
780    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
781        let inner = self.inner.read().await;
782        self.get_job_fragment_backfill_scan_type_in_txn(&inner.db, job_id)
783            .await
784    }
785
786    pub async fn get_job_fragment_backfill_scan_type_in_txn<C>(
787        &self,
788        txn: &C,
789        job_id: JobId,
790    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>>
791    where
792        C: ConnectionTrait + Send,
793    {
794        let fragments: Vec<_> = FragmentModel::find()
795            .filter(fragment::Column::JobId.eq(job_id))
796            .all(txn)
797            .await?;
798
799        let mut result = HashMap::new();
800
801        for fragment::Model {
802            fragment_id,
803            stream_node,
804            ..
805        } in fragments
806        {
807            let stream_node = stream_node.to_protobuf();
808            visit_stream_node_body(&stream_node, |body| {
809                if let NodeBody::StreamScan(node) = body {
810                    match node.stream_scan_type() {
811                        StreamScanType::Unspecified => {}
812                        scan_type => {
813                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
814                        }
815                    }
816                }
817            });
818        }
819
820        Ok(result)
821    }
822
823    pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
824        let inner = self.inner.read().await;
825        let count = StreamingJob::find().count(&inner.db).await?;
826        Ok(usize::try_from(count).context("streaming job count overflow")?)
827    }
828
    /// List one `StreamingJobInfo` row per streaming job, resolving each
    /// job's display name, status, parallelism, resource group and config
    /// override in a single query.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // Left-join all three catalog entities a job may be backed by
            // (table / source / sink); the name is taken from whichever row
            // is present.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // COALESCE-style name resolution: table name, else source name,
            // else sink name, else a "<unknown>" placeholder.
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // Job-specific resource group falls back to the database default.
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            // A missing config override is surfaced as an empty string.
            .column_as(
                Expr::if_null(
                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
                    Expr::val(""),
                ),
                "config_override",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
882
883    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
884        let inner = self.inner.read().await;
885        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
886            .select_only()
887            .column(streaming_job::Column::MaxParallelism)
888            .into_tuple()
889            .one(&inner.db)
890            .await?
891            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
892        Ok(max_parallelism as usize)
893    }
894
895    /// Try to get internal table ids of each streaming job, used by metrics collection.
896    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
897        if let Ok(inner) = self.inner.try_read()
898            && let Ok(job_state_tables) = FragmentModel::find()
899                .select_only()
900                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
901                .into_tuple::<(JobId, I32Array)>()
902                .all(&inner.db)
903                .await
904        {
905            let mut job_internal_table_ids = HashMap::new();
906            for (job_id, state_table_ids) in job_state_tables {
907                job_internal_table_ids
908                    .entry(job_id)
909                    .or_insert_with(Vec::new)
910                    .extend(
911                        state_table_ids
912                            .into_inner()
913                            .into_iter()
914                            .map(|table_id| TableId::new(table_id as _)),
915                    );
916            }
917            return Some(job_internal_table_ids.into_iter().collect());
918        }
919        None
920    }
921
922    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
923        let inner = self.inner.read().await;
924        let count = FragmentModel::find().count(&inner.db).await?;
925        Ok(count > 0)
926    }
927
928    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
929        let read_guard = self.env.shared_actor_infos().read_guard();
930        let actor_cnt: HashMap<WorkerId, _> = read_guard
931            .iter_over_fragments()
932            .flat_map(|(_, fragment)| {
933                fragment
934                    .actors
935                    .iter()
936                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
937            })
938            .into_group_map()
939            .into_iter()
940            .map(|(k, v)| (k, v.len()))
941            .collect();
942
943        Ok(actor_cnt)
944    }
945
946    fn collect_fragment_actor_map(
947        &self,
948        fragment_ids: &[FragmentId],
949        stream_context: StreamContext,
950    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
951        let guard = self.env.shared_actor_infos().read_guard();
952        let pb_expr_context = stream_context.to_expr_context();
953        let expr_context: ExprContext = (&pb_expr_context).into();
954
955        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
956        for fragment_id in fragment_ids {
957            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
958                anyhow!("fragment {} not found in shared actor info", fragment_id)
959            })?;
960
961            let actors = fragment_info
962                .actors
963                .iter()
964                .map(|(actor_id, actor_info)| ActorInfo {
965                    actor_id: *actor_id as _,
966                    fragment_id: *fragment_id,
967                    splits: ConnectorSplits::from(&PbConnectorSplits {
968                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
969                    }),
970                    worker_id: actor_info.worker_id as _,
971                    vnode_bitmap: actor_info
972                        .vnode_bitmap
973                        .as_ref()
974                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
975                    expr_context: expr_context.clone(),
976                    config_override: stream_context.config_override.clone(),
977                })
978                .collect();
979
980            actor_map.insert(*fragment_id, actors);
981        }
982
983        Ok(actor_map)
984    }
985
986    fn collect_fragment_actor_pairs(
987        &self,
988        fragments: Vec<fragment::Model>,
989        stream_context: StreamContext,
990    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
991        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
992        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
993        fragments
994            .into_iter()
995            .map(|fragment| {
996                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
997                    anyhow!(
998                        "fragment {} missing in shared actor info map",
999                        fragment.fragment_id
1000                    )
1001                })?;
1002                Ok((fragment, actors))
1003            })
1004            .collect()
1005    }
1006
1007    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
1008    pub async fn table_fragments(
1009        &self,
1010    ) -> MetaResult<
1011        BTreeMap<
1012            JobId,
1013            (
1014                StreamJobFragments,
1015                HashMap<FragmentId, Vec<StreamActor>>,
1016                HashMap<crate::model::ActorId, PbActorStatus>,
1017            ),
1018        >,
1019    > {
1020        let inner = self.inner.read().await;
1021        let jobs = StreamingJob::find().all(&inner.db).await?;
1022
1023        let mut job_definition = resolve_streaming_job_definition(
1024            &inner.db,
1025            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
1026        )
1027        .await?;
1028
1029        let mut table_fragments = BTreeMap::new();
1030        for job in jobs {
1031            let fragments = FragmentModel::find()
1032                .filter(fragment::Column::JobId.eq(job.job_id))
1033                .all(&inner.db)
1034                .await?;
1035
1036            let fragment_actors =
1037                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;
1038
1039            table_fragments.insert(
1040                job.job_id,
1041                Self::compose_table_fragments(
1042                    job.job_id,
1043                    job.job_status.into(),
1044                    job.stream_context(),
1045                    fragment_actors,
1046                    job.parallelism,
1047                    job.max_parallelism as _,
1048                    job_definition.remove(&job.job_id),
1049                )?,
1050            );
1051        }
1052
1053        Ok(table_fragments)
1054    }
1055
1056    pub async fn upstream_fragments(
1057        &self,
1058        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1059    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
1060        let inner = self.inner.read().await;
1061        self.upstream_fragments_in_txn(&inner.db, fragment_ids)
1062            .await
1063    }
1064
1065    pub async fn upstream_fragments_in_txn<C>(
1066        &self,
1067        txn: &C,
1068        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1069    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>>
1070    where
1071        C: ConnectionTrait + StreamTrait + Send,
1072    {
1073        let mut stream = FragmentRelation::find()
1074            .select_only()
1075            .columns([
1076                fragment_relation::Column::SourceFragmentId,
1077                fragment_relation::Column::TargetFragmentId,
1078            ])
1079            .filter(
1080                fragment_relation::Column::TargetFragmentId
1081                    .is_in(fragment_ids.map(|id| id as FragmentId)),
1082            )
1083            .into_tuple::<(FragmentId, FragmentId)>()
1084            .stream(txn)
1085            .await?;
1086        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
1087        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
1088            upstream_fragments
1089                .entry(downstream_fragment_id as crate::model::FragmentId)
1090                .or_default()
1091                .insert(upstream_fragment_id as crate::model::FragmentId);
1092        }
1093        Ok(upstream_fragments)
1094    }
1095
1096    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1097        let info = self.env.shared_actor_infos().read_guard();
1098
1099        let actor_locations = info
1100            .iter_over_fragments()
1101            .flat_map(|(fragment_id, fragment)| {
1102                fragment
1103                    .actors
1104                    .iter()
1105                    .map(|(actor_id, actor)| PartialActorLocation {
1106                        actor_id: *actor_id as _,
1107                        fragment_id: *fragment_id as _,
1108                        worker_id: actor.worker_id,
1109                    })
1110            })
1111            .collect_vec();
1112
1113        Ok(actor_locations)
1114    }
1115
1116    pub async fn list_actor_info(
1117        &self,
1118    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
1119        let inner = self.inner.read().await;
1120
1121        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
1122            FragmentModel::find()
1123                .select_only()
1124                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1125                .column(fragment::Column::FragmentId)
1126                .column_as(object::Column::Oid, "job_id")
1127                .column_as(object::Column::SchemaId, "schema_id")
1128                .column_as(object::Column::ObjType, "type")
1129                .into_tuple()
1130                .all(&inner.db)
1131                .await?;
1132
1133        let actor_infos = {
1134            let info = self.env.shared_actor_infos().read_guard();
1135
1136            let mut result = Vec::new();
1137
1138            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
1139                let Some(fragment) = info.get_fragment(fragment_id as _) else {
1140                    return Err(MetaError::unavailable(format!(
1141                        "shared actor info missing for fragment {fragment_id} while listing actors"
1142                    )));
1143                };
1144
1145                for actor_id in fragment.actors.keys() {
1146                    result.push((
1147                        *actor_id as _,
1148                        fragment.fragment_id as _,
1149                        object_id,
1150                        schema_id,
1151                        object_type,
1152                    ));
1153                }
1154            }
1155
1156            result
1157        };
1158
1159        Ok(actor_infos)
1160    }
1161
1162    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1163        let guard = self.env.shared_actor_info.read_guard();
1164        guard
1165            .iter_over_fragments()
1166            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1167            .collect_vec()
1168    }
1169
1170    pub async fn list_fragment_descs_with_node(
1171        &self,
1172        is_creating: bool,
1173    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1174        let inner = self.inner.read().await;
1175        let txn = inner.db.begin().await?;
1176
1177        let fragments_query = Self::build_fragment_query(is_creating);
1178        let fragments: Vec<fragment::Model> = fragments_query.all(&txn).await?;
1179
1180        let rows = fragments
1181            .into_iter()
1182            .map(|fragment| FragmentDescRow {
1183                fragment_id: fragment.fragment_id,
1184                job_id: fragment.job_id,
1185                fragment_type_mask: fragment.fragment_type_mask,
1186                distribution_type: fragment.distribution_type,
1187                state_table_ids: fragment.state_table_ids,
1188                vnode_count: fragment.vnode_count,
1189                parallelism_override: fragment.parallelism,
1190                stream_node: Some(fragment.stream_node),
1191            })
1192            .collect_vec();
1193
1194        self.build_fragment_distributions(&txn, rows).await
1195    }
1196
1197    pub async fn list_fragment_descs_without_node(
1198        &self,
1199        is_creating: bool,
1200    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1201        let inner = self.inner.read().await;
1202        let txn = inner.db.begin().await?;
1203
1204        let fragments_query = Self::build_fragment_query(is_creating);
1205        #[allow(clippy::type_complexity)]
1206        let fragments: Vec<(
1207            FragmentId,
1208            JobId,
1209            i32,
1210            DistributionType,
1211            TableIdArray,
1212            i32,
1213            Option<StreamingParallelism>,
1214        )> = fragments_query
1215            .select_only()
1216            .columns([
1217                fragment::Column::FragmentId,
1218                fragment::Column::JobId,
1219                fragment::Column::FragmentTypeMask,
1220                fragment::Column::DistributionType,
1221                fragment::Column::StateTableIds,
1222                fragment::Column::VnodeCount,
1223                fragment::Column::Parallelism,
1224            ])
1225            .into_tuple()
1226            .all(&txn)
1227            .await?;
1228
1229        let rows = fragments
1230            .into_iter()
1231            .map(
1232                |(
1233                    fragment_id,
1234                    job_id,
1235                    fragment_type_mask,
1236                    distribution_type,
1237                    state_table_ids,
1238                    vnode_count,
1239                    parallelism_override,
1240                )| FragmentDescRow {
1241                    fragment_id,
1242                    job_id,
1243                    fragment_type_mask,
1244                    distribution_type,
1245                    state_table_ids,
1246                    vnode_count,
1247                    parallelism_override,
1248                    stream_node: None,
1249                },
1250            )
1251            .collect_vec();
1252
1253        self.build_fragment_distributions(&txn, rows).await
1254    }
1255
1256    fn build_fragment_query(is_creating: bool) -> sea_orm::Select<fragment::Entity> {
1257        if is_creating {
1258            FragmentModel::find()
1259                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1260                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1261                .filter(
1262                    streaming_job::Column::JobStatus
1263                        .eq(JobStatus::Initial)
1264                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1265                )
1266        } else {
1267            FragmentModel::find()
1268        }
1269    }
1270
    /// Assemble `FragmentDistribution`s (paired with each fragment's upstream
    /// ids) from pre-fetched `FragmentDescRow`s, enriching them with job
    /// parallelism, upstream relations, no-shuffle root fragments and live
    /// actor counts from the shared actor info.
    async fn build_fragment_distributions(
        &self,
        txn: &DatabaseTransaction,
        rows: Vec<FragmentDescRow>,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let fragment_ids = rows.iter().map(|row| row.fragment_id).collect_vec();
        let job_ids = rows.iter().map(|row| row.job_id).unique().collect_vec();

        // Job-level parallelism settings, used as the fallback policy when a
        // fragment carries no override. Skip the query entirely for no rows.
        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(txn)
                .await?
                .into_iter()
                .collect()
        };

        // (target, source, dispatcher) edges pointing into the listed fragments.
        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(txn)
                    .await?
            };

        // Group upstream (source) ids by downstream (target) fragment.
        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        // Fragment id -> root fragments of its no-shuffle ensemble (empty Vec
        // for the roots themselves).
        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();
        let mut result = Vec::with_capacity(rows.len());

        for row in rows {
            // Live parallelism = current actor count; 0 if the fragment is
            // not (yet) present in the shared actor info.
            let parallelism = guard
                .get_fragment(row.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&row.fragment_id)
                .cloned()
                .unwrap_or_default();

            let upstreams = all_upstreams.remove(&row.fragment_id).unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                row.distribution_type,
                row.parallelism_override.as_ref(),
                job_parallelisms.get(&row.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: row.fragment_id,
                table_id: row.job_id,
                distribution_type: PbFragmentDistributionType::from(row.distribution_type) as _,
                state_table_ids: row.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: row.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: row.vnode_count as _,
                node: row.stream_node.map(|node| node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }

        Ok(result)
    }
1366
    /// Collect, for every sink fragment using a KV log store, the internal
    /// table backing that log store, as `(sink_id, table_id)` pairs.
    pub async fn list_sink_log_store_tables(&self) -> MetaResult<Vec<(SinkId, TableId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Only fragments flagged as Sink can contain a sink node.
        let fragments: Vec<(JobId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::JobId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Sink))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut mapping: HashMap<SinkId, TableId> = HashMap::new();

        for (job_id, stream_node) in fragments {
            // The sink id is derived directly from the owning job id.
            let sink_id = SinkId::new(job_id.as_raw_id());
            let stream_node = stream_node.to_protobuf();
            let mut internal_table_id: Option<TableId> = None;

            // Find the KV-log-store table of a sink node, if present. If a
            // fragment contains multiple sink nodes, the last visited wins.
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::Sink(node) = body
                    && node.log_store_type == SinkLogStoreType::KvLogStore as i32
                    && let Some(table) = &node.table
                {
                    internal_table_id = Some(TableId::new(table.id.as_raw_id()));
                }
            });

            // Insert (last writer wins) and warn if two fragments of the same
            // sink report different log store tables.
            if let Some(table_id) = internal_table_id
                && let Some(existing) = mapping.insert(sink_id, table_id)
                && existing != table_id
            {
                tracing::warn!(
                    "sink {sink_id:?} has multiple log store tables: {existing:?} vs {table_id:?}"
                );
            }
        }

        Ok(mapping.into_iter().collect())
    }
1407
1408    /// Build a fragment-to-root lookup for all reported root fragment ensembles.
1409    fn collect_root_fragment_mapping(
1410        ensembles: Vec<NoShuffleEnsemble>,
1411    ) -> HashMap<FragmentId, Vec<FragmentId>> {
1412        let mut mapping = HashMap::new();
1413
1414        for ensemble in ensembles {
1415            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1416            roots.sort_unstable();
1417            roots.dedup();
1418
1419            if roots.is_empty() {
1420                continue;
1421            }
1422
1423            let root_set: HashSet<_> = roots.iter().copied().collect();
1424
1425            for fragment_id in ensemble.component_fragments() {
1426                if root_set.contains(&fragment_id) {
1427                    mapping.insert(fragment_id, Vec::new());
1428                } else {
1429                    mapping.insert(fragment_id, roots.clone());
1430                }
1431            }
1432        }
1433
1434        mapping
1435    }
1436
1437    fn format_fragment_parallelism_policy(
1438        distribution_type: DistributionType,
1439        fragment_parallelism: Option<&StreamingParallelism>,
1440        job_parallelism: Option<&StreamingParallelism>,
1441        root_fragments: &[FragmentId],
1442    ) -> String {
1443        if distribution_type == DistributionType::Single {
1444            return "single".to_owned();
1445        }
1446
1447        if let Some(parallelism) = fragment_parallelism {
1448            return format!(
1449                "override({})",
1450                Self::format_streaming_parallelism(parallelism)
1451            );
1452        }
1453
1454        if !root_fragments.is_empty() {
1455            let mut upstreams = root_fragments.to_vec();
1456            upstreams.sort_unstable();
1457            upstreams.dedup();
1458
1459            return format!("upstream_fragment({upstreams:?})");
1460        }
1461
1462        let inherited = job_parallelism
1463            .map(Self::format_streaming_parallelism)
1464            .unwrap_or_else(|| "unknown".to_owned());
1465        format!("inherit({inherited})")
1466    }
1467
1468    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1469        match parallelism {
1470            StreamingParallelism::Adaptive => "adaptive".to_owned(),
1471            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1472            StreamingParallelism::Custom => "custom".to_owned(),
1473        }
1474    }
1475
1476    pub async fn list_sink_actor_mapping(
1477        &self,
1478    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1479        let inner = self.inner.read().await;
1480        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1481            .select_only()
1482            .columns([sink::Column::SinkId, sink::Column::Name])
1483            .into_tuple()
1484            .all(&inner.db)
1485            .await?;
1486        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1487
1488        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1489
1490        let actor_with_type: Vec<(ActorId, SinkId)> = {
1491            let info = self.env.shared_actor_infos().read_guard();
1492
1493            info.iter_over_fragments()
1494                .filter(|(_, fragment)| {
1495                    sink_ids.contains(&fragment.job_id.as_sink_id())
1496                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1497                })
1498                .flat_map(|(_, fragment)| {
1499                    fragment
1500                        .actors
1501                        .keys()
1502                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1503                })
1504                .collect()
1505        };
1506
1507        let mut sink_actor_mapping = HashMap::new();
1508        for (actor_id, sink_id) in actor_with_type {
1509            sink_actor_mapping
1510                .entry(sink_id)
1511                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1512                .1
1513                .push(actor_id);
1514        }
1515
1516        Ok(sink_actor_mapping)
1517    }
1518
1519    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1520        let inner = self.inner.read().await;
1521        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1522            .select_only()
1523            .columns([
1524                fragment::Column::FragmentId,
1525                fragment::Column::JobId,
1526                fragment::Column::StateTableIds,
1527            ])
1528            .into_partial_model()
1529            .all(&inner.db)
1530            .await?;
1531        Ok(fragment_state_tables)
1532    }
1533
1534    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
1535    /// collected
1536    pub async fn load_all_actors_dynamic(
1537        &self,
1538        database_id: Option<DatabaseId>,
1539        worker_nodes: &ActiveStreamingWorkerNodes,
1540    ) -> MetaResult<FragmentRenderMap> {
1541        let loaded = self.load_fragment_context(database_id).await?;
1542
1543        if loaded.is_empty() {
1544            return Ok(HashMap::new());
1545        }
1546
1547        let adaptive_parallelism_strategy = {
1548            let system_params_reader = self.env.system_params_reader().await;
1549            system_params_reader.adaptive_parallelism_strategy()
1550        };
1551
1552        let RenderedGraph { fragments, .. } = render_actor_assignments(
1553            self.env.actor_id_generator(),
1554            worker_nodes.current(),
1555            adaptive_parallelism_strategy,
1556            &loaded,
1557        )?;
1558
1559        tracing::trace!(?fragments, "reload all actors");
1560
1561        Ok(fragments)
1562    }
1563
1564    /// Async load stage: collects all metadata required for rendering actor assignments.
1565    pub async fn load_fragment_context(
1566        &self,
1567        database_id: Option<DatabaseId>,
1568    ) -> MetaResult<LoadedFragmentContext> {
1569        let inner = self.inner.read().await;
1570        let txn = inner.db.begin().await?;
1571
1572        self.load_fragment_context_in_txn(&txn, database_id).await
1573    }
1574
1575    pub async fn load_fragment_context_in_txn<C>(
1576        &self,
1577        txn: &C,
1578        database_id: Option<DatabaseId>,
1579    ) -> MetaResult<LoadedFragmentContext>
1580    where
1581        C: ConnectionTrait,
1582    {
1583        let mut query = StreamingJob::find()
1584            .select_only()
1585            .column(streaming_job::Column::JobId);
1586
1587        if let Some(database_id) = database_id {
1588            query = query
1589                .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1590                .filter(object::Column::DatabaseId.eq(database_id));
1591        }
1592
1593        let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
1594
1595        let jobs: HashSet<JobId> = jobs.into_iter().collect();
1596
1597        if jobs.is_empty() {
1598            return Ok(LoadedFragmentContext::default());
1599        }
1600
1601        load_fragment_context_for_jobs(txn, jobs).await
1602    }
1603
1604    #[await_tree::instrument]
1605    pub async fn fill_snapshot_backfill_epoch(
1606        &self,
1607        fragment_ids: impl Iterator<Item = FragmentId>,
1608        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1609        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1610    ) -> MetaResult<()> {
1611        let inner = self.inner.write().await;
1612        let txn = inner.db.begin().await?;
1613        for fragment_id in fragment_ids {
1614            let fragment = FragmentModel::find_by_id(fragment_id)
1615                .one(&txn)
1616                .await?
1617                .context(format!("fragment {} not found", fragment_id))?;
1618            let mut node = fragment.stream_node.to_protobuf();
1619            if crate::stream::fill_snapshot_backfill_epoch(
1620                &mut node,
1621                snapshot_backfill_info,
1622                cross_db_snapshot_backfill_info,
1623            )? {
1624                let node = StreamNode::from(&node);
1625                FragmentModel::update(fragment::ActiveModel {
1626                    fragment_id: Set(fragment_id),
1627                    stream_node: Set(node),
1628                    ..Default::default()
1629                })
1630                .exec(&txn)
1631                .await?;
1632            }
1633        }
1634        txn.commit().await?;
1635        Ok(())
1636    }
1637
1638    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1639    pub fn get_running_actors_of_fragment(
1640        &self,
1641        fragment_id: FragmentId,
1642    ) -> MetaResult<HashSet<model::ActorId>> {
1643        let info = self.env.shared_actor_infos().read_guard();
1644
1645        let actors = info
1646            .get_fragment(fragment_id as _)
1647            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1648            .unwrap_or_default();
1649
1650        Ok(actors)
1651    }
1652
    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
    /// (`backfill_actor_id`, `upstream_source_actor_id`)
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Look up the edge from the source fragment to the backfill fragment;
        // its absence means the two fragments are not connected at all.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // A source-backfill fragment must be connected to its upstream source
        // fragment via a NoShuffle edge (1:1 actor pairing).
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: fetch the distribution type of a fragment from the catalog.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: snapshot the in-memory actor -> vnode-bitmap mapping of a
        // fragment from the shared actor info; unknown fragments yield an
        // empty map.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // Pair each backfill actor with its upstream source actor based on the
        // distributions of both fragments, then flip each pair into the
        // documented (`backfill_actor_id`, `upstream_source_actor_id`) order.
        Ok(resolve_no_shuffle_actor_mapping(
            source_distribution_type,
            source_actors.iter().map(|(&id, bitmap)| (id, bitmap)),
            source_backfill_distribution_type,
            source_backfill_actors
                .iter()
                .map(|(&id, bitmap)| (id, bitmap)),
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1750
1751    /// Get and filter the "**root**" fragments of the specified jobs.
1752    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1753    ///
1754    /// Root fragment connects to downstream jobs.
1755    ///
1756    /// ## What can be the root fragment
1757    /// - For sink, it should have one `Sink` fragment.
1758    /// - For MV, it should have one `MView` fragment.
1759    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1760    /// - For source, it should have one `Source` fragment.
1761    ///
1762    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1763    pub async fn get_root_fragments(
1764        &self,
1765        job_ids: Vec<JobId>,
1766    ) -> MetaResult<HashMap<JobId, Fragment>> {
1767        let inner = self.inner.read().await;
1768
1769        let all_fragments = FragmentModel::find()
1770            .filter(fragment::Column::JobId.is_in(job_ids))
1771            .all(&inner.db)
1772            .await?;
1773        // job_id -> fragment
1774        let mut root_fragments = HashMap::<JobId, Fragment>::new();
1775        for fragment in all_fragments {
1776            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1777            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1778                _ = root_fragments.insert(fragment.job_id, fragment.into());
1779            } else if mask.contains(FragmentTypeFlag::Source) {
1780                // look for Source fragment only if there's no MView fragment
1781                // (notice try_insert here vs insert above)
1782                _ = root_fragments.try_insert(fragment.job_id, fragment.into());
1783            }
1784        }
1785
1786        Ok(root_fragments)
1787    }
1788
1789    pub async fn get_root_fragment(&self, job_id: JobId) -> MetaResult<Fragment> {
1790        let mut root_fragments = self.get_root_fragments(vec![job_id]).await?;
1791        let root_fragment = root_fragments
1792            .remove(&job_id)
1793            .context(format!("root fragment for job {} not found", job_id))?;
1794
1795        Ok(root_fragment)
1796    }
1797
1798    /// Get the downstream fragments connected to the specified job.
1799    pub async fn get_downstream_fragments(
1800        &self,
1801        job_id: JobId,
1802    ) -> MetaResult<Vec<(stream_plan::DispatcherType, Fragment)>> {
1803        let root_fragment = self.get_root_fragment(job_id).await?;
1804
1805        let inner = self.inner.read().await;
1806        let txn = inner.db.begin().await?;
1807        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1808            .filter(
1809                fragment_relation::Column::SourceFragmentId
1810                    .eq(root_fragment.fragment_id as FragmentId),
1811            )
1812            .all(&txn)
1813            .await?;
1814
1815        let downstream_fragment_ids = downstream_fragment_relations
1816            .iter()
1817            .map(|model| model.target_fragment_id as FragmentId)
1818            .collect::<HashSet<_>>();
1819
1820        let downstream_fragments: Vec<fragment::Model> = FragmentModel::find()
1821            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1822            .all(&txn)
1823            .await?;
1824
1825        let mut downstream_fragments_map: HashMap<_, _> = downstream_fragments
1826            .into_iter()
1827            .map(|fragment| (fragment.fragment_id, fragment))
1828            .collect();
1829
1830        let mut downstream_fragments = vec![];
1831
1832        let fragment_map: HashMap<_, _> = downstream_fragment_relations
1833            .iter()
1834            .map(|model| (model.target_fragment_id, model.dispatcher_type))
1835            .collect();
1836
1837        for (fragment_id, dispatcher_type) in fragment_map {
1838            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1839
1840            let fragment = downstream_fragments_map
1841                .remove(&fragment_id)
1842                .context(format!(
1843                    "downstream fragment node for id {} not found",
1844                    fragment_id
1845                ))?
1846                .into();
1847
1848            downstream_fragments.push((dispatch_type, fragment));
1849        }
1850        Ok(downstream_fragments)
1851    }
1852
1853    pub async fn load_source_fragment_ids(
1854        &self,
1855    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1856        let inner = self.inner.read().await;
1857        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1858            .select_only()
1859            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1860            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1861            .into_tuple()
1862            .all(&inner.db)
1863            .await?;
1864
1865        let mut source_fragment_ids = HashMap::new();
1866        for (fragment_id, stream_node) in fragments {
1867            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1868                source_fragment_ids
1869                    .entry(source_id)
1870                    .or_insert_with(BTreeSet::new)
1871                    .insert(fragment_id);
1872            }
1873        }
1874        Ok(source_fragment_ids)
1875    }
1876
1877    pub async fn load_backfill_fragment_ids(
1878        &self,
1879    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1880        let inner = self.inner.read().await;
1881        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1882            .select_only()
1883            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1884            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1885            .into_tuple()
1886            .all(&inner.db)
1887            .await?;
1888
1889        let mut source_fragment_ids = HashMap::new();
1890        for (fragment_id, stream_node) in fragments {
1891            if let Some((source_id, upstream_source_fragment_id)) =
1892                stream_node.to_protobuf().find_source_backfill()
1893            {
1894                source_fragment_ids
1895                    .entry(source_id)
1896                    .or_insert_with(BTreeSet::new)
1897                    .insert((fragment_id, upstream_source_fragment_id));
1898            }
1899        }
1900        Ok(source_fragment_ids)
1901    }
1902
1903    pub async fn get_all_upstream_sink_infos(
1904        &self,
1905        target_table: &PbTable,
1906        target_fragment_id: FragmentId,
1907    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1908        let inner = self.inner.read().await;
1909        let txn = inner.db.begin().await?;
1910
1911        self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
1912            .await
1913    }
1914
1915    pub async fn get_all_upstream_sink_infos_in_txn<C>(
1916        &self,
1917        txn: &C,
1918        target_table: &PbTable,
1919        target_fragment_id: FragmentId,
1920    ) -> MetaResult<Vec<UpstreamSinkInfo>>
1921    where
1922        C: ConnectionTrait,
1923    {
1924        let incoming_sinks = Sink::find()
1925            .filter(sink::Column::TargetTable.eq(target_table.id))
1926            .all(txn)
1927            .await?;
1928
1929        let sink_ids = incoming_sinks.iter().map(|s| s.sink_id).collect_vec();
1930        let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1931
1932        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1933        for sink in &incoming_sinks {
1934            let sink_fragment_id =
1935                sink_fragment_ids
1936                    .get(&sink.sink_id)
1937                    .cloned()
1938                    .ok_or(anyhow::anyhow!(
1939                        "sink fragment not found for sink id {}",
1940                        sink.sink_id
1941                    ))?;
1942            let upstream_info = build_upstream_sink_info(
1943                sink.sink_id,
1944                sink.original_target_columns
1945                    .as_ref()
1946                    .map(|cols| cols.to_protobuf())
1947                    .unwrap_or_default(),
1948                sink_fragment_id,
1949                target_table,
1950                target_fragment_id,
1951            )?;
1952            upstream_sink_infos.push(upstream_info);
1953        }
1954
1955        Ok(upstream_sink_infos)
1956    }
1957
1958    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1959        let inner = self.inner.read().await;
1960        let txn = inner.db.begin().await?;
1961
1962        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1963            .select_only()
1964            .column(fragment::Column::FragmentId)
1965            .filter(
1966                fragment::Column::JobId
1967                    .eq(job_id)
1968                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1969            )
1970            .into_tuple()
1971            .all(&txn)
1972            .await?;
1973
1974        if mview_fragment.len() != 1 {
1975            return Err(anyhow::anyhow!(
1976                "expected exactly one mview fragment for job {}, found {}",
1977                job_id,
1978                mview_fragment.len()
1979            )
1980            .into());
1981        }
1982
1983        Ok(mview_fragment.into_iter().next().unwrap())
1984    }
1985
1986    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
1987        let inner = self.inner.read().await;
1988        let txn = inner.db.begin().await?;
1989        has_table_been_migrated(&txn, table_id).await
1990    }
1991
    /// Upsert the connector splits of the given fragments.
    ///
    /// Fragments that no longer exist in the catalog (e.g. dropped while the
    /// split update was in flight) are skipped with a warning instead of
    /// triggering a foreign-key violation.
    pub async fn update_fragment_splits<C>(
        &self,
        txn: &C,
        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
    ) -> MetaResult<()>
    where
        C: ConnectionTrait,
    {
        if fragment_splits.is_empty() {
            return Ok(());
        }

        // Determine which of the given fragment ids still exist in the catalog.
        let existing_fragment_ids: HashSet<FragmentId> = FragmentModel::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::FragmentId.is_in(fragment_splits.keys().copied()))
            .into_tuple()
            .all(txn)
            .await?
            .into_iter()
            .collect();

        // Filter out stale fragment ids to avoid FK violations when split updates race with drop.
        let (models, skipped_fragment_ids): (Vec<_>, Vec<_>) = fragment_splits
            .iter()
            .partition_map(|(fragment_id, splits)| {
                if existing_fragment_ids.contains(fragment_id) {
                    Either::Left(fragment_splits::ActiveModel {
                        fragment_id: Set(*fragment_id as _),
                        splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                            splits: splits.iter().map(Into::into).collect_vec(),
                        }))),
                    })
                } else {
                    Either::Right(*fragment_id)
                }
            });

        if !skipped_fragment_ids.is_empty() {
            tracing::warn!(
                skipped_fragment_ids = ?skipped_fragment_ids,
                total_fragment_ids = fragment_splits.len(),
                "skipping stale fragment split updates for missing fragments"
            );
        }

        if models.is_empty() {
            return Ok(());
        }

        // Upsert: on conflict with an existing row for the fragment, only
        // overwrite its `splits` column.
        FragmentSplits::insert_many(models)
            .on_conflict(
                OnConflict::column(fragment_splits::Column::FragmentId)
                    .update_column(fragment_splits::Column::Splits)
                    .to_owned(),
            )
            .exec(txn)
            .await?;

        Ok(())
    }
2053}
2054
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::id::JobId;
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::*;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};

    use super::ActorInfo;
    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    // Per-actor upstream map: upstream fragment id -> upstream actor ids.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    // All actors of a fragment mapped to their upstream maps.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    const TEST_JOB_ID: JobId = JobId::new(1);

    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);

    /// Builds two synthetic upstream entries for `actor_id`: one for
    /// `TEST_UPSTREAM_FRAGMENT_ID` (actor id offset by 100) and one for the
    /// next fragment id (offset by 200).
    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }

    /// Builds a `Union` stream node whose inputs are one `Merge` node per
    /// upstream fragment in `actor_upstream_actor_ids`.
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    /// Round-trips a `Fragment` through `prepare_fragment_model_for_new_job`
    /// and checks the resulting model against the original via `check_fragment`.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }

    /// Composes a fragment model plus per-actor info back into protobuf form
    /// and validates statuses, splits, the fragment, and each actor.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // Uniformly distribute the test vnodes across the actors.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                    config_override: "a.b.c = true".into(),
                }
            })
            .collect_vec();

        // Build the stream node from the first actor's upstreams as a template.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids =
                upstream_actor_ids.get(&template_actor.actor_id).unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actors, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        // Every actor must have a status with a location, and splits for each actor.
        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    /// Asserts that each composed `StreamActor` matches its source `ActorInfo`:
    /// ids, vnode bitmap, merge-node upstreams, splits, and expr context.
    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // Every merge node in the stream graph must refer to one of the
            // actor's recorded upstream fragments.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&actor_id)
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    /// Asserts that a composed `Fragment` matches the `fragment::Model` it was
    /// built from (id, type mask, distribution, state tables, stream node).
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }

    /// A fragment without its own parallelism inherits the job's fixed
    /// parallelism when no upstream root fragments are given.
    #[test]
    fn test_parallelism_policy_with_root_fragments() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 3.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let job_parallelism = StreamingParallelism::Fixed(4);

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            fragment.distribution_type,
            fragment.parallelism.as_ref(),
            Some(&job_parallelism),
            &[],
        );

        assert_eq!(policy, "inherit(fixed(4))");
    }

    /// Upstream root fragment ids are deduplicated and sorted in the rendered
    /// parallelism policy string.
    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            fragment.distribution_type,
            fragment.parallelism.as_ref(),
            None,
            &[3.into(), 1.into(), 2.into(), 1.into()],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
}