// risingwave_meta/controller/fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
26use risingwave_common::util::stream_graph_visitor::{
27    visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_connector::source::SplitImpl;
30use risingwave_meta_model::actor::ActorStatus;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Actor, Fragment as FragmentModel, FragmentRelation, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
40    streaming_job, table,
41};
42use risingwave_meta_model_migration::{Alias, ExprTrait, SelectStatement, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
49use risingwave_pb::meta::table_fragments::fragment::{
50    FragmentDistributionType, PbFragmentDistributionType,
51};
52use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
53use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
54use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
58    StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63    ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
64    QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
65};
66use serde::{Deserialize, Serialize};
67use tracing::debug;
68
69use crate::barrier::SnapshotBackfillInfo;
70use crate::controller::catalog::{CatalogController, CatalogControllerInner};
71use crate::controller::scale::resolve_streaming_job_definition;
72use crate::controller::utils::{
73    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
74    get_fragment_mappings, get_sink_fragment_by_ids, has_table_been_migrated,
75    resolve_no_shuffle_actor_dispatcher,
76};
77use crate::manager::{LocalNotification, NotificationManager};
78use crate::model::{
79    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
80    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
81};
82use crate::rpc::ddl_controller::build_upstream_sink_info;
83use crate::stream::{SplitAssignment, UpstreamSinkInfo, build_actor_split_impls};
84use crate::{MetaError, MetaResult};
85
/// Some information of running (inflight) actors.
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    /// The worker node the actor is currently scheduled on.
    pub worker_id: WorkerId,
    /// Vnode ownership bitmap of this actor, if one was assigned.
    pub vnode_bitmap: Option<Bitmap>,
}
92
/// Information of a running (inflight) fragment and its actors.
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    /// How this fragment's actors are distributed (e.g. hash vs. single).
    pub distribution_type: DistributionType,
    /// The stream plan (node tree) shared by all actors of the fragment.
    pub nodes: PbStreamNode,
    /// Running actors of the fragment, keyed by actor id.
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    /// Ids of the state tables owned by this fragment.
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}
101
/// Parallelism-related stats of a single fragment, derived from its actor rows.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    /// Number of actors currently instantiated for this fragment.
    pub actor_count: usize,
    /// Total number of virtual nodes of this fragment.
    pub vnode_count: usize,
}
108
/// SQL-expression helpers for testing the `fragment_type_mask` column against
/// [`FragmentTypeFlag`]s inside queries.
pub(crate) trait FragmentTypeMaskExt {
    /// Predicate: `fragment_type_mask & flag != 0`.
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr;
    /// Predicate: the mask intersects the union of all given flags.
    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr;
}
113
impl FragmentTypeMaskExt for FragmentTypeMask {
    /// Build the SQL predicate `fragment_type_mask & flag != 0`.
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    /// Build the SQL predicate matching rows whose mask intersects the OR-ed
    /// union of all given flags.
    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }
}
127
/// Flattened summary of a streaming job, joined from the `streaming_job`,
/// `object` and catalog-name tables (see [`CatalogController::list_streaming_job_infos`]).
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    pub obj_type: ObjectType,
    /// Display name resolved from the table/source/sink catalog; `"<unknown>"` if none matches.
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Effective resource group: the job-specific one if set, otherwise the database's.
    pub resource_group: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
141
142impl CatalogControllerInner {
143    /// List all fragment vnode mapping info for all CREATED streaming jobs.
144    pub async fn all_running_fragment_mappings(
145        &self,
146    ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
147        let txn = self.db.begin().await?;
148
149        let job_ids: Vec<ObjectId> = StreamingJob::find()
150            .select_only()
151            .column(streaming_job::Column::JobId)
152            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
153            .into_tuple()
154            .all(&txn)
155            .await?;
156
157        let mut result = vec![];
158        for job_id in job_ids {
159            let mappings = get_fragment_mappings(&txn, job_id).await?;
160
161            result.extend(mappings.into_iter());
162        }
163
164        Ok(result.into_iter())
165    }
166}
167
168impl NotificationManager {
169    pub(crate) fn notify_fragment_mapping(
170        &self,
171        operation: NotificationOperation,
172        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
173    ) {
174        let fragment_ids = fragment_mappings
175            .iter()
176            .map(|mapping| mapping.fragment_id)
177            .collect_vec();
178        if fragment_ids.is_empty() {
179            return;
180        }
181        // notify all fragment mappings to frontend.
182        for fragment_mapping in fragment_mappings {
183            self.notify_frontend_without_version(
184                operation,
185                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
186            );
187        }
188
189        // update serving vnode mappings.
190        match operation {
191            NotificationOperation::Add | NotificationOperation::Update => {
192                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
193                    fragment_ids,
194                ));
195            }
196            NotificationOperation::Delete => {
197                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
198                    fragment_ids,
199                ));
200            }
201            op => {
202                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
203            }
204        }
205    }
206}
207
208impl CatalogController {
209    pub fn extract_fragment_and_actors_from_fragments(
210        job_id: ObjectId,
211        fragments: impl Iterator<Item = &Fragment>,
212        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
213        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
214    ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
215        fragments
216            .map(|fragment| {
217                Self::extract_fragment_and_actors_for_new_job(
218                    job_id,
219                    fragment,
220                    actor_status,
221                    actor_splits,
222                )
223            })
224            .try_collect()
225    }
226
    /// Convert an in-memory [`Fragment`] (as built for a *new* streaming job)
    /// into its database representation: one `fragment::Model` plus one
    /// `actor::Model` per actor.
    ///
    /// `actor_status` must contain an entry for every actor of the fragment —
    /// a missing entry is an error. `actor_splits` is optional per actor.
    ///
    /// # Panics
    /// Panics if the fragment has no actors or an actor carries no expression
    /// context.
    pub fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        // Clear the deprecated per-actor `upstream_actor_id` list on Merge
        // nodes before persisting the plan.
        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            // Serialize the actor's connector split assignment, if it has one.
            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }
