risingwave_meta/controller/
fragment.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::TryStreamExt;
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::id::JobId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
29use risingwave_connector::source::SplitImpl;
30use risingwave_connector::source::cdc::CdcScanOptions;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    TableIdArray, VnodeBitmap, WorkerId, database, fragment, fragment_relation, fragment_splits,
40    object, sink, source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::fragment::{
49    FragmentDistributionType, PbFragmentDistributionType,
50};
51use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
52use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
53use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan;
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57    PbDispatchOutputMapping, PbDispatcherType, PbStreamNode, PbStreamScanType, SinkLogStoreType,
58    StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63    ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, FromQueryResult, JoinType,
64    PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, StreamTrait, TransactionTrait,
65};
66use serde::{Deserialize, Serialize};
67
68use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
69use crate::controller::catalog::CatalogController;
70use crate::controller::scale::{
71    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble, RenderedGraph,
72    find_fragment_no_shuffle_dags_detailed, load_fragment_context_for_jobs,
73    render_actor_assignments, resolve_streaming_job_definition,
74};
75use crate::controller::utils::{
76    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, compose_dispatchers,
77    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
78    resolve_no_shuffle_actor_dispatcher,
79};
80use crate::error::MetaError;
81use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, NotificationManager};
82use crate::model::{
83    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
84    StreamActor, StreamContext, StreamJobFragments, StreamingJobModelContextExt as _,
85    TableParallelism,
86};
87use crate::rpc::ddl_controller::build_upstream_sink_info;
88use crate::stream::UpstreamSinkInfo;
89use crate::{MetaResult, model};
90
/// Some information of running (inflight) actors.
#[derive(Debug)]
pub struct InflightActorInfo {
    /// Worker node the actor is currently located on.
    pub worker_id: WorkerId,
    /// Vnode ownership bitmap of this actor, if any.
    pub vnode_bitmap: Option<Bitmap>,
    /// Connector source splits currently assigned to this actor.
    pub splits: Vec<SplitImpl>,
}
98
/// Per-actor record used internally when composing fragments into
/// [`StreamJobFragments`] (see `compose_fragment`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ActorInfo {
    pub actor_id: ActorId,
    /// Fragment this actor belongs to; validated against the fragment model
    /// during composition.
    pub fragment_id: FragmentId,
    /// Connector splits assigned to the actor.
    pub splits: ConnectorSplits,
    /// Worker node the actor is scheduled on.
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor, if any.
    pub vnode_bitmap: Option<VnodeBitmap>,
    /// Expression evaluation context of the actor.
    pub expr_context: ExprContext,
    /// Per-actor config override string (cheaply clonable via `Arc`).
    pub config_override: Arc<str>,
}
109
/// Intermediate per-fragment row, presumably loaded from the `fragment` table
/// (fields mirror its columns) when building fragment descriptions.
// NOTE(review): usage site is outside this chunk — confirm against the queries
// that construct it.
#[derive(Debug)]
struct FragmentDescRow {
    fragment_id: FragmentId,
    job_id: JobId,
    fragment_type_mask: i32,
    distribution_type: DistributionType,
    state_table_ids: TableIdArray,
    vnode_count: i32,
    /// Fragment-level parallelism override, if any.
    parallelism_override: Option<StreamingParallelism>,
    /// Stream node plan; optional because some queries skip loading it.
    // TODO confirm: not all call sites select the stream_node column.
    stream_node: Option<StreamNode>,
}
121
/// Snapshot of a running (inflight) fragment: its distribution, plan and
/// currently-running actors.
#[derive(Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    pub fragment_type_mask: FragmentTypeMask,
    pub vnode_count: usize,
    /// Root of the fragment's stream plan.
    pub nodes: PbStreamNode,
    /// Running actors of this fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    /// State table ids owned by this fragment.
    pub state_table_ids: HashSet<TableId>,
}
132
/// Parallelism-related facts about a single running fragment, as reported by
/// `running_fragment_parallelisms`.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    /// Number of running actors (i.e. the actual parallelism).
    pub actor_count: usize,
    pub vnode_count: usize,
}
139
/// SQL-expression helpers for filtering `fragment` rows by their
/// `fragment_type_mask` bitset column.
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    /// `fragment_type_mask & flag != 0` — the fragment has the given flag set.
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    /// `fragment_type_mask & (flags…) != 0` — the fragment has at least one of
    /// the given flags set.
    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    /// `fragment_type_mask & flag == 0` — the fragment does NOT have the flag set.
    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}
160
/// Aggregated information of a streaming job, produced by
/// `list_streaming_job_infos` from a join over job/object/table/source/sink.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    pub job_id: JobId,
    pub obj_type: ObjectType,
    /// Name of the underlying table/source/sink; `"<unknown>"` if none matched.
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Job-specific resource group, falling back to the database's.
    pub resource_group: String,
    /// Job-level config override; empty string when unset.
    pub config_override: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
175
176impl NotificationManager {
177    pub(crate) fn notify_fragment_mapping(
178        &self,
179        operation: NotificationOperation,
180        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
181    ) {
182        let fragment_ids = fragment_mappings
183            .iter()
184            .map(|mapping| mapping.fragment_id)
185            .collect_vec();
186        if fragment_ids.is_empty() {
187            return;
188        }
189        // notify all fragment mappings to frontend.
190        for fragment_mapping in fragment_mappings {
191            self.notify_frontend_without_version(
192                operation,
193                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
194            );
195        }
196
197        // update serving vnode mappings.
198        match operation {
199            NotificationOperation::Add | NotificationOperation::Update => {
200                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
201                    fragment_ids,
202                ));
203            }
204            NotificationOperation::Delete => {
205                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
206                    fragment_ids,
207                ));
208            }
209            op => {
210                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
211            }
212        }
213    }
214}
215
216impl CatalogController {
217    pub fn prepare_fragment_models_from_fragments(
218        job_id: JobId,
219        fragments: impl Iterator<Item = &Fragment>,
220    ) -> MetaResult<Vec<fragment::Model>> {
221        fragments
222            .map(|fragment| Self::prepare_fragment_model_for_new_job(job_id, fragment))
223            .try_collect()
224    }
225
    /// Convert an in-memory [`Fragment`] of a newly-created streaming job into
    /// a `fragment::Model` row ready for insertion.
    ///
    /// Only fragment-level metadata is persisted here; actor-level data is
    /// handled elsewhere.
    pub fn prepare_fragment_model_for_new_job(
        job_id: JobId,
        fragment: &Fragment,
    ) -> MetaResult<fragment::Model> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        // A fragment must have at least one actor.
        assert!(!pb_actors.is_empty());

        // If the root node is a parallelized-backfill CDC scan, pin the
        // fragment to the fixed parallelism configured in its scan options;
        // otherwise leave the fragment-level parallelism unset.
        let fragment_parallelism = nodes
            .node_body
            .as_ref()
            .and_then(|body| match body {
                NodeBody::StreamCdcScan(node) => Some(node),
                _ => None,
            })
            .and_then(|node| node.options.as_ref())
            .map(CdcScanOptions::from_proto)
            .filter(|opts| opts.is_parallelized_backfill())
            .map(|opts| StreamingParallelism::Fixed(opts.backfill_parallelism as usize));

        let stream_node = StreamNode::from(nodes);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        // `upstream_fragment_id` is a deprecated column, kept at its default
        // for schema compatibility.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
            parallelism: fragment_parallelism,
        };

        Ok(fragment)
    }
