risingwave_meta/controller/
fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
26use risingwave_common::util::stream_graph_visitor::{
27    visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_connector::source::SplitImpl;
30use risingwave_meta_model::actor::ActorStatus;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Actor, Fragment as FragmentModel, FragmentRelation, Sink, SourceSplits, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
40    source_splits, streaming_job, table,
41};
42use risingwave_meta_model_migration::{Alias, ExprTrait, OnConflict, SelectStatement, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
49use risingwave_pb::meta::table_fragments::fragment::{
50    FragmentDistributionType, PbFragmentDistributionType,
51};
52use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
53use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
54use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
58    StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63    ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
64    QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
65};
66use serde::{Deserialize, Serialize};
67use tracing::debug;
68
69use crate::barrier::SnapshotBackfillInfo;
70use crate::controller::catalog::{CatalogController, CatalogControllerInner};
71use crate::controller::scale::resolve_streaming_job_definition;
72use crate::controller::utils::{
73    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
74    get_fragment_mappings, get_sink_fragment_by_ids, has_table_been_migrated,
75    resolve_no_shuffle_actor_dispatcher,
76};
77use crate::manager::{LocalNotification, NotificationManager};
78use crate::model::{
79    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
80    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
81};
82use crate::rpc::ddl_controller::build_upstream_sink_info;
83use crate::stream::{SplitAssignment, UpstreamSinkInfo};
84use crate::{MetaError, MetaResult};
85
/// Some information of running (inflight) actors.
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    /// The worker node this actor is currently scheduled on.
    pub worker_id: WorkerId,
    /// Vnode bitmap owned by this actor, if any (absent presumably means the
    /// fragment is singleton-distributed — confirm with `DistributionType` usage).
    pub vnode_bitmap: Option<Bitmap>,
    /// Source splits currently assigned to this actor (empty if none).
    pub splits: Vec<SplitImpl>,
}
93
/// Some information of a running (inflight) fragment.
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    /// Id of the fragment.
    pub fragment_id: crate::model::FragmentId,
    /// Distribution of the fragment (e.g. singleton vs. hash).
    pub distribution_type: DistributionType,
    /// The stream plan of the fragment.
    pub nodes: PbStreamNode,
    /// Per-actor runtime info, keyed by actor id.
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    /// Ids of the state tables owned by this fragment.
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}
102
/// Parallelism-related facts about a fragment, derived from its actor count.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    /// Distribution of the fragment.
    pub distribution_type: FragmentDistributionType,
    /// Number of actors currently instantiated for this fragment.
    pub actor_count: usize,
    /// Total vnode count of the fragment.
    pub vnode_count: usize,
}
109
110#[easy_ext::ext(FragmentTypeMaskExt)]
111pub impl FragmentTypeMask {
112    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
113        Expr::col(fragment::Column::FragmentTypeMask)
114            .bit_and(Expr::value(flag as i32))
115            .ne(0)
116    }
117
118    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
119        Expr::col(fragment::Column::FragmentTypeMask)
120            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
121            .ne(0)
122    }
123
124    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
125        Expr::col(fragment::Column::FragmentTypeMask)
126            .bit_and(Expr::value(flag as i32))
127            .eq(0)
128    }
129}
130
/// Summary row for a streaming job, materialized from a join query
/// (see `list_streaming_job_infos`). Serialized in camelCase for the dashboard.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    pub obj_type: ObjectType,
    /// Name of the table/source/sink object; `"<unknown>"` when none matches.
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Job-specific resource group, falling back to the database's group.
    pub resource_group: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