326
327    pub fn compose_table_fragments(
328        table_id: u32,
329        state: PbState,
330        ctx: Option<PbStreamContext>,
331        fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
332        parallelism: StreamingParallelism,
333        max_parallelism: usize,
334        job_definition: Option<String>,
335    ) -> MetaResult<StreamJobFragments> {
336        let mut pb_fragments = BTreeMap::new();
337        let mut pb_actor_splits = HashMap::new();
338        let mut pb_actor_status = BTreeMap::new();
339
340        for (fragment, actors) in fragments {
341            let (fragment, fragment_actor_status, fragment_actor_splits) =
342                Self::compose_fragment(fragment, actors, job_definition.clone())?;
343
344            pb_fragments.insert(fragment.fragment_id, fragment);
345
346            pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
347            pb_actor_status.extend(fragment_actor_status.into_iter());
348        }
349
350        let table_fragments = StreamJobFragments {
351            stream_job_id: table_id.into(),
352            state: state as _,
353            fragments: pb_fragments,
354            actor_status: pb_actor_status,
355            actor_splits: pb_actor_splits,
356            ctx: ctx
357                .as_ref()
358                .map(StreamContext::from_protobuf)
359                .unwrap_or_default(),
360            assigned_parallelism: match parallelism {
361                StreamingParallelism::Custom => TableParallelism::Custom,
362                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
363                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
364            },
365            max_parallelism,
366        };
367
368        Ok(table_fragments)
369    }
370
    /// Rebuild an in-memory [`Fragment`] from its database rows, together with
    /// per-actor status and connector-split maps keyed by actor id.
    ///
    /// Returns an error if any actor row belongs to a different fragment.
    ///
    /// # Panics
    /// Panics if the stored plan contains two `Merge` nodes with the same
    /// upstream fragment id.
    #[allow(clippy::type_complexity)]
    pub(crate) fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<actor::Model>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity check: each upstream fragment must feed at most one Merge node
        // of this fragment's plan.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let actor::Model {
                actor_id,
                fragment_id,
                status,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                    state: PbActorState::from(status) as _,
                },
            );

            // Only actors that actually carry splits get an entry in the map.
            if let Some(splits) = splits {
                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
            }

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }
467
    /// Compute per-fragment parallelism info (distribution type, actor count,
    /// vnode count), optionally restricted to the fragment ids in `id_filter`.
    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let inner = self.inner.read().await;

        let query_alias = Alias::new("fragment_actor_count");
        let count_alias = Alias::new("count");

        // Inner query: COUNT(actor_id) grouped by fragment id.
        let mut query = SelectStatement::new()
            .column(actor::Column::FragmentId)
            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
            .from(Actor)
            .group_by_col(actor::Column::FragmentId)
            .to_owned();

        if let Some(id_filter) = id_filter {
            // Applied as HAVING, since fragment_id is the grouping key.
            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
        }

        // Outer query: join the per-fragment counts back to `fragment` to pick
        // up the distribution type and vnode count.
        let outer = SelectStatement::new()
            .column((FragmentModel, fragment::Column::FragmentId))
            .column(count_alias)
            .column(fragment::Column::DistributionType)
            .column(fragment::Column::VnodeCount)
            .from_subquery(query.to_owned(), query_alias.clone())
            .inner_join(
                FragmentModel,
                Expr::col((query_alias, actor::Column::FragmentId))
                    .equals((FragmentModel, fragment::Column::FragmentId)),
            )
            .to_owned();

        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
                outer.to_owned(),
            )
            .all(&inner.db)
            .await?;

        Ok(fragment_parallelisms
            .into_iter()
            .map(|(fragment_id, count, distribution_type, vnode_count)| {
                (
                    fragment_id,
                    FragmentParallelismInfo {
                        distribution_type: distribution_type.into(),
                        actor_count: count as usize,
                        vnode_count: vnode_count as usize,
                    },
                )
            })
            .collect())
    }
522
523    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
524        let inner = self.inner.read().await;
525        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
526            .select_only()
527            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
528            .into_tuple()
529            .all(&inner.db)
530            .await?;
531        Ok(fragment_jobs.into_iter().collect())
532    }
533
534    pub async fn get_fragment_job_id(
535        &self,
536        fragment_ids: Vec<FragmentId>,
537    ) -> MetaResult<Vec<ObjectId>> {
538        let inner = self.inner.read().await;
539
540        let object_ids: Vec<ObjectId> = FragmentModel::find()
541            .select_only()
542            .column(fragment::Column::JobId)
543            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
544            .into_tuple()
545            .all(&inner.db)
546            .await?;
547
548        Ok(object_ids)
549    }
550
    /// Fetch one fragment's descriptor (including its actor count, exposed as
    /// `parallelism`) together with the ids of its upstream fragments.
    ///
    /// Returns `Ok(None)` if the fragment does not exist.
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        // Get the fragment description
        let fragment_opt = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            // Left-join actors and count them to get the fragment's parallelism.
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .filter(fragment::Column::FragmentId.eq(fragment_id))
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .one(&inner.db)
            .await?;

        let Some(fragment) = fragment_opt else {
            return Ok(None); // Fragment not found
        };

        // Get upstream fragments
        let upstreams: Vec<FragmentId> = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(Some((fragment, upstreams)))
    }