278
279    fn compose_table_fragments(
280        job_id: JobId,
281        state: PbState,
282        ctx: StreamContext,
283        fragments: Vec<(fragment::Model, Vec<ActorInfo>)>,
284        parallelism: StreamingParallelism,
285        max_parallelism: usize,
286        job_definition: Option<String>,
287    ) -> MetaResult<StreamJobFragments> {
288        let mut pb_fragments = BTreeMap::new();
289        let mut pb_actor_status = BTreeMap::new();
290
291        for (fragment, actors) in fragments {
292            let (fragment, fragment_actor_status, _) =
293                Self::compose_fragment(fragment, actors, job_definition.clone())?;
294
295            pb_fragments.insert(fragment.fragment_id, fragment);
296            pb_actor_status.extend(fragment_actor_status.into_iter());
297        }
298
299        let table_fragments = StreamJobFragments {
300            stream_job_id: job_id,
301            state: state as _,
302            fragments: pb_fragments,
303            actor_status: pb_actor_status,
304            ctx,
305            assigned_parallelism: match parallelism {
306                StreamingParallelism::Custom => TableParallelism::Custom,
307                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
308                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
309            },
310            max_parallelism,
311        };
312
313        Ok(table_fragments)
314    }
315
    /// Compose a protobuf-level [`Fragment`] from its persisted model and the
    /// actor infos belonging to it.
    ///
    /// Returns the composed fragment together with per-actor status
    /// (worker location) and per-actor connector splits.
    ///
    /// # Errors
    /// Fails if any actor's `fragment_id` does not match the fragment model.
    ///
    /// # Panics
    /// Asserts that no two `Merge` nodes in the plan reference the same
    /// upstream fragment.
    #[allow(clippy::type_complexity)]
    fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<ActorInfo>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity check: each upstream fragment should appear in at most one
        // Merge node of this fragment's plan.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            // Reject actors that were (incorrectly) paired with this fragment.
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let ActorInfo {
                actor_id,
                fragment_id,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                config_override,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            // Actor status currently only carries the worker location.
            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id),
                },
            );

            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                // Missing job definition degrades to an empty mview definition.
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
                config_override,
            })
        }

        let pb_state_table_ids = state_table_ids.0;
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }
410
411    pub fn running_fragment_parallelisms(
412        &self,
413        id_filter: Option<HashSet<FragmentId>>,
414    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
415        let info = self.env.shared_actor_infos().read_guard();
416
417        let mut result = HashMap::new();
418        for (fragment_id, fragment) in info.iter_over_fragments() {
419            if let Some(id_filter) = &id_filter
420                && !id_filter.contains(&(*fragment_id as _))
421            {
422                continue; // Skip fragments not in the filter
423            }
424
425            result.insert(
426                *fragment_id as _,
427                FragmentParallelismInfo {
428                    distribution_type: fragment.distribution_type.into(),
429                    actor_count: fragment.actors.len() as _,
430                    vnode_count: fragment.vnode_count,
431                },
432            );
433        }
434
435        Ok(result)
436    }
437
438    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, JobId>> {
439        let inner = self.inner.read().await;
440        let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
441            .select_only()
442            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
443            .into_tuple()
444            .all(&inner.db)
445            .await?;
446        Ok(fragment_jobs.into_iter().collect())
447    }
448
449    pub async fn get_fragment_job_id(
450        &self,
451        fragment_ids: Vec<FragmentId>,
452    ) -> MetaResult<Vec<ObjectId>> {
453        let inner = self.inner.read().await;
454
455        let object_ids: Vec<ObjectId> = FragmentModel::find()
456            .select_only()
457            .column(fragment::Column::JobId)
458            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
459            .into_tuple()
460            .all(&inner.db)
461            .await?;
462
463        Ok(object_ids)
464    }
465
    /// Build a [`FragmentDesc`] for one fragment, together with the ids of its
    /// direct upstream fragments.
    ///
    /// Returns `Ok(None)` when the fragment does not exist in the database.
    ///
    /// # Panics
    /// Panics if the fragment exists in the database but is missing from the
    /// shared in-memory actor info.
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        let fragment_model = match FragmentModel::find_by_id(fragment_id)
            .one(&inner.db)
            .await?
        {
            Some(fragment) => fragment,
            None => return Ok(None),
        };

        // Job-level parallelism, used below to derive the effective policy.
        let job_parallelism: Option<StreamingParallelism> =
            StreamingJob::find_by_id(fragment_model.job_id)
                .select_only()
                .column(streaming_job::Column::Parallelism)
                .into_tuple()
                .one(&inner.db)
                .await?;

        // Direct upstreams of this fragment (dispatcher type is ignored).
        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::DispatcherType,
            ])
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let upstreams: Vec<_> = upstream_entries
            .into_iter()
            .map(|(source_id, _)| source_id)
            .collect();

        // Roots of the no-shuffle DAG(s) this fragment belongs to.
        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
            .await
            .map(Self::collect_root_fragment_mapping)?;
        let root_fragments = root_fragment_map
            .get(&fragment_id)
            .cloned()
            .unwrap_or_default();

        // Actual parallelism comes from the shared in-memory actor info, not
        // from the database.
        let info = self.env.shared_actor_infos().read_guard();
        let SharedFragmentInfo { actors, .. } = info
            .get_fragment(fragment_model.fragment_id as _)
            .unwrap_or_else(|| {
                panic!(
                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
                    fragment_model.fragment_id,
                    fragment_model.job_id
                )
            });

        let parallelism_policy = Self::format_fragment_parallelism_policy(
            fragment_model.distribution_type,
            fragment_model.parallelism.as_ref(),
            job_parallelism.as_ref(),
            &root_fragments,
        );

        let fragment = FragmentDesc {
            fragment_id: fragment_model.fragment_id,
            job_id: fragment_model.job_id,
            fragment_type_mask: fragment_model.fragment_type_mask,
            distribution_type: fragment_model.distribution_type,
            state_table_ids: fragment_model.state_table_ids.clone(),
            parallelism: actors.len() as _,
            vnode_count: fragment_model.vnode_count,
            stream_node: fragment_model.stream_node.clone(),
            parallelism_policy,
        };

        Ok(Some((fragment, upstreams)))
    }
544
545    pub async fn list_fragment_database_ids(
546        &self,
547        select_fragment_ids: Option<Vec<FragmentId>>,
548    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
549        let inner = self.inner.read().await;
550        let select = FragmentModel::find()
551            .select_only()
552            .column(fragment::Column::FragmentId)
553            .column(object::Column::DatabaseId)
554            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
555        let select = if let Some(select_fragment_ids) = select_fragment_ids {
556            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
557        } else {
558            select
559        };
560        Ok(select.into_tuple().all(&inner.db).await?)
561    }
562
    /// Load all fragments of a streaming job and compose them into a
    /// [`StreamJobFragments`] snapshot.
    ///
    /// # Errors
    /// Fails if the job does not exist in the database.
    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;

        // Load fragments matching the job from the database
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        // Pair each fragment with its actor infos (from shared state).
        let fragment_actors =
            self.collect_fragment_actor_pairs(fragments, job_info.stream_context())?;

        // The SQL definition of the job, used as the actors' mview definition.
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id,
            job_info.job_status.into(),
            job_info.stream_context(),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }
594
595    pub async fn get_fragment_actor_dispatchers(
596        &self,
597        fragment_ids: Vec<FragmentId>,
598    ) -> MetaResult<FragmentActorDispatchers> {
599        let inner = self.inner.read().await;
600
601        self.get_fragment_actor_dispatchers_txn(&inner.db, fragment_ids)
602            .await
603    }
604
    /// Compute the dispatchers of every actor in the given *source* fragments,
    /// based on the persisted fragment relations and the shared in-memory
    /// actor info.
    ///
    /// # Panics
    /// Panics (via `unwrap`) if a fragment referenced by a relation is missing
    /// from the shared actor info.
    pub async fn get_fragment_actor_dispatchers_txn(
        &self,
        c: &impl ConnectionTrait,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        // All downstream edges whose source is one of the requested fragments.
        let fragment_relations = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .all(c)
            .await?;

        // Distribution type plus per-actor vnode bitmaps of one fragment.
        type FragmentActorInfo = (
            DistributionType,
            Arc<HashMap<crate::model::ActorId, Option<Bitmap>>>,
        );

        let shared_info = self.env.shared_actor_infos();
        // Cache per-fragment actor info so each fragment is resolved at most once.
        let mut fragment_actor_cache: HashMap<FragmentId, FragmentActorInfo> = HashMap::new();
        let get_fragment_actors = |fragment_id: FragmentId| async move {
            let result: MetaResult<FragmentActorInfo> = try {
                let read_guard = shared_info.read_guard();

                let fragment = read_guard.get_fragment(fragment_id as _).unwrap();

                (
                    fragment.distribution_type,
                    Arc::new(
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor_info)| {
                                (
                                    *actor_id,
                                    actor_info
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect(),
                    ),
                )
            };
            result
        };

        // source fragment -> actor -> dispatchers
        let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new();
        for fragment_relation::Model {
            source_fragment_id,
            target_fragment_id,
            dispatcher_type,
            dist_key_indices,
            output_indices,
            output_type_mapping,
        } in fragment_relations
        {
            // Resolve (or fetch and cache) actor info of both endpoints.
            let (source_fragment_distribution, source_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(source_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(source_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let (target_fragment_distribution, target_fragment_actors) = {
                let (distribution, actors) = {
                    match fragment_actor_cache.entry(target_fragment_id) {
                        Entry::Occupied(entry) => entry.into_mut(),
                        Entry::Vacant(entry) => {
                            entry.insert(get_fragment_actors(target_fragment_id).await?)
                        }
                    }
                };
                (*distribution, actors.clone())
            };
            let output_mapping = PbDispatchOutputMapping {
                indices: output_indices.into_u32_array(),
                types: output_type_mapping.unwrap_or_default().to_protobuf(),
            };
            let dispatchers = compose_dispatchers(
                source_fragment_distribution,
                &source_fragment_actors,
                target_fragment_id as _,
                target_fragment_distribution,
                &target_fragment_actors,
                dispatcher_type,
                dist_key_indices.into_u32_array(),
                output_mapping,
            );
            // Group the composed dispatchers under their source fragment/actor.
            let actor_dispatchers_map = actor_dispatchers_map
                .entry(source_fragment_id as _)
                .or_default();
            for (actor_id, dispatchers) in dispatchers {
                actor_dispatchers_map
                    .entry(actor_id as _)
                    .or_default()
                    .push(dispatchers);
            }
        }
        Ok(actor_dispatchers_map)
    }
708
709    pub async fn get_fragment_downstream_relations(
710        &self,
711        fragment_ids: Vec<FragmentId>,
712    ) -> MetaResult<FragmentDownstreamRelation> {
713        let inner = self.inner.read().await;
714        self.get_fragment_downstream_relations_in_txn(&inner.db, fragment_ids)
715            .await
716    }
717
    /// Stream the downstream relations of the given *source* fragments and
    /// group them by source fragment id.
    pub async fn get_fragment_downstream_relations_in_txn<C>(
        &self,
        txn: &C,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation>
    where
        C: ConnectionTrait + StreamTrait + Send,
    {
        // Stream rows instead of loading them all at once.
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(txn)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }
750
    /// For each fragment of a job that contains a `StreamScan` node, report
    /// its backfill scan type.
    ///
    /// Fragments whose scan type is `Unspecified` (or which contain no
    /// `StreamScan` at all) are omitted from the result.
    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            // Scan the plan for a StreamScan node; a later match overwrites an
            // earlier one if the fragment somehow contains several.
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }
784
785    pub async fn count_streaming_jobs(&self) -> MetaResult<usize> {
786        let inner = self.inner.read().await;
787        let count = StreamingJob::find().count(&inner.db).await?;
788        Ok(usize::try_from(count).context("streaming job count overflow")?)
789    }
790
    /// List aggregated info of all streaming jobs (see [`StreamingJobInfo`]).
    ///
    /// Joins the job with its object/database and left-joins table/source/sink
    /// to resolve a human-readable name and effective resource group.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // Left-join all three possible catalog objects; at most one matches.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // name := COALESCE(table.name, source.name, sink.name, '<unknown>')
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // resource_group := COALESCE(job-specific group, database default)
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            // config_override := COALESCE(job config override, '')
            .column_as(
                Expr::if_null(
                    Expr::col((streaming_job::Entity, streaming_job::Column::ConfigOverride)),
                    Expr::val(""),
                ),
                "config_override",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
844
845    pub async fn get_max_parallelism_by_id(&self, job_id: JobId) -> MetaResult<usize> {
846        let inner = self.inner.read().await;
847        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
848            .select_only()
849            .column(streaming_job::Column::MaxParallelism)
850            .into_tuple()
851            .one(&inner.db)
852            .await?
853            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
854        Ok(max_parallelism as usize)
855    }
856
    /// Try to get internal table ids of each streaming job, used by metrics collection.
    ///
    /// Best-effort: returns `None` when the inner lock cannot be acquired
    /// immediately (`try_read` fails) or the database query errors, so the
    /// metrics path never blocks on ongoing metadata operations.
    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(JobId, Vec<TableId>)>> {
        if let Ok(inner) = self.inner.try_read()
            && let Ok(job_state_tables) = FragmentModel::find()
                .select_only()
                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
                .into_tuple::<(JobId, I32Array)>()
                .all(&inner.db)
                .await
        {
            let mut job_internal_table_ids = HashMap::new();
            // A job owns multiple fragments; union their state table ids per job.
            for (job_id, state_table_ids) in job_state_tables {
                job_internal_table_ids
                    .entry(job_id)
                    .or_insert_with(Vec::new)
                    .extend(
                        state_table_ids
                            .into_inner()
                            .into_iter()
                            .map(|table_id| TableId::new(table_id as _)),
                    );
            }
            return Some(job_internal_table_ids.into_iter().collect());
        }
        None
    }
883
884    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
885        let inner = self.inner.read().await;
886        let count = FragmentModel::find().count(&inner.db).await?;
887        Ok(count > 0)
888    }
889
890    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
891        let read_guard = self.env.shared_actor_infos().read_guard();
892        let actor_cnt: HashMap<WorkerId, _> = read_guard
893            .iter_over_fragments()
894            .flat_map(|(_, fragment)| {
895                fragment
896                    .actors
897                    .iter()
898                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
899            })
900            .into_group_map()
901            .into_iter()
902            .map(|(k, v)| (k, v.len()))
903            .collect();
904
905        Ok(actor_cnt)
906    }
907
    /// Build per-fragment actor descriptors from the shared in-memory actor
    /// info, attaching the given stream context (expr context and config
    /// override) to every actor.
    ///
    /// Errors if any requested fragment is missing from the shared actor info.
    fn collect_fragment_actor_map(
        &self,
        fragment_ids: &[FragmentId],
        stream_context: StreamContext,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorInfo>>> {
        let guard = self.env.shared_actor_infos().read_guard();
        // Convert the stream context once; it is shared by all actors built here.
        let pb_expr_context = stream_context.to_expr_context();
        let expr_context: ExprContext = (&pb_expr_context).into();

        let mut actor_map = HashMap::with_capacity(fragment_ids.len());
        for fragment_id in fragment_ids {
            let fragment_info = guard.get_fragment(*fragment_id as _).ok_or_else(|| {
                anyhow!("fragment {} not found in shared actor info", fragment_id)
            })?;

            let actors = fragment_info
                .actors
                .iter()
                .map(|(actor_id, actor_info)| ActorInfo {
                    actor_id: *actor_id as _,
                    fragment_id: *fragment_id,
                    // Re-encode the in-memory splits in the model's protobuf form.
                    splits: ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_info.splits.iter().map(ConnectorSplit::from).collect(),
                    }),
                    worker_id: actor_info.worker_id as _,
                    vnode_bitmap: actor_info
                        .vnode_bitmap
                        .as_ref()
                        .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                    expr_context: expr_context.clone(),
                    config_override: stream_context.config_override.clone(),
                })
                .collect();

            actor_map.insert(*fragment_id, actors);
        }

        Ok(actor_map)
    }
947
948    fn collect_fragment_actor_pairs(
949        &self,
950        fragments: Vec<fragment::Model>,
951        stream_context: StreamContext,
952    ) -> MetaResult<Vec<(fragment::Model, Vec<ActorInfo>)>> {
953        let fragment_ids: Vec<_> = fragments.iter().map(|f| f.fragment_id).collect();
954        let mut actor_map = self.collect_fragment_actor_map(&fragment_ids, stream_context)?;
955        fragments
956            .into_iter()
957            .map(|fragment| {
958                let actors = actor_map.remove(&fragment.fragment_id).ok_or_else(|| {
959                    anyhow!(
960                        "fragment {} missing in shared actor info map",
961                        fragment.fragment_id
962                    )
963                })?;
964                Ok((fragment, actors))
965            })
966            .collect()
967    }
968
    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
    /// Load the complete [`StreamJobFragments`] for every streaming job.
    ///
    /// Note: issues one fragment query per job (N+1 pattern) — see TODO above.
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<JobId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve job definitions for all jobs up front, in one batch.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragments = FragmentModel::find()
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            // Attach actors (from shared actor info) to each fragment model.
            let fragment_actors =
                self.collect_fragment_actor_pairs(fragments, job.stream_context())?;

            table_fragments.insert(
                job.job_id,
                Self::compose_table_fragments(
                    job.job_id,
                    job.job_status.into(),
                    job.stream_context(),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
1006
1007    pub async fn upstream_fragments(
1008        &self,
1009        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
1010    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
1011        let inner = self.inner.read().await;
1012        let mut stream = FragmentRelation::find()
1013            .select_only()
1014            .columns([
1015                fragment_relation::Column::SourceFragmentId,
1016                fragment_relation::Column::TargetFragmentId,
1017            ])
1018            .filter(
1019                fragment_relation::Column::TargetFragmentId
1020                    .is_in(fragment_ids.map(|id| id as FragmentId)),
1021            )
1022            .into_tuple::<(FragmentId, FragmentId)>()
1023            .stream(&inner.db)
1024            .await?;
1025        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
1026        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
1027            upstream_fragments
1028                .entry(downstream_fragment_id as crate::model::FragmentId)
1029                .or_default()
1030                .insert(upstream_fragment_id as crate::model::FragmentId);
1031        }
1032        Ok(upstream_fragments)
1033    }
1034
1035    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
1036        let info = self.env.shared_actor_infos().read_guard();
1037
1038        let actor_locations = info
1039            .iter_over_fragments()
1040            .flat_map(|(fragment_id, fragment)| {
1041                fragment
1042                    .actors
1043                    .iter()
1044                    .map(|(actor_id, actor)| PartialActorLocation {
1045                        actor_id: *actor_id as _,
1046                        fragment_id: *fragment_id as _,
1047                        worker_id: actor.worker_id,
1048                    })
1049            })
1050            .collect_vec();
1051
1052        Ok(actor_locations)
1053    }
1054
    /// List `(actor_id, fragment_id, job_id, schema_id, object_type)` for every
    /// actor, combining fragment/object rows from the database with the actor
    /// ids held in the shared in-memory actor info.
    ///
    /// Errors with `unavailable` if a fragment known to the database has no
    /// entry in the shared actor info yet.
    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;

        // One row per fragment carrying its owning object's metadata.
        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
            FragmentModel::find()
                .select_only()
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .column(fragment::Column::FragmentId)
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;

        let actor_infos = {
            let info = self.env.shared_actor_infos().read_guard();

            let mut result = Vec::new();

            for (fragment_id, object_id, schema_id, object_type) in fragment_objects {
                let Some(fragment) = info.get_fragment(fragment_id as _) else {
                    return Err(MetaError::unavailable(format!(
                        "shared actor info missing for fragment {fragment_id} while listing actors"
                    )));
                };

                // Expand the fragment row into one tuple per actor.
                for actor_id in fragment.actors.keys() {
                    result.push((
                        *actor_id as _,
                        fragment.fragment_id as _,
                        object_id,
                        schema_id,
                        object_type,
                    ));
                }
            }

            result
        };

        Ok(actor_infos)
    }
1100
1101    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
1102        let guard = self.env.shared_actor_info.read_guard();
1103        guard
1104            .iter_over_fragments()
1105            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
1106            .collect_vec()
1107    }
1108
1109    pub async fn list_fragment_descs_with_node(
1110        &self,
1111        is_creating: bool,
1112    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
1113        let inner = self.inner.read().await;
1114        let txn = inner.db.begin().await?;
1115
1116        let fragments_query = Self::build_fragment_query(is_creating);
1117        let fragments: Vec<fragment::Model> = fragments_query.all(&txn).await?;
1118
1119        let rows = fragments
1120            .into_iter()
1121            .map(|fragment| FragmentDescRow {
1122                fragment_id: fragment.fragment_id,
1123                job_id: fragment.job_id,
1124                fragment_type_mask: fragment.fragment_type_mask,
1125                distribution_type: fragment.distribution_type,
1126                state_table_ids: fragment.state_table_ids,
1127                vnode_count: fragment.vnode_count,
1128                parallelism_override: fragment.parallelism,
1129                stream_node: Some(fragment.stream_node),
1130            })
1131            .collect_vec();
1132
1133        self.build_fragment_distributions(&txn, rows).await
1134    }
1135
    /// List fragment descriptions WITHOUT the stream node body — selects only
    /// the cheap columns, avoiding deserialization of the large `stream_node`.
    pub async fn list_fragment_descs_without_node(
        &self,
        is_creating: bool,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments_query = Self::build_fragment_query(is_creating);
        // Tuple order mirrors the `columns` list below:
        // (fragment_id, job_id, fragment_type_mask, distribution_type,
        //  state_table_ids, vnode_count, parallelism_override)
        #[allow(clippy::type_complexity)]
        let fragments: Vec<(
            FragmentId,
            JobId,
            i32,
            DistributionType,
            TableIdArray,
            i32,
            Option<StreamingParallelism>,
        )> = fragments_query
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::Parallelism,
            ])
            .into_tuple()
            .all(&txn)
            .await?;

        let rows = fragments
            .into_iter()
            .map(
                |(
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    state_table_ids,
                    vnode_count,
                    parallelism_override,
                )| FragmentDescRow {
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    state_table_ids,
                    vnode_count,
                    parallelism_override,
                    stream_node: None, // node body intentionally omitted
                },
            )
            .collect_vec();

        self.build_fragment_distributions(&txn, rows).await
    }