144
145impl CatalogControllerInner {
146    /// List all fragment vnode mapping info for all CREATED streaming jobs.
147    pub async fn all_running_fragment_mappings(
148        &self,
149    ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
150        let txn = self.db.begin().await?;
151
152        let job_ids: Vec<ObjectId> = StreamingJob::find()
153            .select_only()
154            .column(streaming_job::Column::JobId)
155            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
156            .into_tuple()
157            .all(&txn)
158            .await?;
159
160        let mut result = vec![];
161        for job_id in job_ids {
162            let mappings = get_fragment_mappings(&txn, job_id).await?;
163
164            result.extend(mappings.into_iter());
165        }
166
167        Ok(result.into_iter())
168    }
169}
170
171impl NotificationManager {
172    pub(crate) fn notify_fragment_mapping(
173        &self,
174        operation: NotificationOperation,
175        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
176    ) {
177        let fragment_ids = fragment_mappings
178            .iter()
179            .map(|mapping| mapping.fragment_id)
180            .collect_vec();
181        if fragment_ids.is_empty() {
182            return;
183        }
184        // notify all fragment mappings to frontend.
185        for fragment_mapping in fragment_mappings {
186            self.notify_frontend_without_version(
187                operation,
188                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
189            );
190        }
191
192        // update serving vnode mappings.
193        match operation {
194            NotificationOperation::Add | NotificationOperation::Update => {
195                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
196                    fragment_ids,
197                ));
198            }
199            NotificationOperation::Delete => {
200                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
201                    fragment_ids,
202                ));
203            }
204            op => {
205                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
206            }
207        }
208    }
209}
210
211impl CatalogController {
212    pub fn extract_fragment_and_actors_from_fragments(
213        job_id: ObjectId,
214        fragments: impl Iterator<Item = &Fragment>,
215        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
216        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
217    ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
218        fragments
219            .map(|fragment| {
220                Self::extract_fragment_and_actors_for_new_job(
221                    job_id,
222                    fragment,
223                    actor_status,
224                    actor_splits,
225                )
226            })
227            .try_collect()
228    }
229
    /// Convert an in-memory [`Fragment`] (plus per-actor status and split assignments)
    /// into the database models for the fragment and its actors, for a newly-created job.
    ///
    /// Returns an error if any actor of the fragment is missing from `actor_status`.
    /// Panics if the fragment has no actors or an actor has no expression context.
    pub fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        // A fragment must always have at least one actor.
        assert!(!pb_actors.is_empty());

        // Clear the deprecated per-actor upstream list on Merge nodes before
        // persisting the stream plan.
        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            // Source splits assigned to this actor, if any, serialized for storage.
            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            // Every actor must have a status entry; otherwise the caller passed
            // inconsistent inputs.
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            // `upstream_actor_ids` is deprecated; always stored empty for new jobs.
            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        // `upstream_fragment_id` is deprecated; always stored empty for new jobs.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }
329
330    pub fn compose_table_fragments(
331        table_id: u32,
332        state: PbState,
333        ctx: Option<PbStreamContext>,
334        fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
335        parallelism: StreamingParallelism,
336        max_parallelism: usize,
337        job_definition: Option<String>,
338    ) -> MetaResult<StreamJobFragments> {
339        let mut pb_fragments = BTreeMap::new();
340        let mut pb_actor_status = BTreeMap::new();
341
342        for (fragment, actors) in fragments {
343            let (fragment, fragment_actor_status, _) =
344                Self::compose_fragment(fragment, actors, job_definition.clone())?;
345
346            pb_fragments.insert(fragment.fragment_id, fragment);
347            pb_actor_status.extend(fragment_actor_status.into_iter());
348        }
349
350        let table_fragments = StreamJobFragments {
351            stream_job_id: table_id.into(),
352            state: state as _,
353            fragments: pb_fragments,
354            actor_status: pb_actor_status,
355            ctx: ctx
356                .as_ref()
357                .map(StreamContext::from_protobuf)
358                .unwrap_or_default(),
359            assigned_parallelism: match parallelism {
360                StreamingParallelism::Custom => TableParallelism::Custom,
361                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
362                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
363            },
364            max_parallelism,
365        };
366
367        Ok(table_fragments)
368    }
369
    /// Convert a fragment's database models back into the in-memory [`Fragment`],
    /// together with per-actor statuses and per-actor source splits.
    ///
    /// Returns an error if any actor row references a different fragment id.
    #[allow(clippy::type_complexity)]
    pub(crate) fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<actor::Model>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity check: each upstream fragment should appear in exactly one Merge node.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            // Guard against rows accidentally joined from another fragment.
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let actor::Model {
                actor_id,
                fragment_id,
                status,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                    state: PbActorState::from(status) as _,
                },
            );

            // Only actors with assigned source splits get an entry.
            if let Some(splits) = splits {
                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
            }

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                // The job definition doubles as the mview definition for each actor.
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }
466
467    pub async fn running_fragment_parallelisms(
468        &self,
469        id_filter: Option<HashSet<FragmentId>>,
470    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
471        let inner = self.inner.read().await;
472
473        let query_alias = Alias::new("fragment_actor_count");
474        let count_alias = Alias::new("count");
475
476        let mut query = SelectStatement::new()
477            .column(actor::Column::FragmentId)
478            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
479            .from(Actor)
480            .group_by_col(actor::Column::FragmentId)
481            .to_owned();
482
483        if let Some(id_filter) = id_filter {
484            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
485        }
486
487        let outer = SelectStatement::new()
488            .column((FragmentModel, fragment::Column::FragmentId))
489            .column(count_alias)
490            .column(fragment::Column::DistributionType)
491            .column(fragment::Column::VnodeCount)
492            .from_subquery(query.to_owned(), query_alias.clone())
493            .inner_join(
494                FragmentModel,
495                Expr::col((query_alias, actor::Column::FragmentId))
496                    .equals((FragmentModel, fragment::Column::FragmentId)),
497            )
498            .to_owned();
499
500        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
501            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
502                outer.to_owned(),
503            )
504            .all(&inner.db)
505            .await?;
506
507        Ok(fragment_parallelisms
508            .into_iter()
509            .map(|(fragment_id, count, distribution_type, vnode_count)| {
510                (
511                    fragment_id,
512                    FragmentParallelismInfo {
513                        distribution_type: distribution_type.into(),
514                        actor_count: count as usize,
515                        vnode_count: vnode_count as usize,
516                    },
517                )
518            })
519            .collect())
520    }
521
522    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
523        let inner = self.inner.read().await;
524        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
525            .select_only()
526            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
527            .into_tuple()
528            .all(&inner.db)
529            .await?;
530        Ok(fragment_jobs.into_iter().collect())
531    }
532
533    pub async fn get_fragment_job_id(
534        &self,
535        fragment_ids: Vec<FragmentId>,
536    ) -> MetaResult<Vec<ObjectId>> {
537        let inner = self.inner.read().await;
538
539        let object_ids: Vec<ObjectId> = FragmentModel::find()
540            .select_only()
541            .column(fragment::Column::JobId)
542            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
543            .into_tuple()
544            .all(&inner.db)
545            .await?;
546
547        Ok(object_ids)
548    }
549
550    pub async fn get_fragment_desc_by_id(
551        &self,
552        fragment_id: FragmentId,
553    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
554        let inner = self.inner.read().await;
555
556        // Get the fragment description
557        let fragment_opt = FragmentModel::find()
558            .select_only()
559            .columns([
560                fragment::Column::FragmentId,
561                fragment::Column::JobId,
562                fragment::Column::FragmentTypeMask,
563                fragment::Column::DistributionType,
564                fragment::Column::StateTableIds,
565                fragment::Column::VnodeCount,
566                fragment::Column::StreamNode,
567            ])
568            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
569            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
570            .filter(fragment::Column::FragmentId.eq(fragment_id))
571            .group_by(fragment::Column::FragmentId)
572            .into_model::<FragmentDesc>()
573            .one(&inner.db)
574            .await?;
575
576        let Some(fragment) = fragment_opt else {
577            return Ok(None); // Fragment not found
578        };
579
580        // Get upstream fragments
581        let upstreams: Vec<FragmentId> = FragmentRelation::find()
582            .select_only()
583            .column(fragment_relation::Column::SourceFragmentId)
584            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
585            .into_tuple()
586            .all(&inner.db)
587            .await?;
588
589        Ok(Some((fragment, upstreams)))
590    }
591
592    pub async fn list_fragment_database_ids(
593        &self,
594        select_fragment_ids: Option<Vec<FragmentId>>,
595    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
596        let inner = self.inner.read().await;
597        let select = FragmentModel::find()
598            .select_only()
599            .column(fragment::Column::FragmentId)
600            .column(object::Column::DatabaseId)
601            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
602        let select = if let Some(select_fragment_ids) = select_fragment_ids {
603            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
604        } else {
605            select
606        };
607        Ok(select.into_tuple().all(&inner.db).await?)
608    }
609
610    pub async fn get_job_fragments_by_id(
611        &self,
612        job_id: ObjectId,
613    ) -> MetaResult<StreamJobFragments> {
614        let inner = self.inner.read().await;
615        let fragment_actors = FragmentModel::find()
616            .find_with_related(Actor)
617            .filter(fragment::Column::JobId.eq(job_id))
618            .all(&inner.db)
619            .await?;
620
621        let job_info = StreamingJob::find_by_id(job_id)
622            .one(&inner.db)
623            .await?
624            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
625
626        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
627            .await?
628            .remove(&job_id);
629
630        Self::compose_table_fragments(
631            job_id as _,
632            job_info.job_status.into(),
633            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
634            fragment_actors,
635            job_info.parallelism.clone(),
636            job_info.max_parallelism as _,
637            job_definition,
638        )
639    }
640
    /// Fetch the actor dispatchers of the given fragments.
    ///
    /// Thin wrapper that takes the inner read lock and delegates to the
    /// free function [`get_fragment_actor_dispatchers`].
    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;
        get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
    }