592
593    pub async fn list_fragment_database_ids(
594        &self,
595        select_fragment_ids: Option<Vec<FragmentId>>,
596    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
597        let inner = self.inner.read().await;
598        let select = FragmentModel::find()
599            .select_only()
600            .column(fragment::Column::FragmentId)
601            .column(object::Column::DatabaseId)
602            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
603        let select = if let Some(select_fragment_ids) = select_fragment_ids {
604            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
605        } else {
606            select
607        };
608        Ok(select.into_tuple().all(&inner.db).await?)
609    }
610
    /// Load all fragments (with their actors) of a streaming job and compose
    /// them into a [`StreamJobFragments`] snapshot.
    ///
    /// Returns an error if the job itself is not found in the database.
    pub async fn get_job_fragments_by_id(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;
        let fragment_actors = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        // SQL definition text of the job, used as the mview definition of actors.
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id as _,
            job_info.job_status.into(),
            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }
641
642    pub async fn get_fragment_actor_dispatchers(
643        &self,
644        fragment_ids: Vec<FragmentId>,
645    ) -> MetaResult<FragmentActorDispatchers> {
646        let inner = self.inner.read().await;
647        get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
648    }
649
    /// Load the downstream dispatch relations originating from the given
    /// fragments, grouped by source fragment id.
    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        // Stream matching rows instead of materializing them all at once.
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }
679
680    pub async fn get_job_fragment_backfill_scan_type(
681        &self,
682        job_id: ObjectId,
683    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
684        let inner = self.inner.read().await;
685        let fragments: Vec<_> = FragmentModel::find()
686            .filter(fragment::Column::JobId.eq(job_id))
687            .all(&inner.db)
688            .await?;
689
690        let mut result = HashMap::new();
691
692        for fragment::Model {
693            fragment_id,
694            stream_node,
695            ..
696        } in fragments
697        {
698            let stream_node = stream_node.to_protobuf();
699            visit_stream_node_body(&stream_node, |body| {
700                if let NodeBody::StreamScan(node) = body {
701                    match node.stream_scan_type() {
702                        StreamScanType::Unspecified => {}
703                        scan_type => {
704                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
705                        }
706                    }
707                }
708            });
709        }
710
711        Ok(result)
712    }
713
    /// List summary info of all streaming jobs, resolving each job's display
    /// name from whichever catalog (table/source/sink) it belongs to and its
    /// effective resource group.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // Left-join all three possible catalog kinds; at most one matches
            // the job's object id.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // Effectively COALESCE(table.name, source.name, sink.name, '<unknown>').
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // Fall back to the database-level resource group when the job has
            // no specific one.
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
760
761    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
762        let inner = self.inner.read().await;
763        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
764            .select_only()
765            .column(streaming_job::Column::MaxParallelism)
766            .into_tuple()
767            .one(&inner.db)
768            .await?
769            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
770        Ok(max_parallelism as usize)
771    }
772
773    /// Get all actor ids in the target streaming jobs.
774    pub async fn get_job_actor_mapping(
775        &self,
776        job_ids: Vec<ObjectId>,
777    ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
778        let inner = self.inner.read().await;
779        let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
780            .select_only()
781            .column(fragment::Column::JobId)
782            .column(actor::Column::ActorId)
783            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
784            .filter(fragment::Column::JobId.is_in(job_ids))
785            .into_tuple()
786            .all(&inner.db)
787            .await?;
788        Ok(job_actors.into_iter().into_group_map())
789    }
790
791    /// Try to get internal table ids of each streaming job, used by metrics collection.
792    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
793        if let Ok(inner) = self.inner.try_read()
794            && let Ok(job_state_tables) = FragmentModel::find()
795                .select_only()
796                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
797                .into_tuple::<(ObjectId, I32Array)>()
798                .all(&inner.db)
799                .await
800        {
801            let mut job_internal_table_ids = HashMap::new();
802            for (job_id, state_table_ids) in job_state_tables {
803                job_internal_table_ids
804                    .entry(job_id)
805                    .or_insert_with(Vec::new)
806                    .extend(state_table_ids.into_inner());
807            }
808            return Some(job_internal_table_ids.into_iter().collect());
809        }
810        None
811    }
812
813    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
814        let inner = self.inner.read().await;
815        let count = FragmentModel::find().count(&inner.db).await?;
816        Ok(count > 0)
817    }
818
819    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
820        let inner = self.inner.read().await;
821        let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
822            .select_only()
823            .column(actor::Column::WorkerId)
824            .column_as(actor::Column::ActorId.count(), "count")
825            .group_by(actor::Column::WorkerId)
826            .into_tuple()
827            .all(&inner.db)
828            .await?;
829
830        Ok(actor_cnt
831            .into_iter()
832            .map(|(worker_id, count)| (worker_id, count as usize))
833            .collect())
834    }
835
    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
    /// Load the fragments, actors and job metadata of every streaming job, composed into
    /// [`StreamJobFragments`] and keyed by job id.
    ///
    /// NOTE(review): this issues one fragment/actor query per job (N+1 pattern) on top of
    /// one full job scan — the TODO above already flags it as heavy.
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        // All streaming jobs, regardless of status.
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve the SQL definition text for all jobs in one batch.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            // The job's fragments together with their actors (one query per job).
            let fragment_actors = FragmentModel::find()
                .find_with_related(Actor)
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            table_fragments.insert(
                job.job_id as ObjectId,
                Self::compose_table_fragments(
                    job.job_id as _,
                    job.job_status.into(),
                    // Optional timezone becomes the job's stream context.
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    // `remove` hands over the definition; may be absent.
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
871
    /// Returns pairs of (job id, actor ids), where actors belong to CDC table backfill fragment of the job.
    pub async fn cdc_table_backfill_actor_ids(&self) -> MetaResult<HashMap<u32, HashSet<u32>>> {
        let inner = self.inner.read().await;
        let mut job_id_actor_ids = HashMap::default();
        let stream_cdc_scan_flag = FragmentTypeFlag::StreamCdcScan as i32;
        let fragment_type_mask = stream_cdc_scan_flag;
        // NOTE(review): this matches fragments whose type mask EQUALS the StreamCdcScan flag
        // exactly; a fragment carrying StreamCdcScan plus any other flag would be skipped.
        // Other call sites in this file use bitwise matching (`FragmentTypeMask::intersects`)
        // — confirm the exact-equality comparison here is intentional.
        let fragment_actors: Vec<(fragment::Model, Vec<actor::Model>)> = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::FragmentTypeMask.eq(fragment_type_mask))
            .all(&inner.db)
            .await?;
        for (fragment, actors) in fragment_actors {
            // Group actor ids under the fragment's owning job id.
            let e: &mut HashSet<u32> = job_id_actor_ids.entry(fragment.job_id as _).or_default();
            e.extend(actors.iter().map(|a| a.actor_id as u32));
        }
        Ok(job_id_actor_ids)
    }
889
890    pub async fn upstream_fragments(
891        &self,
892        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
893    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
894        let inner = self.inner.read().await;
895        let mut stream = FragmentRelation::find()
896            .select_only()
897            .columns([
898                fragment_relation::Column::SourceFragmentId,
899                fragment_relation::Column::TargetFragmentId,
900            ])
901            .filter(
902                fragment_relation::Column::TargetFragmentId
903                    .is_in(fragment_ids.map(|id| id as FragmentId)),
904            )
905            .into_tuple::<(FragmentId, FragmentId)>()
906            .stream(&inner.db)
907            .await?;
908        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
909        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
910            upstream_fragments
911                .entry(downstream_fragment_id as crate::model::FragmentId)
912                .or_default()
913                .insert(upstream_fragment_id as crate::model::FragmentId);
914        }
915        Ok(upstream_fragments)
916    }
917
918    pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
919        let inner = self.inner.read().await;
920        let actor_locations: Vec<PartialActorLocation> =
921            Actor::find().into_partial_model().all(&inner.db).await?;
922        Ok(actor_locations)
923    }
924
    /// List, for every actor: its id, fragment id, owning job id (object oid), the job's
    /// schema id, and the job's object type, resolved via `actor -> fragment -> object`.
    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;
        // LEFT JOINs so actors are listed even if the object lookup yields no row.
        let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
            Actor::find()
                .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .select_only()
                .columns([actor::Column::ActorId, actor::Column::FragmentId])
                // Aliases must match the tuple positions expected by `into_tuple`.
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;
        Ok(actor_locations)
    }