1194
1195    fn build_fragment_query(is_creating: bool) -> sea_orm::Select<fragment::Entity> {
1196        if is_creating {
1197            FragmentModel::find()
1198                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
1199                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
1200                .filter(
1201                    streaming_job::Column::JobStatus
1202                        .eq(JobStatus::Initial)
1203                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
1204                )
1205        } else {
1206            FragmentModel::find()
1207        }
1208    }
1209
    /// Assemble a [`FragmentDistribution`] (plus its upstream fragment ids)
    /// for each pre-fetched [`FragmentDescRow`].
    ///
    /// Bulk-loads job-level parallelism, upstream relations and the no-shuffle
    /// root mapping from the transaction; actor counts come from the shared
    /// in-memory actor info.
    async fn build_fragment_distributions(
        &self,
        txn: &DatabaseTransaction,
        rows: Vec<FragmentDescRow>,
    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
        let fragment_ids = rows.iter().map(|row| row.fragment_id).collect_vec();
        let job_ids = rows.iter().map(|row| row.job_id).unique().collect_vec();

        // Job-level parallelism, used when a fragment has no explicit override.
        let job_parallelisms: HashMap<JobId, StreamingParallelism> = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            StreamingJob::find()
                .select_only()
                .columns([
                    streaming_job::Column::JobId,
                    streaming_job::Column::Parallelism,
                ])
                .filter(streaming_job::Column::JobId.is_in(job_ids))
                .into_tuple()
                .all(txn)
                .await?
                .into_iter()
                .collect()
        };

        // All (target, source, dispatcher) edges pointing at the listed fragments.
        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
            if fragment_ids.is_empty() {
                Vec::new()
            } else {
                FragmentRelation::find()
                    .select_only()
                    .columns([
                        fragment_relation::Column::TargetFragmentId,
                        fragment_relation::Column::SourceFragmentId,
                        fragment_relation::Column::DispatcherType,
                    ])
                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
                    .into_tuple()
                    .all(txn)
                    .await?
            };

        // Group upstream fragment ids by downstream fragment.
        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for (target_id, source_id, _) in upstream_entries {
            all_upstreams.entry(target_id).or_default().push(source_id);
        }

        // Map each fragment to the root fragments of its no-shuffle ensemble.
        let root_fragment_map = if fragment_ids.is_empty() {
            HashMap::new()
        } else {
            let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_ids).await?;
            Self::collect_root_fragment_mapping(ensembles)
        };

        let guard = self.env.shared_actor_info.read_guard();
        let mut result = Vec::with_capacity(rows.len());

        for row in rows {
            // Parallelism = current actor count from shared actor info (0 if unknown).
            let parallelism = guard
                .get_fragment(row.fragment_id as _)
                .map(|fragment| fragment.actors.len())
                .unwrap_or_default();

            let root_fragments = root_fragment_map
                .get(&row.fragment_id)
                .cloned()
                .unwrap_or_default();

            let upstreams = all_upstreams.remove(&row.fragment_id).unwrap_or_default();

            let parallelism_policy = Self::format_fragment_parallelism_policy(
                row.distribution_type,
                row.parallelism_override.as_ref(),
                job_parallelisms.get(&row.job_id),
                &root_fragments,
            );

            let fragment = FragmentDistribution {
                fragment_id: row.fragment_id,
                table_id: row.job_id,
                distribution_type: PbFragmentDistributionType::from(row.distribution_type) as _,
                state_table_ids: row.state_table_ids.0,
                upstream_fragment_ids: upstreams.clone(),
                fragment_type_mask: row.fragment_type_mask as _,
                parallelism: parallelism as _,
                vnode_count: row.vnode_count as _,
                node: row.stream_node.map(|node| node.to_protobuf()),
                parallelism_policy,
            };

            result.push((fragment, upstreams));
        }

        Ok(result)
    }