648
649    pub async fn get_fragment_downstream_relations(
650        &self,
651        fragment_ids: Vec<FragmentId>,
652    ) -> MetaResult<FragmentDownstreamRelation> {
653        let inner = self.inner.read().await;
654        let mut stream = FragmentRelation::find()
655            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
656            .stream(&inner.db)
657            .await?;
658        let mut relations = FragmentDownstreamRelation::new();
659        while let Some(relation) = stream.try_next().await? {
660            relations
661                .entry(relation.source_fragment_id as _)
662                .or_default()
663                .push(DownstreamFragmentRelation {
664                    downstream_fragment_id: relation.target_fragment_id as _,
665                    dispatcher_type: relation.dispatcher_type,
666                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
667                    output_mapping: PbDispatchOutputMapping {
668                        indices: relation.output_indices.into_u32_array(),
669                        types: relation
670                            .output_type_mapping
671                            .unwrap_or_default()
672                            .to_protobuf(),
673                    },
674                });
675        }
676        Ok(relations)
677    }
678
679    pub async fn get_job_fragment_backfill_scan_type(
680        &self,
681        job_id: ObjectId,
682    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
683        let inner = self.inner.read().await;
684        let fragments: Vec<_> = FragmentModel::find()
685            .filter(fragment::Column::JobId.eq(job_id))
686            .all(&inner.db)
687            .await?;
688
689        let mut result = HashMap::new();
690
691        for fragment::Model {
692            fragment_id,
693            stream_node,
694            ..
695        } in fragments
696        {
697            let stream_node = stream_node.to_protobuf();
698            visit_stream_node_body(&stream_node, |body| {
699                if let NodeBody::StreamScan(node) = body {
700                    match node.stream_scan_type() {
701                        StreamScanType::Unspecified => {}
702                        scan_type => {
703                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
704                        }
705                    }
706                }
707            });
708        }
709
710        Ok(result)
711    }
712
    /// List summary info of all streaming jobs for display (e.g. the dashboard).
    ///
    /// The job's display name is taken from the first matching catalog object
    /// (table, then source, then sink), falling back to `"<unknown>"`; the
    /// resource group falls back from the job-specific one to the database's.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // Left-join each possible catalog object kind; at most one matches.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // COALESCE(table.name, source.name, sink.name, '<unknown>') AS name
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // COALESCE(job.specific_resource_group, database.resource_group)
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
759
760    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
761        let inner = self.inner.read().await;
762        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
763            .select_only()
764            .column(streaming_job::Column::MaxParallelism)
765            .into_tuple()
766            .one(&inner.db)
767            .await?
768            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
769        Ok(max_parallelism as usize)
770    }
771
772    /// Get all actor ids in the target streaming jobs.
773    pub async fn get_job_actor_mapping(
774        &self,
775        job_ids: Vec<ObjectId>,
776    ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
777        let inner = self.inner.read().await;
778        let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
779            .select_only()
780            .column(fragment::Column::JobId)
781            .column(actor::Column::ActorId)
782            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
783            .filter(fragment::Column::JobId.is_in(job_ids))
784            .into_tuple()
785            .all(&inner.db)
786            .await?;
787        Ok(job_actors.into_iter().into_group_map())
788    }
789
790    /// Try to get internal table ids of each streaming job, used by metrics collection.
791    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
792        if let Ok(inner) = self.inner.try_read()
793            && let Ok(job_state_tables) = FragmentModel::find()
794                .select_only()
795                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
796                .into_tuple::<(ObjectId, I32Array)>()
797                .all(&inner.db)
798                .await
799        {
800            let mut job_internal_table_ids = HashMap::new();
801            for (job_id, state_table_ids) in job_state_tables {
802                job_internal_table_ids
803                    .entry(job_id)
804                    .or_insert_with(Vec::new)
805                    .extend(state_table_ids.into_inner());
806            }
807            return Some(job_internal_table_ids.into_iter().collect());
808        }
809        None
810    }
811
812    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
813        let inner = self.inner.read().await;
814        let count = FragmentModel::find().count(&inner.db).await?;
815        Ok(count > 0)
816    }
817
818    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
819        let inner = self.inner.read().await;
820        let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
821            .select_only()
822            .column(actor::Column::WorkerId)
823            .column_as(actor::Column::ActorId.count(), "count")
824            .group_by(actor::Column::WorkerId)
825            .into_tuple()
826            .all(&inner.db)
827            .await?;
828
829        Ok(actor_cnt
830            .into_iter()
831            .map(|(worker_id, count)| (worker_id, count as usize))
832            .collect())
833    }
834
835    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
836    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
837        let inner = self.inner.read().await;
838        let jobs = StreamingJob::find().all(&inner.db).await?;
839
840        let mut job_definition = resolve_streaming_job_definition(
841            &inner.db,
842            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
843        )
844        .await?;
845
846        let mut table_fragments = BTreeMap::new();
847        for job in jobs {
848            let fragment_actors = FragmentModel::find()
849                .find_with_related(Actor)
850                .filter(fragment::Column::JobId.eq(job.job_id))
851                .all(&inner.db)
852                .await?;
853
854            table_fragments.insert(
855                job.job_id as ObjectId,
856                Self::compose_table_fragments(
857                    job.job_id as _,
858                    job.job_status.into(),
859                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
860                    fragment_actors,
861                    job.parallelism.clone(),
862                    job.max_parallelism as _,
863                    job_definition.remove(&job.job_id),
864                )?,
865            );
866        }
867
868        Ok(table_fragments)
869    }
870
871    /// Returns pairs of (job id, actor ids), where actors belong to CDC table backfill fragment of the job.
872    pub async fn cdc_table_backfill_actor_ids(&self) -> MetaResult<HashMap<u32, HashSet<u32>>> {
873        let inner = self.inner.read().await;
874        let mut job_id_actor_ids = HashMap::default();
875        let stream_cdc_scan_flag = FragmentTypeFlag::StreamCdcScan as i32;
876        let fragment_type_mask = stream_cdc_scan_flag;
877        let fragment_actors: Vec<(fragment::Model, Vec<actor::Model>)> = FragmentModel::find()
878            .find_with_related(Actor)
879            .filter(fragment::Column::FragmentTypeMask.eq(fragment_type_mask))
880            .all(&inner.db)
881            .await?;
882        for (fragment, actors) in fragment_actors {
883            let e: &mut HashSet<u32> = job_id_actor_ids.entry(fragment.job_id as _).or_default();
884            e.extend(actors.iter().map(|a| a.actor_id as u32));
885        }
886        Ok(job_id_actor_ids)
887    }
888
889    pub async fn upstream_fragments(
890        &self,
891        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
892    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
893        let inner = self.inner.read().await;
894        let mut stream = FragmentRelation::find()
895            .select_only()
896            .columns([
897                fragment_relation::Column::SourceFragmentId,
898                fragment_relation::Column::TargetFragmentId,
899            ])
900            .filter(
901                fragment_relation::Column::TargetFragmentId
902                    .is_in(fragment_ids.map(|id| id as FragmentId)),
903            )
904            .into_tuple::<(FragmentId, FragmentId)>()
905            .stream(&inner.db)
906            .await?;
907        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
908        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
909            upstream_fragments
910                .entry(downstream_fragment_id as crate::model::FragmentId)
911                .or_default()
912                .insert(upstream_fragment_id as crate::model::FragmentId);
913        }
914        Ok(upstream_fragments)
915    }
916
917    pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
918        let inner = self.inner.read().await;
919        let actor_locations: Vec<PartialActorLocation> =
920            Actor::find().into_partial_model().all(&inner.db).await?;
921        Ok(actor_locations)
922    }
923
924    pub async fn list_actor_info(
925        &self,
926    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
927        let inner = self.inner.read().await;
928        let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
929            Actor::find()
930                .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
931                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
932                .select_only()
933                .columns([actor::Column::ActorId, actor::Column::FragmentId])
934                .column_as(object::Column::Oid, "job_id")
935                .column_as(object::Column::SchemaId, "schema_id")
936                .column_as(object::Column::ObjType, "type")
937                .into_tuple()
938                .all(&inner.db)
939                .await?;
940        Ok(actor_locations)
941    }
942
943    pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
944        let inner = self.inner.read().await;
945
946        let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
947            .select_only()
948            .filter(actor::Column::Splits.is_not_null())
949            .columns([actor::Column::ActorId, actor::Column::FragmentId])
950            .into_tuple()
951            .all(&inner.db)
952            .await?;
953
954        Ok(source_actors)
955    }
956
957    pub async fn list_creating_fragment_descs(
958        &self,
959    ) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
960        let inner = self.inner.read().await;
961        let mut result = Vec::new();
962        let fragments = FragmentModel::find()
963            .select_only()
964            .columns([
965                fragment::Column::FragmentId,
966                fragment::Column::JobId,
967                fragment::Column::FragmentTypeMask,
968                fragment::Column::DistributionType,
969                fragment::Column::StateTableIds,
970                fragment::Column::VnodeCount,
971                fragment::Column::StreamNode,
972            ])
973            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
974            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
975            .join(JoinType::LeftJoin, fragment::Relation::Object.def())
976            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
977            .filter(
978                streaming_job::Column::JobStatus
979                    .eq(JobStatus::Initial)
980                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
981            )
982            .group_by(fragment::Column::FragmentId)
983            .into_model::<FragmentDesc>()
984            .all(&inner.db)
985            .await?;
986        for fragment in fragments {
987            let upstreams: Vec<FragmentId> = FragmentRelation::find()
988                .select_only()
989                .column(fragment_relation::Column::SourceFragmentId)
990                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
991                .into_tuple()
992                .all(&inner.db)
993                .await?;
994            result.push((fragment, upstreams));
995        }
996        Ok(result)
997    }
998
999    pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
1000        let inner = self.inner.read().await;
1001        let mut result = Vec::new();
1002        let fragments = FragmentModel::find()
1003            .select_only()
1004            .columns([
1005                fragment::Column::FragmentId,
1006                fragment::Column::JobId,
1007                fragment::Column::FragmentTypeMask,
1008                fragment::Column::DistributionType,
1009                fragment::Column::StateTableIds,
1010                fragment::Column::VnodeCount,
1011                fragment::Column::StreamNode,
1012            ])
1013            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
1014            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
1015            .group_by(fragment::Column::FragmentId)
1016            .into_model::<FragmentDesc>()
1017            .all(&inner.db)
1018            .await?;
1019        for fragment in fragments {
1020            let upstreams: Vec<FragmentId> = FragmentRelation::find()
1021                .select_only()
1022                .column(fragment_relation::Column::SourceFragmentId)
1023                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
1024                .into_tuple()
1025                .all(&inner.db)
1026                .await?;
1027            result.push((fragment, upstreams));
1028        }
1029        Ok(result)
1030    }
1031
1032    pub async fn list_sink_actor_mapping(
1033        &self,
1034    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1035        let inner = self.inner.read().await;
1036        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1037            .select_only()
1038            .columns([sink::Column::SinkId, sink::Column::Name])
1039            .into_tuple()
1040            .all(&inner.db)
1041            .await?;
1042        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1043        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1044
1045        let actor_with_type: Vec<(ActorId, SinkId)> = Actor::find()
1046            .select_only()
1047            .column(actor::Column::ActorId)
1048            .column(fragment::Column::JobId)
1049            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1050            .filter(
1051                fragment::Column::JobId
1052                    .is_in(sink_ids)
1053                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
1054            )
1055            .into_tuple()
1056            .all(&inner.db)
1057            .await?;
1058
1059        let mut sink_actor_mapping = HashMap::new();
1060        for (actor_id, sink_id) in actor_with_type {
1061            sink_actor_mapping
1062                .entry(sink_id)
1063                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1064                .1
1065                .push(actor_id);
1066        }
1067
1068        Ok(sink_actor_mapping)
1069    }
1070
1071    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1072        let inner = self.inner.read().await;
1073        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1074            .select_only()
1075            .columns([
1076                fragment::Column::FragmentId,
1077                fragment::Column::JobId,
1078                fragment::Column::StateTableIds,
1079            ])
1080            .into_partial_model()
1081            .all(&inner.db)
1082            .await?;
1083        Ok(fragment_state_tables)
1084    }
1085
    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
    /// collected
    ///
    /// Returns a nested map: database id -> streaming job id -> fragment id -> in-flight
    /// fragment info. Only actors in `Running` status are included; passing a
    /// `database_id` narrows the scan to that database.
    /// NOTE(review): the middle key is typed as `TableId` but is populated with the
    /// job's object id — they appear to share the same underlying integer type; confirm.
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        // Always restrict to running actors; optionally restrict to one database.
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        // Stream one row per running actor, joined with its fragment and the
        // fragment's owning object (for database / job ids).
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    Option<ConnectorSplits>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(actor::Column::Splits)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            splits,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            // Group by database, then by job.
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            // Convert the raw i32 state-table ids into catalog `TableId`s.
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();

            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                splits: splits
                    .map(|connector_splits| {
                        connector_splits
                            .to_protobuf()
                            .splits
                            .iter()
                            // NOTE(review): decoding persisted splits is assumed to be
                            // infallible here — this unwrap panics otherwise.
                            .map(|connector_split| SplitImpl::try_from(connector_split).unwrap())
                            .collect_vec()
                    })
                    .unwrap_or_default(),
            };
            match fragment_infos.entry(fragment_id) {
                // Fragment already seen: all its actors must agree on the state
                // tables, and each actor id must appear at most once.
                Entry::Occupied(mut entry) => {
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                // First actor of this fragment: record the fragment-level info.
                Entry::Vacant(entry) => {
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }
1200
    /// Migrate actors off expired or changed workers according to `plan`, which maps
    /// a source worker slot to the target worker slot the actor should move to.
    /// Only `Running` actors are considered; updates are applied in one transaction.
    pub async fn migrate_actors(
        &self,
        plan: HashMap<WorkerSlotId, WorkerSlotId>,
    ) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Load every actor together with its fragment's id and distribution type.
        let actors: Vec<(
            FragmentId,
            DistributionType,
            ActorId,
            Option<VnodeBitmap>,
            WorkerId,
            ActorStatus,
        )> = Actor::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::DistributionType,
            ])
            .columns([
                actor::Column::ActorId,
                actor::Column::VnodeBitmap,
                actor::Column::WorkerId,
                actor::Column::Status,
            ])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .into_tuple()
            .all(&txn)
            .await?;

        // worker -> fragment -> ordered set of its actor ids on that worker.
        let mut actor_locations = HashMap::new();

        for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
            // Non-running actors are left where they are.
            if *status != ActorStatus::Running {
                tracing::warn!(
                    "skipping actor {} in fragment {} with status {:?}",
                    actor_id,
                    fragment_id,
                    status
                );
                continue;
            }

            // BTreeSet keeps actor ids sorted, which makes the slot-index
            // assignment below deterministic.
            actor_locations
                .entry(*worker_id)
                .or_insert(HashMap::new())
                .entry(*fragment_id)
                .or_insert(BTreeSet::new())
                .insert(*actor_id);
        }

        // Workers that appear as a source slot in the plan need their actors moved.
        let expired_or_changed_workers: HashSet<_> =
            plan.keys().map(|k| k.worker_id() as WorkerId).collect();

        // actor id -> target worker id.
        let mut actor_migration_plan = HashMap::new();
        for (worker, fragment) in actor_locations {
            if expired_or_changed_workers.contains(&worker) {
                for (fragment_id, actors) in fragment {
                    debug!(
                        "worker {} expired or changed, migrating fragment {}",
                        worker, fragment_id
                    );
                    // Assign slot indices to actors by their sorted order; the plan
                    // is keyed by (worker, slot index).
                    let worker_slot_to_actor: HashMap<_, _> = actors
                        .iter()
                        .enumerate()
                        .map(|(idx, actor_id)| {
                            (WorkerSlotId::new(worker as _, idx as _), *actor_id)
                        })
                        .collect();

                    for (worker_slot, actor) in worker_slot_to_actor {
                        if let Some(target) = plan.get(&worker_slot) {
                            actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
                        }
                    }
                }
            }
        }

        // Apply the moves: rewrite each migrated actor's worker id.
        for (actor, worker) in actor_migration_plan {
            Actor::update_many()
                .col_expr(
                    actor::Column::WorkerId,
                    Expr::value(Value::Int(Some(worker))),
                )
                .filter(actor::Column::ActorId.eq(actor))
                .exec(&txn)
                .await?;
        }

        txn.commit().await?;

        Ok(())
    }