943
944    pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
945        let inner = self.inner.read().await;
946
947        let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
948            .select_only()
949            .filter(actor::Column::Splits.is_not_null())
950            .columns([actor::Column::ActorId, actor::Column::FragmentId])
951            .into_tuple()
952            .all(&inner.db)
953            .await?;
954
955        Ok(source_actors)
956    }
957
958    pub async fn list_creating_fragment_descs(
959        &self,
960    ) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
961        let inner = self.inner.read().await;
962        let mut result = Vec::new();
963        let fragments = FragmentModel::find()
964            .select_only()
965            .columns([
966                fragment::Column::FragmentId,
967                fragment::Column::JobId,
968                fragment::Column::FragmentTypeMask,
969                fragment::Column::DistributionType,
970                fragment::Column::StateTableIds,
971                fragment::Column::VnodeCount,
972                fragment::Column::StreamNode,
973            ])
974            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
975            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
976            .join(JoinType::LeftJoin, fragment::Relation::Object.def())
977            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
978            .filter(
979                streaming_job::Column::JobStatus
980                    .eq(JobStatus::Initial)
981                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
982            )
983            .group_by(fragment::Column::FragmentId)
984            .into_model::<FragmentDesc>()
985            .all(&inner.db)
986            .await?;
987        for fragment in fragments {
988            let upstreams: Vec<FragmentId> = FragmentRelation::find()
989                .select_only()
990                .column(fragment_relation::Column::SourceFragmentId)
991                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
992                .into_tuple()
993                .all(&inner.db)
994                .await?;
995            result.push((fragment, upstreams));
996        }
997        Ok(result)
998    }
999
1000    pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
1001        let inner = self.inner.read().await;
1002        let mut result = Vec::new();
1003        let fragments = FragmentModel::find()
1004            .select_only()
1005            .columns([
1006                fragment::Column::FragmentId,
1007                fragment::Column::JobId,
1008                fragment::Column::FragmentTypeMask,
1009                fragment::Column::DistributionType,
1010                fragment::Column::StateTableIds,
1011                fragment::Column::VnodeCount,
1012                fragment::Column::StreamNode,
1013            ])
1014            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
1015            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
1016            .group_by(fragment::Column::FragmentId)
1017            .into_model::<FragmentDesc>()
1018            .all(&inner.db)
1019            .await?;
1020        for fragment in fragments {
1021            let upstreams: Vec<FragmentId> = FragmentRelation::find()
1022                .select_only()
1023                .column(fragment_relation::Column::SourceFragmentId)
1024                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
1025                .into_tuple()
1026                .all(&inner.db)
1027                .await?;
1028            result.push((fragment, upstreams));
1029        }
1030        Ok(result)
1031    }
1032
1033    pub async fn list_sink_actor_mapping(
1034        &self,
1035    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1036        let inner = self.inner.read().await;
1037        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1038            .select_only()
1039            .columns([sink::Column::SinkId, sink::Column::Name])
1040            .into_tuple()
1041            .all(&inner.db)
1042            .await?;
1043        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1044        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1045
1046        let actor_with_type: Vec<(ActorId, SinkId)> = Actor::find()
1047            .select_only()
1048            .column(actor::Column::ActorId)
1049            .column(fragment::Column::JobId)
1050            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1051            .filter(
1052                fragment::Column::JobId
1053                    .is_in(sink_ids)
1054                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
1055            )
1056            .into_tuple()
1057            .all(&inner.db)
1058            .await?;
1059
1060        let mut sink_actor_mapping = HashMap::new();
1061        for (actor_id, sink_id) in actor_with_type {
1062            sink_actor_mapping
1063                .entry(sink_id)
1064                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1065                .1
1066                .push(actor_id);
1067        }
1068
1069        Ok(sink_actor_mapping)
1070    }
1071
1072    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1073        let inner = self.inner.read().await;
1074        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1075            .select_only()
1076            .columns([
1077                fragment::Column::FragmentId,
1078                fragment::Column::JobId,
1079                fragment::Column::StateTableIds,
1080            ])
1081            .into_partial_model()
1082            .all(&inner.db)
1083            .await?;
1084        Ok(fragment_state_tables)
1085    }
1086
    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
    /// collected
    ///
    /// Returns `database_id -> job_id -> fragment_id -> InflightFragmentInfo`, covering only
    /// actors with `Running` status. If `database_id` is `Some`, the scan is restricted to
    /// that database.
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        // Base predicate: running actors only; optionally narrowed to one database.
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        // Stream the joined actor/fragment/object rows to avoid materializing the whole
        // cluster's actor set at once.
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            // Locate (or create) the per-database, per-job bucket for this fragment.
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();
            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
            };
            match fragment_infos.entry(fragment_id) {
                Entry::Occupied(mut entry) => {
                    // Another actor of the same fragment was seen before: the fragment-level
                    // fields must agree, and the actor id must be new.
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                Entry::Vacant(entry) => {
                    // First actor of this fragment: record the fragment-level info.
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }
1187
1188    pub async fn migrate_actors(
1189        &self,
1190        plan: HashMap<WorkerSlotId, WorkerSlotId>,
1191    ) -> MetaResult<()> {
1192        let inner = self.inner.read().await;
1193        let txn = inner.db.begin().await?;
1194
1195        let actors: Vec<(
1196            FragmentId,
1197            DistributionType,
1198            ActorId,
1199            Option<VnodeBitmap>,
1200            WorkerId,
1201            ActorStatus,
1202        )> = Actor::find()
1203            .select_only()
1204            .columns([
1205                fragment::Column::FragmentId,
1206                fragment::Column::DistributionType,
1207            ])
1208            .columns([
1209                actor::Column::ActorId,
1210                actor::Column::VnodeBitmap,
1211                actor::Column::WorkerId,
1212                actor::Column::Status,
1213            ])
1214            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1215            .into_tuple()
1216            .all(&txn)
1217            .await?;
1218
1219        let mut actor_locations = HashMap::new();
1220
1221        for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
1222            if *status != ActorStatus::Running {
1223                tracing::warn!(
1224                    "skipping actor {} in fragment {} with status {:?}",
1225                    actor_id,
1226                    fragment_id,
1227                    status
1228                );
1229                continue;
1230            }
1231
1232            actor_locations
1233                .entry(*worker_id)
1234                .or_insert(HashMap::new())
1235                .entry(*fragment_id)
1236                .or_insert(BTreeSet::new())
1237                .insert(*actor_id);
1238        }
1239
1240        let expired_or_changed_workers: HashSet<_> =
1241            plan.keys().map(|k| k.worker_id() as WorkerId).collect();
1242
1243        let mut actor_migration_plan = HashMap::new();
1244        for (worker, fragment) in actor_locations {
1245            if expired_or_changed_workers.contains(&worker) {
1246                for (fragment_id, actors) in fragment {
1247                    debug!(
1248                        "worker {} expired or changed, migrating fragment {}",
1249                        worker, fragment_id
1250                    );
1251                    let worker_slot_to_actor: HashMap<_, _> = actors
1252                        .iter()
1253                        .enumerate()
1254                        .map(|(idx, actor_id)| {
1255                            (WorkerSlotId::new(worker as _, idx as _), *actor_id)
1256                        })
1257                        .collect();
1258
1259                    for (worker_slot, actor) in worker_slot_to_actor {
1260                        if let Some(target) = plan.get(&worker_slot) {
1261                            actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
1262                        }
1263                    }
1264                }
1265            }
1266        }
1267
1268        for (actor, worker) in actor_migration_plan {
1269            Actor::update_many()
1270                .col_expr(
1271                    actor::Column::WorkerId,
1272                    Expr::value(Value::Int(Some(worker))),
1273                )
1274                .filter(actor::Column::ActorId.eq(actor))
1275                .exec(&txn)
1276                .await?;
1277        }
1278
1279        txn.commit().await?;
1280
1281        Ok(())
1282    }
1283
1284    pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1285        let inner = self.inner.read().await;
1286
1287        let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1288            .select_only()
1289            .columns([fragment::Column::FragmentId])
1290            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1291            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1292            .into_tuple()
1293            .all(&inner.db)
1294            .await?;
1295
1296        let mut actor_locations = HashMap::new();
1297
1298        for (fragment_id, _, worker_id) in actors {
1299            *actor_locations
1300                .entry(worker_id)
1301                .or_insert(HashMap::new())
1302                .entry(fragment_id)
1303                .or_insert(0_usize) += 1;
1304        }
1305
1306        let mut result = HashSet::new();
1307        for (worker_id, mapping) in actor_locations {
1308            let max_fragment_len = mapping.values().max().unwrap();
1309
1310            result
1311                .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1312        }
1313
1314        Ok(result)
1315    }
1316
    /// Compose the [`StreamActor`]s scheduled on each worker node.
    ///
    /// When `include_inactive` is `false`, only actors with `Running` status are included;
    /// otherwise all actors are returned.
    pub async fn all_node_actors(
        &self,
        include_inactive: bool,
    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
        let inner = self.inner.read().await;
        let fragment_actors = if include_inactive {
            FragmentModel::find()
                .find_with_related(Actor)
                .all(&inner.db)
                .await?
        } else {
            FragmentModel::find()
                .find_with_related(Actor)
                .filter(actor::Column::Status.eq(ActorStatus::Running))
                .all(&inner.db)
                .await?
        };

        // Batch-resolve the SQL definition of every job seen among the fragments.
        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
        )
        .await?;

        let mut node_actors = HashMap::new();
        for (fragment, actors) in fragment_actors {
            let job_id = fragment.job_id;
            // Compose the fragment's protobuf form plus each actor's status/location.
            let (table_fragments, actor_status, _) = Self::compose_fragment(
                fragment,
                actors,
                job_definitions.get(&(job_id as _)).cloned(),
            )?;
            for actor in table_fragments.actors {
                // Bucket each actor under the worker it is placed on.
                let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
                node_actors
                    .entry(node_id)
                    .or_insert_with(Vec::new)
                    .push(actor);
            }
        }

        Ok(node_actors)
    }