1305
    /// List `(sink_id, internal_table_id)` pairs for sinks backed by a KV log
    /// store, by scanning the stream nodes of all sink fragments.
    pub async fn list_sink_log_store_tables(&self) -> MetaResult<Vec<(SinkId, TableId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // All fragments containing a Sink node; the fragment's job id is the sink id.
        let fragments: Vec<(JobId, StreamNode)> = FragmentModel::find()
            .select_only()
            .columns([fragment::Column::JobId, fragment::Column::StreamNode])
            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Sink))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut mapping: HashMap<SinkId, TableId> = HashMap::new();

        for (job_id, stream_node) in fragments {
            let sink_id = SinkId::new(job_id.as_raw_id());
            let stream_node = stream_node.to_protobuf();
            let mut internal_table_id: Option<TableId> = None;

            // Find a Sink node using a KV log store and record its backing table.
            // NOTE(review): if one fragment held several such Sink nodes, only the
            // last one visited would be kept — confirm a fragment has at most one.
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::Sink(node) = body
                    && node.log_store_type == SinkLogStoreType::KvLogStore as i32
                    && let Some(table) = &node.table
                {
                    internal_table_id = Some(TableId::new(table.id.as_raw_id()));
                }
            });

            // On conflicting table ids across fragments, last writer wins and we
            // only emit a warning.
            if let Some(table_id) = internal_table_id
                && let Some(existing) = mapping.insert(sink_id, table_id)
                && existing != table_id
            {
                tracing::warn!(
                    "sink {sink_id:?} has multiple log store tables: {existing:?} vs {table_id:?}"
                );
            }
        }

        Ok(mapping.into_iter().collect())
    }