1296
1297    pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1298        let inner = self.inner.read().await;
1299
1300        let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1301            .select_only()
1302            .columns([fragment::Column::FragmentId])
1303            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1304            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1305            .into_tuple()
1306            .all(&inner.db)
1307            .await?;
1308
1309        let mut actor_locations = HashMap::new();
1310
1311        for (fragment_id, _, worker_id) in actors {
1312            *actor_locations
1313                .entry(worker_id)
1314                .or_insert(HashMap::new())
1315                .entry(fragment_id)
1316                .or_insert(0_usize) += 1;
1317        }
1318
1319        let mut result = HashSet::new();
1320        for (worker_id, mapping) in actor_locations {
1321            let max_fragment_len = mapping.values().max().unwrap();
1322
1323            result
1324                .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1325        }
1326
1327        Ok(result)
1328    }
1329
1330    pub async fn all_node_actors(
1331        &self,
1332        include_inactive: bool,
1333    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
1334        let inner = self.inner.read().await;
1335        let fragment_actors = if include_inactive {
1336            FragmentModel::find()
1337                .find_with_related(Actor)
1338                .all(&inner.db)
1339                .await?
1340        } else {
1341            FragmentModel::find()
1342                .find_with_related(Actor)
1343                .filter(actor::Column::Status.eq(ActorStatus::Running))
1344                .all(&inner.db)
1345                .await?
1346        };
1347
1348        let job_definitions = resolve_streaming_job_definition(
1349            &inner.db,
1350            &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
1351        )
1352        .await?;
1353
1354        let mut node_actors = HashMap::new();
1355        for (fragment, actors) in fragment_actors {
1356            let job_id = fragment.job_id;
1357            let (table_fragments, actor_status, _) = Self::compose_fragment(
1358                fragment,
1359                actors,
1360                job_definitions.get(&(job_id as _)).cloned(),
1361            )?;
1362            for actor in table_fragments.actors {
1363                let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
1364                node_actors
1365                    .entry(node_id)
1366                    .or_insert_with(Vec::new)
1367                    .push(actor);
1368            }
1369        }
1370
1371        Ok(node_actors)
1372    }
1373
1374    pub async fn get_worker_actor_ids(
1375        &self,
1376        job_ids: Vec<ObjectId>,
1377    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
1378        let inner = self.inner.read().await;
1379        let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
1380            .select_only()
1381            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1382            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1383            .filter(fragment::Column::JobId.is_in(job_ids))
1384            .into_tuple()
1385            .all(&inner.db)
1386            .await?;
1387
1388        let mut worker_actors = BTreeMap::new();
1389        for (actor_id, worker_id) in actor_workers {
1390            worker_actors
1391                .entry(worker_id)
1392                .or_insert_with(Vec::new)
1393                .push(actor_id);
1394        }
1395
1396        Ok(worker_actors)
1397    }
1398
1399    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
1400        let inner = self.inner.read().await;
1401        let txn = inner.db.begin().await?;
1402        for assignments in split_assignment.values() {
1403            for (actor_id, splits) in assignments {
1404                let actor_splits = splits.iter().map(Into::into).collect_vec();
1405                Actor::update(actor::ActiveModel {
1406                    actor_id: Set(*actor_id as _),
1407                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1408                        splits: actor_splits,
1409                    }))),
1410                    ..Default::default()
1411                })
1412                .exec(&txn)
1413                .await
1414                .map_err(|err| {
1415                    if err == DbErr::RecordNotUpdated {
1416                        MetaError::catalog_id_not_found("actor_id", actor_id)
1417                    } else {
1418                        err.into()
1419                    }
1420                })?;
1421            }
1422        }
1423        txn.commit().await?;
1424
1425        Ok(())
1426    }
1427
1428    #[await_tree::instrument]
1429    pub async fn fill_snapshot_backfill_epoch(
1430        &self,
1431        fragment_ids: impl Iterator<Item = FragmentId>,
1432        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1433        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1434    ) -> MetaResult<()> {
1435        let inner = self.inner.write().await;
1436        let txn = inner.db.begin().await?;
1437        for fragment_id in fragment_ids {
1438            let fragment = FragmentModel::find_by_id(fragment_id)
1439                .one(&txn)
1440                .await?
1441                .context(format!("fragment {} not found", fragment_id))?;
1442            let mut node = fragment.stream_node.to_protobuf();
1443            if crate::stream::fill_snapshot_backfill_epoch(
1444                &mut node,
1445                snapshot_backfill_info,
1446                cross_db_snapshot_backfill_info,
1447            )? {
1448                let node = StreamNode::from(&node);
1449                FragmentModel::update(fragment::ActiveModel {
1450                    fragment_id: Set(fragment_id),
1451                    stream_node: Set(node),
1452                    ..Default::default()
1453                })
1454                .exec(&txn)
1455                .await?;
1456            }
1457        }
1458        txn.commit().await?;
1459        Ok(())
1460    }
1461
1462    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1463    pub async fn get_running_actors_of_fragment(
1464        &self,
1465        fragment_id: FragmentId,
1466    ) -> MetaResult<Vec<ActorId>> {
1467        let inner = self.inner.read().await;
1468        let actors: Vec<ActorId> = Actor::find()
1469            .select_only()
1470            .column(actor::Column::ActorId)
1471            .filter(actor::Column::FragmentId.eq(fragment_id))
1472            .filter(actor::Column::Status.eq(ActorStatus::Running))
1473            .into_tuple()
1474            .all(&inner.db)
1475            .await?;
1476        Ok(actors)
1477    }
1478
    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
    /// (`backfill_actor_id`, `upstream_source_actor_id`)
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // There must be an edge from the source fragment to the backfill fragment,
        // and its dispatcher type is checked below.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // Source backfill is always connected to its source via NoShuffle, so the
        // actor mapping below can be resolved one-to-one.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: load a fragment's distribution type, erroring if it is missing.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: load each actor of a fragment together with its vnode bitmap
        // (None for singleton distribution).
        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        // Resolve the NoShuffle pairing and flip each pair into
        // (backfill actor, source actor) as documented above.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1565