1360
1361    pub async fn get_worker_actor_ids(
1362        &self,
1363        job_ids: Vec<ObjectId>,
1364    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
1365        let inner = self.inner.read().await;
1366        let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
1367            .select_only()
1368            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1369            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1370            .filter(fragment::Column::JobId.is_in(job_ids))
1371            .into_tuple()
1372            .all(&inner.db)
1373            .await?;
1374
1375        let mut worker_actors = BTreeMap::new();
1376        for (actor_id, worker_id) in actor_workers {
1377            worker_actors
1378                .entry(worker_id)
1379                .or_insert_with(Vec::new)
1380                .push(actor_id);
1381        }
1382
1383        Ok(worker_actors)
1384    }
1385
1386    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
1387        let inner = self.inner.read().await;
1388        let txn = inner.db.begin().await?;
1389        for assignments in split_assignment.values() {
1390            for (actor_id, splits) in assignments {
1391                let actor_splits = splits.iter().map(Into::into).collect_vec();
1392                Actor::update(actor::ActiveModel {
1393                    actor_id: Set(*actor_id as _),
1394                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1395                        splits: actor_splits,
1396                    }))),
1397                    ..Default::default()
1398                })
1399                .exec(&txn)
1400                .await
1401                .map_err(|err| {
1402                    if err == DbErr::RecordNotUpdated {
1403                        MetaError::catalog_id_not_found("actor_id", actor_id)
1404                    } else {
1405                        err.into()
1406                    }
1407                })?;
1408            }
1409        }
1410        txn.commit().await?;
1411
1412        Ok(())
1413    }
1414
    /// For each given fragment, rewrite its stream node via
    /// `crate::stream::fill_snapshot_backfill_epoch` and persist the updated plan —
    /// but only when the rewrite reports that it changed something. Runs in one
    /// transaction; errors if a fragment id does not exist.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            // Decode, rewrite in place; the helper returns `true` iff the node changed.
            let mut node = fragment.stream_node.to_protobuf();
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                // Only write back fragments whose plan was actually modified.
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1448
1449    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1450    pub async fn get_running_actors_of_fragment(
1451        &self,
1452        fragment_id: FragmentId,
1453    ) -> MetaResult<Vec<ActorId>> {
1454        let inner = self.inner.read().await;
1455        let actors: Vec<ActorId> = Actor::find()
1456            .select_only()
1457            .column(actor::Column::ActorId)
1458            .filter(actor::Column::FragmentId.eq(fragment_id))
1459            .filter(actor::Column::Status.eq(ActorStatus::Running))
1460            .into_tuple()
1461            .all(&inner.db)
1462            .await?;
1463        Ok(actors)
1464    }
1465
    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
    /// (`backfill_actor_id`, `upstream_source_actor_id`)
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        // Read-only transaction for a consistent snapshot; it is never committed and is
        // rolled back on drop.
        let txn = inner.db.begin().await?;

        // The source and backfill fragments must be directly connected.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // Only a NoShuffle edge gives the 1:1 actor pairing this function resolves.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: fetch a fragment's distribution type, erroring if the fragment is gone.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: fetch a fragment's actors with their (optional) vnode bitmaps.
        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        // Resolve the NoShuffle pairing, then flip each pair into the documented
        // (backfill_actor_id, upstream_source_actor_id) order.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1552