1346
1347    /// Build a fragment-to-root lookup for all reported root fragment ensembles.
1348    fn collect_root_fragment_mapping(
1349        ensembles: Vec<NoShuffleEnsemble>,
1350    ) -> HashMap<FragmentId, Vec<FragmentId>> {
1351        let mut mapping = HashMap::new();
1352
1353        for ensemble in ensembles {
1354            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
1355            roots.sort_unstable();
1356            roots.dedup();
1357
1358            if roots.is_empty() {
1359                continue;
1360            }
1361
1362            let root_set: HashSet<_> = roots.iter().copied().collect();
1363
1364            for fragment_id in ensemble.component_fragments() {
1365                if root_set.contains(&fragment_id) {
1366                    mapping.insert(fragment_id, Vec::new());
1367                } else {
1368                    mapping.insert(fragment_id, roots.clone());
1369                }
1370            }
1371        }
1372
1373        mapping
1374    }
1375
1376    fn format_fragment_parallelism_policy(
1377        distribution_type: DistributionType,
1378        fragment_parallelism: Option<&StreamingParallelism>,
1379        job_parallelism: Option<&StreamingParallelism>,
1380        root_fragments: &[FragmentId],
1381    ) -> String {
1382        if distribution_type == DistributionType::Single {
1383            return "single".to_owned();
1384        }
1385
1386        if let Some(parallelism) = fragment_parallelism {
1387            return format!(
1388                "override({})",
1389                Self::format_streaming_parallelism(parallelism)
1390            );
1391        }
1392
1393        if !root_fragments.is_empty() {
1394            let mut upstreams = root_fragments.to_vec();
1395            upstreams.sort_unstable();
1396            upstreams.dedup();
1397
1398            return format!("upstream_fragment({upstreams:?})");
1399        }
1400
1401        let inherited = job_parallelism
1402            .map(Self::format_streaming_parallelism)
1403            .unwrap_or_else(|| "unknown".to_owned());
1404        format!("inherit({inherited})")
1405    }
1406
1407    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1408        match parallelism {
1409            StreamingParallelism::Adaptive => "adaptive".to_owned(),
1410            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1411            StreamingParallelism::Custom => "custom".to_owned(),
1412        }
1413    }
1414
1415    pub async fn list_sink_actor_mapping(
1416        &self,
1417    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1418        let inner = self.inner.read().await;
1419        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1420            .select_only()
1421            .columns([sink::Column::SinkId, sink::Column::Name])
1422            .into_tuple()
1423            .all(&inner.db)
1424            .await?;
1425        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1426
1427        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1428
1429        let actor_with_type: Vec<(ActorId, SinkId)> = {
1430            let info = self.env.shared_actor_infos().read_guard();
1431
1432            info.iter_over_fragments()
1433                .filter(|(_, fragment)| {
1434                    sink_ids.contains(&fragment.job_id.as_sink_id())
1435                        && fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink)
1436                })
1437                .flat_map(|(_, fragment)| {
1438                    fragment
1439                        .actors
1440                        .keys()
1441                        .map(move |actor_id| (*actor_id as _, fragment.job_id.as_sink_id()))
1442                })
1443                .collect()
1444        };
1445
1446        let mut sink_actor_mapping = HashMap::new();
1447        for (actor_id, sink_id) in actor_with_type {
1448            sink_actor_mapping
1449                .entry(sink_id)
1450                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1451                .1
1452                .push(actor_id);
1453        }
1454
1455        Ok(sink_actor_mapping)
1456    }
1457
1458    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1459        let inner = self.inner.read().await;
1460        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1461            .select_only()
1462            .columns([
1463                fragment::Column::FragmentId,
1464                fragment::Column::JobId,
1465                fragment::Column::StateTableIds,
1466            ])
1467            .into_partial_model()
1468            .all(&inner.db)
1469            .await?;
1470        Ok(fragment_state_tables)
1471    }
1472
    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
    /// collected
    ///
    /// Loads fragment metadata (optionally scoped to one database) and renders
    /// fresh actor assignments over the currently-active worker nodes.
    pub async fn load_all_actors_dynamic(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<FragmentRenderMap> {
        let loaded = self.load_fragment_context(database_id).await?;

        if loaded.is_empty() {
            return Ok(HashMap::new());
        }

        // Strategy for sizing adaptive-parallelism fragments, from system params.
        let adaptive_parallelism_strategy = {
            let system_params_reader = self.env.system_params_reader().await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let RenderedGraph { fragments, .. } = render_actor_assignments(
            self.env.actor_id_generator(),
            worker_nodes.current(),
            adaptive_parallelism_strategy,
            &loaded,
        )?;

        tracing::trace!(?fragments, "reload all actors");

        Ok(fragments)
    }
1502
1503    /// Async load stage: collects all metadata required for rendering actor assignments.
1504    pub async fn load_fragment_context(
1505        &self,
1506        database_id: Option<DatabaseId>,
1507    ) -> MetaResult<LoadedFragmentContext> {
1508        let inner = self.inner.read().await;
1509        let txn = inner.db.begin().await?;
1510
1511        self.load_fragment_context_in_txn(&txn, database_id).await
1512    }
1513
1514    pub async fn load_fragment_context_in_txn<C>(
1515        &self,
1516        txn: &C,
1517        database_id: Option<DatabaseId>,
1518    ) -> MetaResult<LoadedFragmentContext>
1519    where
1520        C: ConnectionTrait,
1521    {
1522        let mut query = StreamingJob::find()
1523            .select_only()
1524            .column(streaming_job::Column::JobId);
1525
1526        if let Some(database_id) = database_id {
1527            query = query
1528                .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
1529                .filter(object::Column::DatabaseId.eq(database_id));
1530        }
1531
1532        let jobs: Vec<JobId> = query.into_tuple().all(txn).await?;
1533
1534        let jobs: HashSet<JobId> = jobs.into_iter().collect();
1535
1536        if jobs.is_empty() {
1537            return Ok(LoadedFragmentContext::default());
1538        }
1539
1540        load_fragment_context_for_jobs(txn, jobs).await
1541    }
1542
    /// Rewrite the stream node of each given fragment to fill in snapshot
    /// backfill epochs, persisting only the fragments that actually changed.
    /// All updates are committed in a single transaction.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // Returns true iff the node was modified and needs persisting.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1576
1577    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1578    pub fn get_running_actors_of_fragment(
1579        &self,
1580        fragment_id: FragmentId,
1581    ) -> MetaResult<HashSet<model::ActorId>> {
1582        let info = self.env.shared_actor_infos().read_guard();
1583
1584        let actors = info
1585            .get_fragment(fragment_id as _)
1586            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1587            .unwrap_or_default();
1588
1589        Ok(actors)
1590    }
1591
    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
    /// (`backfill_actor_id`, `upstream_source_actor_id`)
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Load the dispatcher type of the edge source fragment -> source-backfill
        // fragment; error out if no such edge exists in the catalog.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // The actor pairing below is only meaningful over a 1-to-1 (NoShuffle) edge.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: fetch a fragment's distribution type from the catalog.
        // The nightly `try` block lets `?` inside produce a `MetaResult`.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: snapshot (actor id -> optional vnode bitmap) for a fragment from the
        // shared in-memory actor info (not the database). Unknown fragment -> empty map.
        let load_fragment_actor_distribution =
            |actor_info: &SharedActorInfos,
             fragment_id: FragmentId|
             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
                let guard = actor_info.read_guard();

                guard
                    .get_fragment(fragment_id as _)
                    .map(|fragment| {
                        fragment
                            .actors
                            .iter()
                            .map(|(actor_id, actor)| {
                                (
                                    *actor_id as _,
                                    actor
                                        .vnode_bitmap
                                        .as_ref()
                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                                )
                            })
                            .collect()
                    })
                    .unwrap_or_default()
            };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(
                self.env.shared_actor_infos(),
                source_backfill_fragment_id,
            );

        let source_actors =
            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);

        // Pair each source actor with its NoShuffle-matched backfill actor, then flip
        // each pair into the documented (backfill_actor_id, upstream_source_actor_id)
        // order.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1687