1566    pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
1567        let inner = self.inner.read().await;
1568        let actors: Vec<ActorId> = Actor::find()
1569            .select_only()
1570            .column(actor::Column::ActorId)
1571            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1572            .filter(fragment::Column::JobId.is_in(job_ids))
1573            .into_tuple()
1574            .all(&inner.db)
1575            .await?;
1576        Ok(actors)
1577    }
1578
    /// Get and filter the "**root**" fragments of the specified jobs.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// Root fragment connects to downstream jobs.
    ///
    /// ## What can be the root fragment
    /// - For sink, it should have one `Sink` fragment.
    /// - For MV, it should have one `MView` fragment.
    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
    /// - For source, it should have one `Source` fragment.
    ///
    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
    ///
    /// Returns a map from job id to its composed root [`Fragment`], plus
    /// `(actor_id, worker_id)` pairs (see the note on the final query below).
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
        let inner = self.inner.read().await;

        // Resolve the SQL definitions of the jobs up front; they are attached to
        // the composed fragments below.
        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(job_ids.iter().copied()),
        )
        .await?;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        // job_id -> fragment
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                // MView/Sink always wins: overwrite any Source fragment recorded earlier.
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                // look for Source fragment only if there's no MView fragment
                // (notice try_insert here vs insert above)
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        // Compose each root fragment model (plus its actors) into the protobuf-facing
        // `Fragment` representation.
        let mut root_fragments_pb = HashMap::new();
        for (_, fragment) in root_fragments {
            let actors = fragment.find_related(Actor).all(&inner.db).await?;

            let job_id = fragment.job_id;
            root_fragments_pb.insert(
                fragment.job_id,
                Self::compose_fragment(
                    fragment,
                    actors,
                    job_definitions.get(&(job_id as _)).cloned(),
                )?
                .0,
            );
        }

        // (actor_id, worker_id) pairs for *all* actors, not only those of `job_ids`.
        // NOTE(review): the missing filter looks intentional (callers appear to use
        // this as a global actor-location map) — confirm it isn't an oversight.
        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok((root_fragments_pb, actors))
    }