1553    pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
1554        let inner = self.inner.read().await;
1555        let actors: Vec<ActorId> = Actor::find()
1556            .select_only()
1557            .column(actor::Column::ActorId)
1558            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1559            .filter(fragment::Column::JobId.is_in(job_ids))
1560            .into_tuple()
1561            .all(&inner.db)
1562            .await?;
1563        Ok(actors)
1564    }
1565
    /// Get and filter the "**root**" fragments of the specified jobs.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// Root fragment connects to downstream jobs.
    ///
    /// ## What can be the root fragment
    /// - For sink, it should have one `Sink` fragment.
    /// - For MV, it should have one `MView` fragment.
    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
    /// - For source, it should have one `Source` fragment.
    ///
    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
    ///
    /// Returns a map from job id to its composed root fragment, together with
    /// `(actor_id, worker_id)` location pairs (see note on the final query below).
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
        let inner = self.inner.read().await;

        // Resolve the SQL definitions of the jobs; used as `mview_definition`
        // when composing the fragments below.
        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(job_ids.iter().copied()),
        )
        .await?;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        // job_id -> fragment
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                // `MView`/`Sink` always wins: overwrite a previously recorded
                // `Source` fragment for the same job, if any.
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                // look for Source fragment only if there's no MView fragment
                // (notice try_insert here vs insert above)
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        // Compose each root fragment with its actors into the protobuf form.
        // NOTE(review): one `find_related` query per fragment (N+1 pattern) —
        // acceptable here presumably because the number of jobs is small; confirm.
        let mut root_fragments_pb = HashMap::new();
        for (_, fragment) in root_fragments {
            let actors = fragment.find_related(Actor).all(&inner.db).await?;

            let job_id = fragment.job_id;
            root_fragments_pb.insert(
                fragment.job_id,
                Self::compose_fragment(
                    fragment,
                    actors,
                    job_definitions.get(&(job_id as _)).cloned(),
                )?
                .0,
            );
        }

        // NOTE(review): this query has no filter, so it returns the location of
        // every actor in the cluster, not only those of `job_ids` — presumably
        // intentional so callers can resolve arbitrary actor locations; confirm.
        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok((root_fragments_pb, actors))
    }