1688    /// Get and filter the "**root**" fragments of the specified jobs.
1689    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1690    ///
1691    /// Root fragment connects to downstream jobs.
1692    ///
1693    /// ## What can be the root fragment
1694    /// - For sink, it should have one `Sink` fragment.
1695    /// - For MV, it should have one `MView` fragment.
1696    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1697    /// - For source, it should have one `Source` fragment.
1698    ///
1699    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1700    pub async fn get_root_fragments(
1701        &self,
1702        job_ids: Vec<JobId>,
1703    ) -> MetaResult<(
1704        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
1705        HashMap<ActorId, WorkerId>,
1706    )> {
1707        let inner = self.inner.read().await;
1708
1709        let all_fragments = FragmentModel::find()
1710            .filter(fragment::Column::JobId.is_in(job_ids))
1711            .all(&inner.db)
1712            .await?;
1713        // job_id -> fragment
1714        let mut root_fragments = HashMap::<JobId, fragment::Model>::new();
1715        for fragment in all_fragments {
1716            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1717            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1718                _ = root_fragments.insert(fragment.job_id, fragment);
1719            } else if mask.contains(FragmentTypeFlag::Source) {
1720                // look for Source fragment only if there's no MView fragment
1721                // (notice try_insert here vs insert above)
1722                _ = root_fragments.try_insert(fragment.job_id, fragment);
1723            }
1724        }
1725
1726        let mut root_fragments_pb = HashMap::new();
1727
1728        let info = self.env.shared_actor_infos().read_guard();
1729
1730        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
1731            .iter()
1732            .map(|(job_id, fragment)| (fragment.fragment_id, *job_id))
1733            .collect();
1734
1735        for fragment in root_fragment_to_jobs.keys() {
1736            let fragment_info = info.get_fragment(*fragment).context(format!(
1737                "root fragment {} not found in shared actor info",
1738                fragment
1739            ))?;
1740
1741            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
1742            let fragment = root_fragments
1743                .get(&job_id)
1744                .context(format!("root fragment for job {} not found", job_id))?;
1745
1746            root_fragments_pb.insert(
1747                job_id,
1748                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
1749            );
1750        }
1751
1752        let mut all_actor_locations = HashMap::new();
1753
1754        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
1755            for (actor_id, actor_info) in actors {
1756                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
1757            }
1758        }
1759
1760        Ok((root_fragments_pb, all_actor_locations))
1761    }
1762
1763    pub async fn get_root_fragment(
1764        &self,
1765        job_id: JobId,
1766    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
1767        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1768        let (root_fragment, _) = root_fragments
1769            .remove(&job_id)
1770            .context(format!("root fragment for job {} not found", job_id))?;
1771
1772        Ok((root_fragment, actors))
1773    }
1774
1775    /// Get the downstream fragments connected to the specified job.
1776    pub async fn get_downstream_fragments(
1777        &self,
1778        job_id: JobId,
1779    ) -> MetaResult<(
1780        Vec<(
1781            stream_plan::DispatcherType,
1782            SharedFragmentInfo,
1783            PbStreamNode,
1784        )>,
1785        HashMap<ActorId, WorkerId>,
1786    )> {
1787        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;
1788
1789        let inner = self.inner.read().await;
1790        let txn = inner.db.begin().await?;
1791        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1792            .filter(
1793                fragment_relation::Column::SourceFragmentId
1794                    .eq(root_fragment.fragment_id as FragmentId),
1795            )
1796            .all(&txn)
1797            .await?;
1798
1799        let downstream_fragment_ids = downstream_fragment_relations
1800            .iter()
1801            .map(|model| model.target_fragment_id as FragmentId)
1802            .collect::<HashSet<_>>();
1803
1804        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1805            .select_only()
1806            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1807            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1808            .into_tuple()
1809            .all(&txn)
1810            .await?;
1811
1812        let downstream_fragment_nodes: HashMap<_, _> =
1813            downstream_fragment_nodes.into_iter().collect();
1814
1815        let mut downstream_fragments = vec![];
1816
1817        let info = self.env.shared_actor_infos().read_guard();
1818
1819        let fragment_map: HashMap<_, _> = downstream_fragment_relations
1820            .iter()
1821            .map(|model| (model.target_fragment_id, model.dispatcher_type))
1822            .collect();
1823
1824        for fragment_id in fragment_map.keys() {
1825            let fragment_info @ SharedFragmentInfo { actors, .. } =
1826                info.get_fragment(*fragment_id).unwrap();
1827
1828            let dispatcher_type = fragment_map[fragment_id];
1829
1830            if actors.is_empty() {
1831                bail!("No fragment found for fragment id {}", fragment_id);
1832            }
1833
1834            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1835
1836            let nodes = downstream_fragment_nodes
1837                .get(fragment_id)
1838                .context(format!(
1839                    "downstream fragment node for id {} not found",
1840                    fragment_id
1841                ))?
1842                .to_protobuf();
1843
1844            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
1845        }
1846        Ok((downstream_fragments, actor_locations))
1847    }
1848
1849    pub async fn load_source_fragment_ids(
1850        &self,
1851    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1852        let inner = self.inner.read().await;
1853        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1854            .select_only()
1855            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1856            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1857            .into_tuple()
1858            .all(&inner.db)
1859            .await?;
1860
1861        let mut source_fragment_ids = HashMap::new();
1862        for (fragment_id, stream_node) in fragments {
1863            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1864                source_fragment_ids
1865                    .entry(source_id)
1866                    .or_insert_with(BTreeSet::new)
1867                    .insert(fragment_id);
1868            }
1869        }
1870        Ok(source_fragment_ids)
1871    }
1872
1873    pub async fn load_backfill_fragment_ids(
1874        &self,
1875    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
1876        let inner = self.inner.read().await;
1877        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1878            .select_only()
1879            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1880            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1881            .into_tuple()
1882            .all(&inner.db)
1883            .await?;
1884
1885        let mut source_fragment_ids = HashMap::new();
1886        for (fragment_id, stream_node) in fragments {
1887            if let Some((source_id, upstream_source_fragment_id)) =
1888                stream_node.to_protobuf().find_source_backfill()
1889            {
1890                source_fragment_ids
1891                    .entry(source_id)
1892                    .or_insert_with(BTreeSet::new)
1893                    .insert((fragment_id, upstream_source_fragment_id));
1894            }
1895        }
1896        Ok(source_fragment_ids)
1897    }
1898
1899    pub async fn get_all_upstream_sink_infos(
1900        &self,
1901        target_table: &PbTable,
1902        target_fragment_id: FragmentId,
1903    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1904        let inner = self.inner.read().await;
1905        let txn = inner.db.begin().await?;
1906
1907        self.get_all_upstream_sink_infos_in_txn(&txn, target_table, target_fragment_id)
1908            .await
1909    }
1910
1911    pub async fn get_all_upstream_sink_infos_in_txn<C>(
1912        &self,
1913        txn: &C,
1914        target_table: &PbTable,
1915        target_fragment_id: FragmentId,
1916    ) -> MetaResult<Vec<UpstreamSinkInfo>>
1917    where
1918        C: ConnectionTrait,
1919    {
1920        let incoming_sinks = Sink::find()
1921            .filter(sink::Column::TargetTable.eq(target_table.id))
1922            .all(txn)
1923            .await?;
1924
1925        let sink_ids = incoming_sinks.iter().map(|s| s.sink_id).collect_vec();
1926        let sink_fragment_ids = get_sink_fragment_by_ids(txn, sink_ids).await?;
1927
1928        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1929        for sink in &incoming_sinks {
1930            let sink_fragment_id =
1931                sink_fragment_ids
1932                    .get(&sink.sink_id)
1933                    .cloned()
1934                    .ok_or(anyhow::anyhow!(
1935                        "sink fragment not found for sink id {}",
1936                        sink.sink_id
1937                    ))?;
1938            let upstream_info = build_upstream_sink_info(
1939                sink.sink_id,
1940                sink.original_target_columns
1941                    .as_ref()
1942                    .map(|cols| cols.to_protobuf())
1943                    .unwrap_or_default(),
1944                sink_fragment_id,
1945                target_table,
1946                target_fragment_id,
1947            )?;
1948            upstream_sink_infos.push(upstream_info);
1949        }
1950
1951        Ok(upstream_sink_infos)
1952    }
1953
1954    pub async fn get_mview_fragment_by_id(&self, job_id: JobId) -> MetaResult<FragmentId> {
1955        let inner = self.inner.read().await;
1956        let txn = inner.db.begin().await?;
1957
1958        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1959            .select_only()
1960            .column(fragment::Column::FragmentId)
1961            .filter(
1962                fragment::Column::JobId
1963                    .eq(job_id)
1964                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1965            )
1966            .into_tuple()
1967            .all(&txn)
1968            .await?;
1969
1970        if mview_fragment.len() != 1 {
1971            return Err(anyhow::anyhow!(
1972                "expected exactly one mview fragment for job {}, found {}",
1973                job_id,
1974                mview_fragment.len()
1975            )
1976            .into());
1977        }
1978
1979        Ok(mview_fragment.into_iter().next().unwrap())
1980    }
1981
1982    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
1983        let inner = self.inner.read().await;
1984        let txn = inner.db.begin().await?;
1985        has_table_been_migrated(&txn, table_id).await
1986    }
1987
1988    pub async fn update_fragment_splits<C>(
1989        &self,
1990        txn: &C,
1991        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
1992    ) -> MetaResult<()>
1993    where
1994        C: ConnectionTrait,
1995    {
1996        if fragment_splits.is_empty() {
1997            return Ok(());
1998        }
1999
2000        let models: Vec<fragment_splits::ActiveModel> = fragment_splits
2001            .iter()
2002            .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
2003                fragment_id: Set(*fragment_id as _),
2004                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
2005                    splits: splits.iter().map(Into::into).collect_vec(),
2006                }))),
2007            })
2008            .collect();
2009
2010        FragmentSplits::insert_many(models)
2011            .on_conflict(
2012                OnConflict::column(fragment_splits::Column::FragmentId)
2013                    .update_column(fragment_splits::Column::Splits)
2014                    .to_owned(),
2015            )
2016            .exec(txn)
2017            .await?;
2018
2019        Ok(())
2020    }
2021}
2022
#[cfg(test)]
mod tests {
    // Unit tests for the fragment <-> protobuf conversion helpers of
    // `CatalogController` and for the parallelism-policy formatting.

    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::id::JobId;
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::*;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};

    use super::ActorInfo;
    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{Fragment, StreamActor};

    // upstream_fragment_id -> set of upstream actor ids feeding one actor
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    // actor_id -> that actor's upstreams, for a whole fragment
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    const TEST_FRAGMENT_ID: FragmentId = FragmentId::new(1);

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = FragmentId::new(2);

    const TEST_JOB_ID: JobId = JobId::new(1);

    const TEST_STATE_TABLE_ID: TableId = TableId::new(1000);

    /// Build two upstream entries for `actor_id`: one from `TEST_UPSTREAM_FRAGMENT_ID`
    /// (upstream actor id offset by 100) and one from the next fragment id (offset by 200).
    fn generate_upstream_actor_ids_for_actor(actor_id: ActorId) -> ActorUpstreams {
        let mut upstream_actor_ids = BTreeMap::new();
        upstream_actor_ids.insert(
            TEST_UPSTREAM_FRAGMENT_ID,
            HashSet::from_iter([(actor_id + 100)]),
        );
        upstream_actor_ids.insert(
            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
            HashSet::from_iter([(actor_id + 200)]),
        );
        upstream_actor_ids
    }

    /// Build a `Union` stream node with one `Merge` input per upstream fragment.
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let mut input = vec![];
        for &upstream_fragment_id in actor_upstream_actor_ids.keys() {
            input.push(PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id,
                    ..Default::default()
                }))),
                ..Default::default()
            });
        }

        PbStreamNode {
            input,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    /// In-memory `Fragment` -> catalog `fragment::Model` extraction round-trip.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        // Uniform vnode mapping across the three actors.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id.into(),
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id.into()).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
                config_override: "".into(),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors,
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node,
        };

        let fragment =
            CatalogController::prepare_fragment_model_for_new_job(TEST_JOB_ID, &pb_fragment)?;

        check_fragment(fragment, pb_fragment);

        Ok(())
    }

    /// Catalog `fragment::Model` + actor infos -> in-memory `Fragment` composition,
    /// including actor status, splits, and expression contexts.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id.into(),
                    generate_upstream_actor_ids_for_actor(actor_id.into()),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i.into()),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Each actor carries a single dummy connector split.
                let actor_splits = ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                });

                ActorInfo {
                    actor_id: actor_id.into(),
                    fragment_id: TEST_FRAGMENT_ID,
                    splits: actor_splits,
                    worker_id: 0.into(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id.into())
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                    config_override: "a.b.c = true".into(),
                }
            })
            .collect_vec();

        // Use the first actor's upstreams as the template for the merger node,
        // mirroring how all actors of a fragment share one stream node.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: TableIdArray(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
            parallelism: None,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        // Every actor must have a status with a resolved worker location.
        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert!(
            pb_actor_status
                .values()
                .all(|actor_status| actor_status.location.is_some())
        );
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    /// Field-by-field comparison of composed actors against the source `ActorInfo`s.
    fn check_actors(
        actors: Vec<ActorInfo>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<ActorId, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            ActorInfo {
                actor_id,
                fragment_id,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // Every Merge node in the stream node must reference a fragment that the
            // actor actually has upstreams from.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(
                splits,
                pb_actor_splits
                    .get(&pb_actor_id)
                    .map(ConnectorSplits::from)
                    .unwrap_or_default()
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    /// Field-by-field comparison of a catalog fragment model against its protobuf form.
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(pb_state_table_ids, fragment.state_table_ids.0);
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }

    /// A fragment with no per-fragment parallelism inherits the job's parallelism.
    #[test]
    fn test_parallelism_policy_with_root_fragments() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 3.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let job_parallelism = StreamingParallelism::Fixed(4);

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            fragment.distribution_type,
            fragment.parallelism.as_ref(),
            Some(&job_parallelism),
            &[],
        );

        assert_eq!(policy, "inherit(fixed(4))");
    }

    /// Without job parallelism, the policy is derived from the upstream root
    /// fragment ids, deduplicated and sorted.
    #[test]
    fn test_parallelism_policy_with_upstream_roots() {
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: 5.into(),
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&PbStreamNode::default()),
            state_table_ids: TableIdArray::default(),
            upstream_fragment_id: Default::default(),
            vnode_count: 0,
            parallelism: None,
        };

        let policy = super::CatalogController::format_fragment_parallelism_policy(
            fragment.distribution_type,
            fragment.parallelism.as_ref(),
            None,
            &[3.into(), 1.into(), 2.into(), 1.into()],
        );

        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
    }
}