1645
1646    pub async fn get_root_fragment(
1647        &self,
1648        job_id: ObjectId,
1649    ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
1650        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1651        let root_fragment = root_fragments
1652            .remove(&job_id)
1653            .context(format!("root fragment for job {} not found", job_id))?;
1654        Ok((root_fragment, actors))
1655    }
1656
    /// Get the downstream fragments connected to the specified job.
    ///
    /// Returns each downstream fragment paired with the dispatcher type used to
    /// reach it, plus the `(actor_id, worker_id)` pairs returned by
    /// [`Self::get_root_fragment`].
    pub async fn get_downstream_fragments(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        // Edges whose source is the root fragment lead to the downstream fragments.
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&inner.db)
            .await?;
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        let mut downstream_fragments = vec![];
        for fragment_relation::Model {
            target_fragment_id: fragment_id,
            dispatcher_type,
            ..
        } in downstream_fragment_relations
        {
            // Fetch the target fragment together with its actors. Since we query by
            // primary key, at most one (fragment, actors) pair can come back.
            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
                .find_with_related(Actor)
                .all(&inner.db)
                .await?;
            if fragment_actors.is_empty() {
                bail!("No fragment found for fragment id {}", fragment_id);
            }
            assert_eq!(fragment_actors.len(), 1);
            let (fragment, actors) = fragment_actors.pop().unwrap();
            let dispatch_type = PbDispatcherType::from(dispatcher_type);
            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
            downstream_fragments.push((dispatch_type, fragment));
        }

        Ok((downstream_fragments, actors))
    }