1632
1633    pub async fn get_root_fragment(
1634        &self,
1635        job_id: ObjectId,
1636    ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
1637        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1638        let root_fragment = root_fragments
1639            .remove(&job_id)
1640            .context(format!("root fragment for job {} not found", job_id))?;
1641        Ok((root_fragment, actors))
1642    }
1643
    /// Get the downstream fragments connected to the specified job.
    ///
    /// For each outgoing edge of the job's root fragment, returns the edge's
    /// dispatcher type together with the composed downstream fragment, plus the
    /// `(actor, worker)` location pairs from [`Self::get_root_fragment`].
    pub async fn get_downstream_fragments(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        // All fragment-level edges whose source is the root fragment.
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&inner.db)
            .await?;
        // SQL definition of this job, used as `mview_definition` when composing.
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        let mut downstream_fragments = vec![];
        for fragment_relation::Model {
            target_fragment_id: fragment_id,
            dispatcher_type,
            ..
        } in downstream_fragment_relations
        {
            // Load the downstream fragment together with all of its actors.
            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
                .find_with_related(Actor)
                .all(&inner.db)
                .await?;
            if fragment_actors.is_empty() {
                bail!("No fragment found for fragment id {}", fragment_id);
            }
            // Querying by primary key yields at most one fragment group.
            assert_eq!(fragment_actors.len(), 1);
            let (fragment, actors) = fragment_actors.pop().unwrap();
            let dispatch_type = PbDispatcherType::from(dispatcher_type);
            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
            downstream_fragments.push((dispatch_type, fragment));
        }

        Ok((downstream_fragments, actors))
    }
1686
1687    pub async fn load_source_fragment_ids(
1688        &self,
1689    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1690        let inner = self.inner.read().await;
1691        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1692            .select_only()
1693            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1694            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1695            .into_tuple()
1696            .all(&inner.db)
1697            .await?;
1698
1699        let mut source_fragment_ids = HashMap::new();
1700        for (fragment_id, stream_node) in fragments {
1701            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1702                source_fragment_ids
1703                    .entry(source_id as SourceId)
1704                    .or_insert_with(BTreeSet::new)
1705                    .insert(fragment_id);
1706            }
1707        }
1708        Ok(source_fragment_ids)
1709    }
1710
1711    pub async fn load_backfill_fragment_ids(
1712        &self,
1713    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1714        let inner = self.inner.read().await;
1715        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1716            .select_only()
1717            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1718            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1719            .into_tuple()
1720            .all(&inner.db)
1721            .await?;
1722
1723        let mut source_fragment_ids = HashMap::new();
1724        for (fragment_id, stream_node) in fragments {
1725            if let Some((source_id, upstream_source_fragment_id)) =
1726                stream_node.to_protobuf().find_source_backfill()
1727            {
1728                source_fragment_ids
1729                    .entry(source_id as SourceId)
1730                    .or_insert_with(BTreeSet::new)
1731                    .insert((fragment_id, upstream_source_fragment_id));
1732            }
1733        }
1734        Ok(source_fragment_ids)
1735    }
1736
1737    pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
1738        let inner = self.inner.read().await;
1739        let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
1740            .select_only()
1741            .columns([actor::Column::ActorId, actor::Column::Splits])
1742            .filter(actor::Column::Splits.is_not_null())
1743            .into_tuple()
1744            .all(&inner.db)
1745            .await?;
1746        Ok(splits.into_iter().collect())
1747    }
1748
    /// Get the actor count of `Materialize` or `Sink` fragment of the specified table.
    ///
    /// Returns `None` when no such fragment exists — and also, silently, when
    /// more than one matches (see the note on `at_most_one` below).
    pub async fn get_actual_job_fragment_parallelism(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<Option<usize>> {
        let inner = self.inner.read().await;
        // One row per matching fragment: (fragment_id, COUNT(actor_id)).
        let fragments: Vec<(FragmentId, i64)> = FragmentModel::find()
            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
            .select_only()
            .columns([fragment::Column::FragmentId])
            .column_as(actor::Column::ActorId.count(), "count")
            .filter(
                fragment::Column::JobId
                    .eq(job_id)
                    .and(FragmentTypeMask::intersects_any([
                        FragmentTypeFlag::Mview,
                        FragmentTypeFlag::Sink,
                    ])),
            )
            .group_by(fragment::Column::FragmentId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        // NOTE(review): `at_most_one().ok().flatten()` maps the ambiguous
        // multi-fragment case to `None` rather than an error — presumably
        // intentional, since a job has at most one Mview/Sink fragment; confirm.
        Ok(fragments
            .into_iter()
            .at_most_one()
            .ok()
            .flatten()
            .map(|(_, count)| count as usize))
    }
1780
1781    pub async fn get_all_upstream_sink_infos(
1782        &self,
1783        target_table: &PbTable,
1784        target_fragment_id: FragmentId,
1785    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1786        let incoming_sinks = self.get_table_incoming_sinks(target_table.id as _).await?;
1787
1788        let inner = self.inner.read().await;
1789        let txn = inner.db.begin().await?;
1790
1791        let sink_ids = incoming_sinks.iter().map(|s| s.id as SinkId).collect_vec();
1792        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;
1793
1794        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1795        for pb_sink in &incoming_sinks {
1796            let sink_fragment_id =
1797                sink_fragment_ids
1798                    .get(&(pb_sink.id as _))
1799                    .cloned()
1800                    .ok_or(anyhow::anyhow!(
1801                        "sink fragment not found for sink id {}",
1802                        pb_sink.id
1803                    ))?;
1804            let upstream_info = build_upstream_sink_info(
1805                pb_sink,
1806                sink_fragment_id,
1807                target_table,
1808                target_fragment_id,
1809            )?;
1810            upstream_sink_infos.push(upstream_info);
1811        }
1812
1813        Ok(upstream_sink_infos)
1814    }
1815
1816    pub async fn get_mview_fragment_by_id(&self, table_id: TableId) -> MetaResult<FragmentId> {
1817        let inner = self.inner.read().await;
1818        let txn = inner.db.begin().await?;
1819
1820        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1821            .select_only()
1822            .column(fragment::Column::FragmentId)
1823            .filter(
1824                fragment::Column::JobId
1825                    .eq(table_id)
1826                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1827            )
1828            .into_tuple()
1829            .all(&txn)
1830            .await?;
1831
1832        if mview_fragment.len() != 1 {
1833            return Err(anyhow::anyhow!(
1834                "expected exactly one mview fragment for table {}, found {}",
1835                table_id,
1836                mview_fragment.len()
1837            )
1838            .into());
1839        }
1840
1841        Ok(mview_fragment.into_iter().next().unwrap())
1842    }
1843
    /// Whether the given table has been migrated; delegates to the free helper
    /// of the same name, run inside a fresh transaction.
    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        has_table_been_migrated(&txn, table_id).await
    }
1849}
1850
1851#[cfg(test)]
1852mod tests {
1853    use std::collections::{BTreeMap, HashMap, HashSet};
1854
1855    use itertools::Itertools;
1856    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1857    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1858    use risingwave_common::util::iter_util::ZipEqDebug;
1859    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1860    use risingwave_meta_model::actor::ActorStatus;
1861    use risingwave_meta_model::fragment::DistributionType;
1862    use risingwave_meta_model::{
1863        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
1864        VnodeBitmap, actor, fragment,
1865    };
1866    use risingwave_pb::common::PbActorLocation;
1867    use risingwave_pb::meta::table_fragments::PbActorStatus;
1868    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
1869    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1870    use risingwave_pb::plan_common::PbExprContext;
1871    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1872    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1873    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1874
1875    use crate::MetaResult;
1876    use crate::controller::catalog::CatalogController;
1877    use crate::model::{Fragment, StreamActor};
1878
1879    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
1880
1881    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
1882
1883    const TEST_FRAGMENT_ID: FragmentId = 1;
1884
1885    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;
1886
1887    const TEST_JOB_ID: ObjectId = 1;
1888
1889    const TEST_STATE_TABLE_ID: TableId = 1000;
1890
1891    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1892        let mut upstream_actor_ids = BTreeMap::new();
1893        upstream_actor_ids.insert(
1894            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1895            HashSet::from_iter([(actor_id + 100)]),
1896        );
1897        upstream_actor_ids.insert(
1898            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1899            HashSet::from_iter([(actor_id + 200)]),
1900        );
1901        upstream_actor_ids
1902    }
1903
1904    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1905        let mut input = vec![];
1906        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1907            input.push(PbStreamNode {
1908                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1909                    upstream_fragment_id: *upstream_fragment_id as _,
1910                    ..Default::default()
1911                }))),
1912                ..Default::default()
1913            });
1914        }
1915
1916        PbStreamNode {
1917            input,
1918            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1919            ..Default::default()
1920        }
1921    }
1922
    // Round trip: protobuf fragment/actors -> model structs, then verify every
    // field survived via `check_fragment` / `check_actors`.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        // Three actors, each with the same pair of upstream fragments.
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // Uniformly distribute the test vnodes across the actors.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // All actors share the same upstream fragment set, so any entry serves
        // as the template for the merger stream node.
        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        // Every actor is marked running on worker 0.
        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }
1998
    // Round trip in the opposite direction of `test_extract_fragment`: model
    // structs -> protobuf via `compose_fragment`, then verify field-by-field.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // Mutable: each model actor below `remove`s and takes ownership of its
        // own vnode bitmap.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Give every actor a single dummy connector split.
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        // All actors share the same upstream fragments, so the first actor's
        // upstreams serve as the template for the merger node.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        // One status entry and one splits entry per actor.
        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }
2089
    /// Assert that model actors and their protobuf counterparts agree pairwise
    /// (same order, same length — enforced by `zip_eq_debug`).
    ///
    /// `stream_node` is the shared fragment-level node; each of its `Merge`
    /// inputs is checked against `actor_upstreams` for every actor.
    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            // Bitmaps must survive the model <-> protobuf conversion.
            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            // These tests always build actors with an empty definition.
            assert_eq!(mview_definition, "");

            // Every `Merge` in the stream node must reference one of the
            // actor's known upstream fragments.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(status, ActorStatus::Running);

            assert_eq!(
                splits,
                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }
2149
    /// Assert that a composed [`Fragment`] faithfully mirrors its `fragment::Model`.
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        // Exhaustive destructuring on purpose: adding a field to `Fragment`
        // makes this test fail to compile until it is covered here.
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(
            pb_state_table_ids,
            fragment.state_table_ids.into_u32_array()
        );
        // The stream node must survive the model <-> protobuf round trip.
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }
2174}