1699
1700    pub async fn load_source_fragment_ids(
1701        &self,
1702    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1703        let inner = self.inner.read().await;
1704        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1705            .select_only()
1706            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1707            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1708            .into_tuple()
1709            .all(&inner.db)
1710            .await?;
1711
1712        let mut source_fragment_ids = HashMap::new();
1713        for (fragment_id, stream_node) in fragments {
1714            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1715                source_fragment_ids
1716                    .entry(source_id as SourceId)
1717                    .or_insert_with(BTreeSet::new)
1718                    .insert(fragment_id);
1719            }
1720        }
1721        Ok(source_fragment_ids)
1722    }
1723
1724    pub async fn load_backfill_fragment_ids(
1725        &self,
1726    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1727        let inner = self.inner.read().await;
1728        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1729            .select_only()
1730            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1731            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1732            .into_tuple()
1733            .all(&inner.db)
1734            .await?;
1735
1736        let mut source_fragment_ids = HashMap::new();
1737        for (fragment_id, stream_node) in fragments {
1738            if let Some((source_id, upstream_source_fragment_id)) =
1739                stream_node.to_protobuf().find_source_backfill()
1740            {
1741                source_fragment_ids
1742                    .entry(source_id as SourceId)
1743                    .or_insert_with(BTreeSet::new)
1744                    .insert((fragment_id, upstream_source_fragment_id));
1745            }
1746        }
1747        Ok(source_fragment_ids)
1748    }
1749
1750    pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
1751        let inner = self.inner.read().await;
1752        let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
1753            .select_only()
1754            .columns([actor::Column::ActorId, actor::Column::Splits])
1755            .filter(actor::Column::Splits.is_not_null())
1756            .into_tuple()
1757            .all(&inner.db)
1758            .await?;
1759        Ok(splits.into_iter().collect())
1760    }
1761
    /// Get the actor count of `Materialize` or `Sink` fragment of the specified table.
    ///
    /// Returns `None` when the job has no such fragment. NOTE(review): if a job
    /// ever matched *more than one* Mview/Sink fragment, `at_most_one().ok()`
    /// would also yield `None` silently — confirm that is intended.
    pub async fn get_actual_job_fragment_parallelism(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<Option<usize>> {
        let inner = self.inner.read().await;
        // One row per matching fragment: (fragment_id, COUNT(actor_id)).
        let fragments: Vec<(FragmentId, i64)> = FragmentModel::find()
            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
            .select_only()
            .columns([fragment::Column::FragmentId])
            .column_as(actor::Column::ActorId.count(), "count")
            .filter(
                fragment::Column::JobId
                    .eq(job_id)
                    .and(FragmentTypeMask::intersects_any([
                        FragmentTypeFlag::Mview,
                        FragmentTypeFlag::Sink,
                    ])),
            )
            .group_by(fragment::Column::FragmentId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        // `at_most_one` collapses 0 or 1 rows to Option, swallowing the >1 case.
        Ok(fragments
            .into_iter()
            .at_most_one()
            .ok()
            .flatten()
            .map(|(_, count)| count as usize))
    }
1793
1794    pub async fn get_all_upstream_sink_infos(
1795        &self,
1796        target_table: &PbTable,
1797        target_fragment_id: FragmentId,
1798    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1799        let incoming_sinks = self.get_table_incoming_sinks(target_table.id as _).await?;
1800
1801        let inner = self.inner.read().await;
1802        let txn = inner.db.begin().await?;
1803
1804        let sink_ids = incoming_sinks.iter().map(|s| s.id as SinkId).collect_vec();
1805        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;
1806
1807        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1808        for pb_sink in &incoming_sinks {
1809            let sink_fragment_id =
1810                sink_fragment_ids
1811                    .get(&(pb_sink.id as _))
1812                    .cloned()
1813                    .ok_or(anyhow::anyhow!(
1814                        "sink fragment not found for sink id {}",
1815                        pb_sink.id
1816                    ))?;
1817            let upstream_info = build_upstream_sink_info(
1818                pb_sink,
1819                sink_fragment_id,
1820                target_table,
1821                target_fragment_id,
1822            )?;
1823            upstream_sink_infos.push(upstream_info);
1824        }
1825
1826        Ok(upstream_sink_infos)
1827    }
1828
1829    pub async fn get_mview_fragment_by_id(&self, table_id: TableId) -> MetaResult<FragmentId> {
1830        let inner = self.inner.read().await;
1831        let txn = inner.db.begin().await?;
1832
1833        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1834            .select_only()
1835            .column(fragment::Column::FragmentId)
1836            .filter(
1837                fragment::Column::JobId
1838                    .eq(table_id)
1839                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1840            )
1841            .into_tuple()
1842            .all(&txn)
1843            .await?;
1844
1845        if mview_fragment.len() != 1 {
1846            return Err(anyhow::anyhow!(
1847                "expected exactly one mview fragment for table {}, found {}",
1848                table_id,
1849                mview_fragment.len()
1850            )
1851            .into());
1852        }
1853
1854        Ok(mview_fragment.into_iter().next().unwrap())
1855    }
1856
    /// Whether the given table has been migrated.
    ///
    /// Thin wrapper that opens a read transaction and delegates to the
    /// free function of the same name.
    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        has_table_been_migrated(&txn, table_id).await
    }
1862
1863    pub async fn update_source_splits(
1864        &self,
1865        source_splits: &HashMap<SourceId, Vec<SplitImpl>>,
1866    ) -> MetaResult<()> {
1867        let inner = self.inner.read().await;
1868        let txn = inner.db.begin().await?;
1869
1870        let models: Vec<source_splits::ActiveModel> = source_splits
1871            .iter()
1872            .map(|(source_id, splits)| source_splits::ActiveModel {
1873                source_id: Set(*source_id as _),
1874                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1875                    splits: splits.iter().map(Into::into).collect_vec(),
1876                }))),
1877            })
1878            .collect();
1879
1880        SourceSplits::insert_many(models)
1881            .on_conflict(
1882                OnConflict::column(source_splits::Column::SourceId)
1883                    .update_column(source_splits::Column::Splits)
1884                    .to_owned(),
1885            )
1886            .exec(&txn)
1887            .await?;
1888
1889        txn.commit().await?;
1890
1891        Ok(())
1892    }
1893}
1894
1895#[cfg(test)]
1896mod tests {
1897    use std::collections::{BTreeMap, HashMap, HashSet};
1898
1899    use itertools::Itertools;
1900    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1901    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1902    use risingwave_common::util::iter_util::ZipEqDebug;
1903    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1904    use risingwave_meta_model::actor::ActorStatus;
1905    use risingwave_meta_model::fragment::DistributionType;
1906    use risingwave_meta_model::{
1907        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
1908        VnodeBitmap, actor, fragment,
1909    };
1910    use risingwave_pb::common::PbActorLocation;
1911    use risingwave_pb::meta::table_fragments::PbActorStatus;
1912    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
1913    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1914    use risingwave_pb::plan_common::PbExprContext;
1915    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1916    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1917    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1918
1919    use crate::MetaResult;
1920    use crate::controller::catalog::CatalogController;
1921    use crate::model::{Fragment, StreamActor};
1922
    /// Upstream fragment id -> the set of upstream actor ids in that fragment.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    /// Actor id -> its upstream actors, grouped by upstream fragment.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    // Fixture ids shared by the tests below.
    const TEST_FRAGMENT_ID: FragmentId = 1;

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;

    const TEST_JOB_ID: ObjectId = 1;

    const TEST_STATE_TABLE_ID: TableId = 1000;
1934
1935    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1936        let mut upstream_actor_ids = BTreeMap::new();
1937        upstream_actor_ids.insert(
1938            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1939            HashSet::from_iter([(actor_id + 100)]),
1940        );
1941        upstream_actor_ids.insert(
1942            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1943            HashSet::from_iter([(actor_id + 200)]),
1944        );
1945        upstream_actor_ids
1946    }
1947
1948    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1949        let mut input = vec![];
1950        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1951            input.push(PbStreamNode {
1952                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1953                    upstream_fragment_id: *upstream_fragment_id as _,
1954                    ..Default::default()
1955                }))),
1956                ..Default::default()
1957            });
1958        }
1959
1960        PbStreamNode {
1961            input,
1962            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1963            ..Default::default()
1964        }
1965    }
1966
    /// Round-trip check: a protobuf `Fragment` (plus actor statuses/splits) is
    /// converted into model rows via `extract_fragment_and_actors_for_new_job`,
    /// and the result is verified field-by-field against the input.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        // actor id -> its upstream actors, grouped by upstream fragment.
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // Uniform vnode distribution across the actors.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        // All actors marked running on worker 0.
        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        let pb_actor_splits = Default::default();

        // Convert protobuf -> model rows.
        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        // Verify the conversion preserved fragment and actor data.
        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }
2042
    /// Inverse round-trip of `test_extract_fragment`: model rows are composed into
    /// the protobuf representation via `compose_fragment`, and the result is
    /// verified field-by-field against the original models.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        // actor id -> its upstream actors, grouped by upstream fragment.
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // Uniform vnode distribution; entries are moved out (`remove`) below,
        // hence the `mut`.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // Build the actor model rows, each carrying one dummy connector split.
        let actors = (0..actor_count)
            .map(|actor_id| {
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        // Derive the fragment's stream node from the first actor's upstreams.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        // Compose model rows -> protobuf representation.
        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        // Verify the composition preserved fragment and actor data.
        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }
2133
    /// Assert that each actor model row agrees with its protobuf counterpart:
    /// ids, vnode bitmaps, merge-node upstreams, status, splits, and expr context.
    /// The two lists are zipped pairwise, so they must have equal length.
    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        // Destructure both sides of each (model, protobuf) pair in the loop header.
        for (
            actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // Every Merge node in the fragment's stream node must reference an
            // upstream fragment recorded for this actor.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(status, ActorStatus::Running);

            assert_eq!(
                splits,
                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }
2193
    /// Assert that a fragment model row agrees with its protobuf counterpart:
    /// id, type mask, distribution type, state table ids, and stream node.
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        // Destructure the protobuf side so every compared field is named explicitly.
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(
            pb_state_table_ids,
            fragment.state_table_ids.into_u32_array()
        );
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }
2218}