risingwave_meta/controller/
fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
26use risingwave_common::util::stream_graph_visitor::{
27    visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_connector::source::SplitImpl;
30use risingwave_meta_model::actor::ActorStatus;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Actor, Fragment as FragmentModel, FragmentRelation, Sink, SourceSplits, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
40    source_splits, streaming_job, table,
41};
42use risingwave_meta_model_migration::{Alias, ExprTrait, OnConflict, SelectStatement, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
49use risingwave_pb::meta::table_fragments::fragment::{
50    FragmentDistributionType, PbFragmentDistributionType,
51};
52use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
53use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
54use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
58    StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63    ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
64    QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
65};
66use serde::{Deserialize, Serialize};
67use tracing::debug;
68
69use crate::barrier::SnapshotBackfillInfo;
70use crate::controller::catalog::{CatalogController, CatalogControllerInner};
71use crate::controller::scale::resolve_streaming_job_definition;
72use crate::controller::utils::{
73    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
74    get_fragment_mappings, get_sink_fragment_by_ids, has_table_been_migrated,
75    resolve_no_shuffle_actor_dispatcher,
76};
77use crate::manager::{LocalNotification, NotificationManager};
78use crate::model::{
79    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
80    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
81};
82use crate::rpc::ddl_controller::build_upstream_sink_info;
83use crate::stream::{SplitAssignment, UpstreamSinkInfo};
84use crate::{MetaError, MetaResult};
85
/// Some information of running (inflight) actors.
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    /// The worker this actor is currently scheduled on.
    pub worker_id: WorkerId,
    /// Vnode ownership of this actor.
    /// NOTE(review): presumably `None` means the actor owns all vnodes (singleton distribution) — confirm.
    pub vnode_bitmap: Option<Bitmap>,
}
92
/// Some information of a running (inflight) fragment and its actors.
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    /// How the fragment's actors are distributed (e.g. single vs. hash).
    pub distribution_type: DistributionType,
    /// The stream plan of this fragment.
    pub nodes: PbStreamNode,
    /// Running actors of this fragment, keyed by actor id.
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    /// Ids of the state tables belonging to this fragment.
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}
101
/// Parallelism-related information of a single fragment.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    /// Distribution of the fragment (single vs. hash).
    pub distribution_type: FragmentDistributionType,
    /// Number of actors currently running for this fragment.
    pub actor_count: usize,
    /// Total vnode count of the fragment.
    pub vnode_count: usize,
}
108
/// SQL-expression helpers for testing the `fragment_type_mask` column against
/// [`FragmentTypeFlag`]s, usable inside sea-orm query filters.
pub(crate) trait FragmentTypeMaskExt {
    /// Builds `fragment_type_mask & flag != 0`.
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr;
    /// Builds `fragment_type_mask & (flag_1 | flag_2 | ...) != 0`.
    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr;
}
113
114impl FragmentTypeMaskExt for FragmentTypeMask {
115    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
116        Expr::col(fragment::Column::FragmentTypeMask)
117            .bit_and(Expr::value(flag as i32))
118            .ne(0)
119    }
120
121    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
122        Expr::col(fragment::Column::FragmentTypeMask)
123            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
124            .ne(0)
125    }
126}
127
/// Row type for the streaming-job overview query
/// (see `CatalogController::list_streaming_job_infos`); serialized for the dashboard.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    pub obj_type: ObjectType,
    /// Name of the backing table / source / sink object, or `"<unknown>"`.
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Effective resource group: job-specific if set, otherwise the database default.
    pub resource_group: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
141
142impl CatalogControllerInner {
143    /// List all fragment vnode mapping info for all CREATED streaming jobs.
144    pub async fn all_running_fragment_mappings(
145        &self,
146    ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
147        let txn = self.db.begin().await?;
148
149        let job_ids: Vec<ObjectId> = StreamingJob::find()
150            .select_only()
151            .column(streaming_job::Column::JobId)
152            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
153            .into_tuple()
154            .all(&txn)
155            .await?;
156
157        let mut result = vec![];
158        for job_id in job_ids {
159            let mappings = get_fragment_mappings(&txn, job_id).await?;
160
161            result.extend(mappings.into_iter());
162        }
163
164        Ok(result.into_iter())
165    }
166}
167
168impl NotificationManager {
169    pub(crate) fn notify_fragment_mapping(
170        &self,
171        operation: NotificationOperation,
172        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
173    ) {
174        let fragment_ids = fragment_mappings
175            .iter()
176            .map(|mapping| mapping.fragment_id)
177            .collect_vec();
178        if fragment_ids.is_empty() {
179            return;
180        }
181        // notify all fragment mappings to frontend.
182        for fragment_mapping in fragment_mappings {
183            self.notify_frontend_without_version(
184                operation,
185                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
186            );
187        }
188
189        // update serving vnode mappings.
190        match operation {
191            NotificationOperation::Add | NotificationOperation::Update => {
192                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
193                    fragment_ids,
194                ));
195            }
196            NotificationOperation::Delete => {
197                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
198                    fragment_ids,
199                ));
200            }
201            op => {
202                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
203            }
204        }
205    }
206}
207
208impl CatalogController {
209    pub fn extract_fragment_and_actors_from_fragments(
210        job_id: ObjectId,
211        fragments: impl Iterator<Item = &Fragment>,
212        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
213        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
214    ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
215        fragments
216            .map(|fragment| {
217                Self::extract_fragment_and_actors_for_new_job(
218                    job_id,
219                    fragment,
220                    actor_status,
221                    actor_splits,
222                )
223            })
224            .try_collect()
225    }
226
    /// Convert a single in-memory [`Fragment`] of a new streaming job into its
    /// database representation: one `fragment::Model` plus one `actor::Model`
    /// per actor.
    ///
    /// `actor_status` must contain an entry for every actor of the fragment,
    /// otherwise an error is returned. `actor_splits` is optional per actor and
    /// is persisted as the actor's connector splits when present.
    ///
    /// # Panics
    /// Panics if the fragment has no actors or if any actor lacks an
    /// expression context or a state in its status.
    pub fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        // Clear the deprecated `upstream_actor_id` field of Merge nodes before
        // persisting the stream plan.
        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            // Connector splits assigned to this actor, if any.
            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            // The status carries the actor's worker location and state; it is
            // mandatory for every actor of a new job.
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                // Deprecated column; left empty for new jobs.
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            // Deprecated column; left empty for new jobs.
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }
326
327    pub fn compose_table_fragments(
328        table_id: u32,
329        state: PbState,
330        ctx: Option<PbStreamContext>,
331        fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
332        parallelism: StreamingParallelism,
333        max_parallelism: usize,
334        job_definition: Option<String>,
335    ) -> MetaResult<StreamJobFragments> {
336        let mut pb_fragments = BTreeMap::new();
337        let mut pb_actor_status = BTreeMap::new();
338
339        for (fragment, actors) in fragments {
340            let (fragment, fragment_actor_status, _) =
341                Self::compose_fragment(fragment, actors, job_definition.clone())?;
342
343            pb_fragments.insert(fragment.fragment_id, fragment);
344            pb_actor_status.extend(fragment_actor_status.into_iter());
345        }
346
347        let table_fragments = StreamJobFragments {
348            stream_job_id: table_id.into(),
349            state: state as _,
350            fragments: pb_fragments,
351            actor_status: pb_actor_status,
352            ctx: ctx
353                .as_ref()
354                .map(StreamContext::from_protobuf)
355                .unwrap_or_default(),
356            assigned_parallelism: match parallelism {
357                StreamingParallelism::Custom => TableParallelism::Custom,
358                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
359                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
360            },
361            max_parallelism,
362        };
363
364        Ok(table_fragments)
365    }
366
    /// Reassemble an in-memory [`Fragment`] from its database models, together
    /// with per-actor status and connector-split maps.
    ///
    /// Returns an error if any actor row does not belong to `fragment`.
    ///
    /// # Panics
    /// Panics if the stream plan contains two Merge nodes referring to the same
    /// upstream fragment.
    #[allow(clippy::type_complexity)]
    pub(crate) fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<actor::Model>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity check: every upstream fragment appears in at most one Merge
        // node of this fragment's plan.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let actor::Model {
                actor_id,
                fragment_id,
                status,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                    state: PbActorState::from(status) as _,
                },
            );

            // `splits` is `None` for actors without a split assignment.
            if let Some(splits) = splits {
                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
            }

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                // The job's SQL definition doubles as the actor's mview definition.
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }
463
    /// Compute per-fragment parallelism info (distribution type, actor count and
    /// vnode count), optionally restricted to the fragments in `id_filter`.
    ///
    /// Implemented as an inner subquery counting actors per fragment, joined
    /// back to the fragment table for the remaining columns.
    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let inner = self.inner.read().await;

        let query_alias = Alias::new("fragment_actor_count");
        let count_alias = Alias::new("count");

        // Inner query:
        //   SELECT fragment_id, COUNT(actor_id) AS count FROM actor GROUP BY fragment_id
        let mut query = SelectStatement::new()
            .column(actor::Column::FragmentId)
            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
            .from(Actor)
            .group_by_col(actor::Column::FragmentId)
            .to_owned();

        if let Some(id_filter) = id_filter {
            // Restrict the aggregated rows to the requested fragments.
            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
        }

        // Outer query joins the aggregated counts back to the fragment table.
        let outer = SelectStatement::new()
            .column((FragmentModel, fragment::Column::FragmentId))
            .column(count_alias)
            .column(fragment::Column::DistributionType)
            .column(fragment::Column::VnodeCount)
            .from_subquery(query.to_owned(), query_alias.clone())
            .inner_join(
                FragmentModel,
                Expr::col((query_alias, actor::Column::FragmentId))
                    .equals((FragmentModel, fragment::Column::FragmentId)),
            )
            .to_owned();

        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
                outer.to_owned(),
            )
            .all(&inner.db)
            .await?;

        Ok(fragment_parallelisms
            .into_iter()
            .map(|(fragment_id, count, distribution_type, vnode_count)| {
                (
                    fragment_id,
                    FragmentParallelismInfo {
                        distribution_type: distribution_type.into(),
                        actor_count: count as usize,
                        vnode_count: vnode_count as usize,
                    },
                )
            })
            .collect())
    }
518
519    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
520        let inner = self.inner.read().await;
521        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
522            .select_only()
523            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
524            .into_tuple()
525            .all(&inner.db)
526            .await?;
527        Ok(fragment_jobs.into_iter().collect())
528    }
529
530    pub async fn get_fragment_job_id(
531        &self,
532        fragment_ids: Vec<FragmentId>,
533    ) -> MetaResult<Vec<ObjectId>> {
534        let inner = self.inner.read().await;
535
536        let object_ids: Vec<ObjectId> = FragmentModel::find()
537            .select_only()
538            .column(fragment::Column::JobId)
539            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
540            .into_tuple()
541            .all(&inner.db)
542            .await?;
543
544        Ok(object_ids)
545    }
546
    /// Get the description of a fragment (its columns plus an aggregated actor
    /// count exposed as `parallelism`) together with the ids of its upstream
    /// fragments.
    ///
    /// Returns `Ok(None)` if the fragment does not exist.
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        // Get the fragment description, counting actors via a LEFT JOIN so a
        // fragment without actors still yields a row.
        let fragment_opt = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .filter(fragment::Column::FragmentId.eq(fragment_id))
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .one(&inner.db)
            .await?;

        let Some(fragment) = fragment_opt else {
            return Ok(None); // Fragment not found
        };

        // Get upstream fragments from the fragment relation table.
        let upstreams: Vec<FragmentId> = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(Some((fragment, upstreams)))
    }
588
589    pub async fn list_fragment_database_ids(
590        &self,
591        select_fragment_ids: Option<Vec<FragmentId>>,
592    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
593        let inner = self.inner.read().await;
594        let select = FragmentModel::find()
595            .select_only()
596            .column(fragment::Column::FragmentId)
597            .column(object::Column::DatabaseId)
598            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
599        let select = if let Some(select_fragment_ids) = select_fragment_ids {
600            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
601        } else {
602            select
603        };
604        Ok(select.into_tuple().all(&inner.db).await?)
605    }
606
    /// Load all fragments (with their actors) of the streaming job `job_id` and
    /// compose them into a [`StreamJobFragments`].
    ///
    /// Returns an error if the job does not exist in the database.
    pub async fn get_job_fragments_by_id(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;
        let fragment_actors = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        // Resolve the job's SQL definition (used as the actors' mview definition).
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id as _,
            job_info.job_status.into(),
            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }
637
    /// Fetch the actor-level dispatcher information for the given fragments.
    ///
    /// Thin wrapper around the free function [`get_fragment_actor_dispatchers`],
    /// holding the inner read lock for the duration of the query.
    pub async fn get_fragment_actor_dispatchers(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentActorDispatchers> {
        let inner = self.inner.read().await;
        get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
    }
645
    /// Collect, for each given source fragment, the relations describing its
    /// downstream fragments (dispatcher type, distribution key indices and
    /// output mapping).
    ///
    /// Fragments without downstreams get no entry in the returned map.
    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        // Stream the relation rows instead of loading them all at once.
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }
675
676    pub async fn get_job_fragment_backfill_scan_type(
677        &self,
678        job_id: ObjectId,
679    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
680        let inner = self.inner.read().await;
681        let fragments: Vec<_> = FragmentModel::find()
682            .filter(fragment::Column::JobId.eq(job_id))
683            .all(&inner.db)
684            .await?;
685
686        let mut result = HashMap::new();
687
688        for fragment::Model {
689            fragment_id,
690            stream_node,
691            ..
692        } in fragments
693        {
694            let stream_node = stream_node.to_protobuf();
695            visit_stream_node_body(&stream_node, |body| {
696                if let NodeBody::StreamScan(node) = body {
697                    match node.stream_scan_type() {
698                        StreamScanType::Unspecified => {}
699                        scan_type => {
700                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
701                        }
702                    }
703                }
704            });
705        }
706
707        Ok(result)
708    }
709
    /// List summary info of all streaming jobs, resolving the job name from the
    /// table / source / sink catalog (whichever matches) and the effective
    /// resource group from the job-specific setting with the database default
    /// as fallback.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // Left-join all three possible catalog objects; at most one matches
            // per job.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // name := COALESCE(table.name, source.name, sink.name, '<unknown>')
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // resource_group := COALESCE(job-specific group, database default)
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
756
757    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
758        let inner = self.inner.read().await;
759        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
760            .select_only()
761            .column(streaming_job::Column::MaxParallelism)
762            .into_tuple()
763            .one(&inner.db)
764            .await?
765            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
766        Ok(max_parallelism as usize)
767    }
768
769    /// Get all actor ids in the target streaming jobs.
770    pub async fn get_job_actor_mapping(
771        &self,
772        job_ids: Vec<ObjectId>,
773    ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
774        let inner = self.inner.read().await;
775        let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
776            .select_only()
777            .column(fragment::Column::JobId)
778            .column(actor::Column::ActorId)
779            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
780            .filter(fragment::Column::JobId.is_in(job_ids))
781            .into_tuple()
782            .all(&inner.db)
783            .await?;
784        Ok(job_actors.into_iter().into_group_map())
785    }
786
787    /// Try to get internal table ids of each streaming job, used by metrics collection.
788    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
789        if let Ok(inner) = self.inner.try_read()
790            && let Ok(job_state_tables) = FragmentModel::find()
791                .select_only()
792                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
793                .into_tuple::<(ObjectId, I32Array)>()
794                .all(&inner.db)
795                .await
796        {
797            let mut job_internal_table_ids = HashMap::new();
798            for (job_id, state_table_ids) in job_state_tables {
799                job_internal_table_ids
800                    .entry(job_id)
801                    .or_insert_with(Vec::new)
802                    .extend(state_table_ids.into_inner());
803            }
804            return Some(job_internal_table_ids.into_iter().collect());
805        }
806        None
807    }
808
809    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
810        let inner = self.inner.read().await;
811        let count = FragmentModel::find().count(&inner.db).await?;
812        Ok(count > 0)
813    }
814
815    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
816        let inner = self.inner.read().await;
817        let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
818            .select_only()
819            .column(actor::Column::WorkerId)
820            .column_as(actor::Column::ActorId.count(), "count")
821            .group_by(actor::Column::WorkerId)
822            .into_tuple()
823            .all(&inner.db)
824            .await?;
825
826        Ok(actor_cnt
827            .into_iter()
828            .map(|(worker_id, count)| (worker_id, count as usize))
829            .collect())
830    }
831
    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
    /// Load the full fragment/actor graph of every streaming job in the catalog,
    /// composed into [`StreamJobFragments`] keyed by job id.
    ///
    /// Issues one fragments-with-actors query per job, so cost grows linearly
    /// with the number of jobs (hence the TODO above).
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve each job's definition in a single batched helper call.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            // Fetch this job's fragments together with their actors.
            let fragment_actors = FragmentModel::find()
                .find_with_related(Actor)
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            table_fragments.insert(
                job.job_id as ObjectId,
                Self::compose_table_fragments(
                    job.job_id as _,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    // `remove` hands the owned definition over to the composer.
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
867
868    /// Returns pairs of (job id, actor ids), where actors belong to CDC table backfill fragment of the job.
869    pub async fn cdc_table_backfill_actor_ids(&self) -> MetaResult<HashMap<u32, HashSet<u32>>> {
870        let inner = self.inner.read().await;
871        let mut job_id_actor_ids = HashMap::default();
872        let stream_cdc_scan_flag = FragmentTypeFlag::StreamCdcScan as i32;
873        let fragment_type_mask = stream_cdc_scan_flag;
874        let fragment_actors: Vec<(fragment::Model, Vec<actor::Model>)> = FragmentModel::find()
875            .find_with_related(Actor)
876            .filter(fragment::Column::FragmentTypeMask.eq(fragment_type_mask))
877            .all(&inner.db)
878            .await?;
879        for (fragment, actors) in fragment_actors {
880            let e: &mut HashSet<u32> = job_id_actor_ids.entry(fragment.job_id as _).or_default();
881            e.extend(actors.iter().map(|a| a.actor_id as u32));
882        }
883        Ok(job_id_actor_ids)
884    }
885
886    pub async fn upstream_fragments(
887        &self,
888        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
889    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
890        let inner = self.inner.read().await;
891        let mut stream = FragmentRelation::find()
892            .select_only()
893            .columns([
894                fragment_relation::Column::SourceFragmentId,
895                fragment_relation::Column::TargetFragmentId,
896            ])
897            .filter(
898                fragment_relation::Column::TargetFragmentId
899                    .is_in(fragment_ids.map(|id| id as FragmentId)),
900            )
901            .into_tuple::<(FragmentId, FragmentId)>()
902            .stream(&inner.db)
903            .await?;
904        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
905        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
906            upstream_fragments
907                .entry(downstream_fragment_id as crate::model::FragmentId)
908                .or_default()
909                .insert(upstream_fragment_id as crate::model::FragmentId);
910        }
911        Ok(upstream_fragments)
912    }
913
914    pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
915        let inner = self.inner.read().await;
916        let actor_locations: Vec<PartialActorLocation> =
917            Actor::find().into_partial_model().all(&inner.db).await?;
918        Ok(actor_locations)
919    }
920
921    pub async fn list_actor_info(
922        &self,
923    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
924        let inner = self.inner.read().await;
925        let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
926            Actor::find()
927                .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
928                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
929                .select_only()
930                .columns([actor::Column::ActorId, actor::Column::FragmentId])
931                .column_as(object::Column::Oid, "job_id")
932                .column_as(object::Column::SchemaId, "schema_id")
933                .column_as(object::Column::ObjType, "type")
934                .into_tuple()
935                .all(&inner.db)
936                .await?;
937        Ok(actor_locations)
938    }
939
940    pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
941        let inner = self.inner.read().await;
942
943        let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
944            .select_only()
945            .filter(actor::Column::Splits.is_not_null())
946            .columns([actor::Column::ActorId, actor::Column::FragmentId])
947            .into_tuple()
948            .all(&inner.db)
949            .await?;
950
951        Ok(source_actors)
952    }
953
954    pub async fn list_creating_fragment_descs(
955        &self,
956    ) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
957        let inner = self.inner.read().await;
958        let mut result = Vec::new();
959        let fragments = FragmentModel::find()
960            .select_only()
961            .columns([
962                fragment::Column::FragmentId,
963                fragment::Column::JobId,
964                fragment::Column::FragmentTypeMask,
965                fragment::Column::DistributionType,
966                fragment::Column::StateTableIds,
967                fragment::Column::VnodeCount,
968                fragment::Column::StreamNode,
969            ])
970            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
971            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
972            .join(JoinType::LeftJoin, fragment::Relation::Object.def())
973            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
974            .filter(
975                streaming_job::Column::JobStatus
976                    .eq(JobStatus::Initial)
977                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
978            )
979            .group_by(fragment::Column::FragmentId)
980            .into_model::<FragmentDesc>()
981            .all(&inner.db)
982            .await?;
983        for fragment in fragments {
984            let upstreams: Vec<FragmentId> = FragmentRelation::find()
985                .select_only()
986                .column(fragment_relation::Column::SourceFragmentId)
987                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
988                .into_tuple()
989                .all(&inner.db)
990                .await?;
991            result.push((fragment, upstreams));
992        }
993        Ok(result)
994    }
995
996    pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
997        let inner = self.inner.read().await;
998        let mut result = Vec::new();
999        let fragments = FragmentModel::find()
1000            .select_only()
1001            .columns([
1002                fragment::Column::FragmentId,
1003                fragment::Column::JobId,
1004                fragment::Column::FragmentTypeMask,
1005                fragment::Column::DistributionType,
1006                fragment::Column::StateTableIds,
1007                fragment::Column::VnodeCount,
1008                fragment::Column::StreamNode,
1009            ])
1010            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
1011            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
1012            .group_by(fragment::Column::FragmentId)
1013            .into_model::<FragmentDesc>()
1014            .all(&inner.db)
1015            .await?;
1016        for fragment in fragments {
1017            let upstreams: Vec<FragmentId> = FragmentRelation::find()
1018                .select_only()
1019                .column(fragment_relation::Column::SourceFragmentId)
1020                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
1021                .into_tuple()
1022                .all(&inner.db)
1023                .await?;
1024            result.push((fragment, upstreams));
1025        }
1026        Ok(result)
1027    }
1028
1029    pub async fn list_sink_actor_mapping(
1030        &self,
1031    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1032        let inner = self.inner.read().await;
1033        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1034            .select_only()
1035            .columns([sink::Column::SinkId, sink::Column::Name])
1036            .into_tuple()
1037            .all(&inner.db)
1038            .await?;
1039        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1040        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1041
1042        let actor_with_type: Vec<(ActorId, SinkId)> = Actor::find()
1043            .select_only()
1044            .column(actor::Column::ActorId)
1045            .column(fragment::Column::JobId)
1046            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1047            .filter(
1048                fragment::Column::JobId
1049                    .is_in(sink_ids)
1050                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
1051            )
1052            .into_tuple()
1053            .all(&inner.db)
1054            .await?;
1055
1056        let mut sink_actor_mapping = HashMap::new();
1057        for (actor_id, sink_id) in actor_with_type {
1058            sink_actor_mapping
1059                .entry(sink_id)
1060                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1061                .1
1062                .push(actor_id);
1063        }
1064
1065        Ok(sink_actor_mapping)
1066    }
1067
1068    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1069        let inner = self.inner.read().await;
1070        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1071            .select_only()
1072            .columns([
1073                fragment::Column::FragmentId,
1074                fragment::Column::JobId,
1075                fragment::Column::StateTableIds,
1076            ])
1077            .into_partial_model()
1078            .all(&inner.db)
1079            .await?;
1080        Ok(fragment_state_tables)
1081    }
1082
    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
    /// collected.
    ///
    /// Streams every `Running` actor (optionally restricted to one database)
    /// joined with its fragment and owning object, and groups the results as
    /// `database_id -> job_id -> fragment_id -> InflightFragmentInfo`.
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        // Base filter: only running actors; optionally narrow to one database.
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();
            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
            };
            match fragment_infos.entry(fragment_id) {
                Entry::Occupied(mut entry) => {
                    // Rows of the same fragment must agree on the state tables,
                    // and each actor id must appear at most once.
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                Entry::Vacant(entry) => {
                    // First actor seen for this fragment: record its metadata.
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }
1183
1184    pub async fn migrate_actors(
1185        &self,
1186        plan: HashMap<WorkerSlotId, WorkerSlotId>,
1187    ) -> MetaResult<()> {
1188        let inner = self.inner.read().await;
1189        let txn = inner.db.begin().await?;
1190
1191        let actors: Vec<(
1192            FragmentId,
1193            DistributionType,
1194            ActorId,
1195            Option<VnodeBitmap>,
1196            WorkerId,
1197            ActorStatus,
1198        )> = Actor::find()
1199            .select_only()
1200            .columns([
1201                fragment::Column::FragmentId,
1202                fragment::Column::DistributionType,
1203            ])
1204            .columns([
1205                actor::Column::ActorId,
1206                actor::Column::VnodeBitmap,
1207                actor::Column::WorkerId,
1208                actor::Column::Status,
1209            ])
1210            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1211            .into_tuple()
1212            .all(&txn)
1213            .await?;
1214
1215        let mut actor_locations = HashMap::new();
1216
1217        for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
1218            if *status != ActorStatus::Running {
1219                tracing::warn!(
1220                    "skipping actor {} in fragment {} with status {:?}",
1221                    actor_id,
1222                    fragment_id,
1223                    status
1224                );
1225                continue;
1226            }
1227
1228            actor_locations
1229                .entry(*worker_id)
1230                .or_insert(HashMap::new())
1231                .entry(*fragment_id)
1232                .or_insert(BTreeSet::new())
1233                .insert(*actor_id);
1234        }
1235
1236        let expired_or_changed_workers: HashSet<_> =
1237            plan.keys().map(|k| k.worker_id() as WorkerId).collect();
1238
1239        let mut actor_migration_plan = HashMap::new();
1240        for (worker, fragment) in actor_locations {
1241            if expired_or_changed_workers.contains(&worker) {
1242                for (fragment_id, actors) in fragment {
1243                    debug!(
1244                        "worker {} expired or changed, migrating fragment {}",
1245                        worker, fragment_id
1246                    );
1247                    let worker_slot_to_actor: HashMap<_, _> = actors
1248                        .iter()
1249                        .enumerate()
1250                        .map(|(idx, actor_id)| {
1251                            (WorkerSlotId::new(worker as _, idx as _), *actor_id)
1252                        })
1253                        .collect();
1254
1255                    for (worker_slot, actor) in worker_slot_to_actor {
1256                        if let Some(target) = plan.get(&worker_slot) {
1257                            actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
1258                        }
1259                    }
1260                }
1261            }
1262        }
1263
1264        for (actor, worker) in actor_migration_plan {
1265            Actor::update_many()
1266                .col_expr(
1267                    actor::Column::WorkerId,
1268                    Expr::value(Value::Int(Some(worker))),
1269                )
1270                .filter(actor::Column::ActorId.eq(actor))
1271                .exec(&txn)
1272                .await?;
1273        }
1274
1275        txn.commit().await?;
1276
1277        Ok(())
1278    }
1279
1280    pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1281        let inner = self.inner.read().await;
1282
1283        let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1284            .select_only()
1285            .columns([fragment::Column::FragmentId])
1286            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1287            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1288            .into_tuple()
1289            .all(&inner.db)
1290            .await?;
1291
1292        let mut actor_locations = HashMap::new();
1293
1294        for (fragment_id, _, worker_id) in actors {
1295            *actor_locations
1296                .entry(worker_id)
1297                .or_insert(HashMap::new())
1298                .entry(fragment_id)
1299                .or_insert(0_usize) += 1;
1300        }
1301
1302        let mut result = HashSet::new();
1303        for (worker_id, mapping) in actor_locations {
1304            let max_fragment_len = mapping.values().max().unwrap();
1305
1306            result
1307                .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1308        }
1309
1310        Ok(result)
1311    }
1312
1313    pub async fn all_node_actors(
1314        &self,
1315        include_inactive: bool,
1316    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
1317        let inner = self.inner.read().await;
1318        let fragment_actors = if include_inactive {
1319            FragmentModel::find()
1320                .find_with_related(Actor)
1321                .all(&inner.db)
1322                .await?
1323        } else {
1324            FragmentModel::find()
1325                .find_with_related(Actor)
1326                .filter(actor::Column::Status.eq(ActorStatus::Running))
1327                .all(&inner.db)
1328                .await?
1329        };
1330
1331        let job_definitions = resolve_streaming_job_definition(
1332            &inner.db,
1333            &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
1334        )
1335        .await?;
1336
1337        let mut node_actors = HashMap::new();
1338        for (fragment, actors) in fragment_actors {
1339            let job_id = fragment.job_id;
1340            let (table_fragments, actor_status, _) = Self::compose_fragment(
1341                fragment,
1342                actors,
1343                job_definitions.get(&(job_id as _)).cloned(),
1344            )?;
1345            for actor in table_fragments.actors {
1346                let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
1347                node_actors
1348                    .entry(node_id)
1349                    .or_insert_with(Vec::new)
1350                    .push(actor);
1351            }
1352        }
1353
1354        Ok(node_actors)
1355    }
1356
1357    pub async fn get_worker_actor_ids(
1358        &self,
1359        job_ids: Vec<ObjectId>,
1360    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
1361        let inner = self.inner.read().await;
1362        let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
1363            .select_only()
1364            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1365            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1366            .filter(fragment::Column::JobId.is_in(job_ids))
1367            .into_tuple()
1368            .all(&inner.db)
1369            .await?;
1370
1371        let mut worker_actors = BTreeMap::new();
1372        for (actor_id, worker_id) in actor_workers {
1373            worker_actors
1374                .entry(worker_id)
1375                .or_insert_with(Vec::new)
1376                .push(actor_id);
1377        }
1378
1379        Ok(worker_actors)
1380    }
1381
    /// Persist the given split assignment: for each actor, overwrite its
    /// `splits` column with the new connector splits.
    ///
    /// All updates run inside one transaction; an unknown actor id surfaces
    /// as a catalog-not-found error.
    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        for assignments in split_assignment.values() {
            for (actor_id, splits) in assignments {
                let actor_splits = splits.iter().map(Into::into).collect_vec();
                Actor::update(actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_splits,
                    }))),
                    ..Default::default()
                })
                .exec(&txn)
                .await
                .map_err(|err| {
                    // Updating a missing row yields `RecordNotUpdated`;
                    // translate it into a clearer "actor not found" error.
                    if err == DbErr::RecordNotUpdated {
                        MetaError::catalog_id_not_found("actor_id", actor_id)
                    } else {
                        err.into()
                    }
                })?;
            }
        }
        txn.commit().await?;

        Ok(())
    }
1410
    /// For each given fragment, run [`crate::stream::fill_snapshot_backfill_epoch`]
    /// over its stream node and persist the rewritten node when the helper
    /// reports a change. All writes happen in one transaction under the
    /// controller's write lock.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // The helper returns `true` iff it modified the node; only then do
            // we pay for the write-back.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1444
1445    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1446    pub async fn get_running_actors_of_fragment(
1447        &self,
1448        fragment_id: FragmentId,
1449    ) -> MetaResult<Vec<ActorId>> {
1450        let inner = self.inner.read().await;
1451        let actors: Vec<ActorId> = Actor::find()
1452            .select_only()
1453            .column(actor::Column::ActorId)
1454            .filter(actor::Column::FragmentId.eq(fragment_id))
1455            .filter(actor::Column::Status.eq(ActorStatus::Running))
1456            .into_tuple()
1457            .all(&inner.db)
1458            .await?;
1459        Ok(actors)
1460    }
1461
    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
    /// (`backfill_actor_id`, `upstream_source_actor_id`)
    ///
    /// Validates that the source fragment and the backfill fragment are linked
    /// by a `NoShuffle` edge, then pairs up their actors one-to-one via
    /// `resolve_no_shuffle_actor_dispatcher`.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // The dispatcher type of the edge source -> backfill; absence of the
        // edge is an error.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        // Actor pairing below is only defined for NoShuffle edges.
        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: fetch a fragment's distribution type, erroring if it is gone.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: fetch each actor of a fragment with its (optional) vnode bitmap.
        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        // Pair source actors with backfill actors, then flip the tuple order to
        // the documented (backfill, source) shape.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1548
1549    pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
1550        let inner = self.inner.read().await;
1551        let actors: Vec<ActorId> = Actor::find()
1552            .select_only()
1553            .column(actor::Column::ActorId)
1554            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1555            .filter(fragment::Column::JobId.is_in(job_ids))
1556            .into_tuple()
1557            .all(&inner.db)
1558            .await?;
1559        Ok(actors)
1560    }
1561
    /// Get and filter the "**root**" fragments of the specified jobs.
    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
    ///
    /// Root fragment connects to downstream jobs.
    ///
    /// ## What can be the root fragment
    /// - For sink, it should have one `Sink` fragment.
    /// - For MV, it should have one `MView` fragment.
    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
    /// - For source, it should have one `Source` fragment.
    ///
    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
    ///
    /// Returns a map from job id to its composed root fragment, together with
    /// `(actor id, worker id)` pairs (note: the actor query below is unfiltered,
    /// so the pairs cover *all* actors, not just those of the root fragments).
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
        let inner = self.inner.read().await;

        // Resolve the SQL definitions of the jobs; passed to `compose_fragment` below.
        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(job_ids.iter().copied()),
        )
        .await?;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        // job_id -> fragment
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                // `MView`/`Sink` always wins: overwrite any previously recorded entry.
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                // look for Source fragment only if there's no MView fragment
                // (notice try_insert here vs insert above: it keeps an existing entry)
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        // Compose each root fragment together with its actors into protobuf form.
        let mut root_fragments_pb = HashMap::new();
        for (_, fragment) in root_fragments {
            let actors = fragment.find_related(Actor).all(&inner.db).await?;

            let job_id = fragment.job_id;
            root_fragments_pb.insert(
                fragment.job_id,
                Self::compose_fragment(
                    fragment,
                    actors,
                    job_definitions.get(&(job_id as _)).cloned(),
                )?
                .0,
            );
        }

        // NOTE(review): fetches the worker location of *every* actor in the cluster
        // (no filter) — presumably for caller-side location lookup; confirm intended.
        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok((root_fragments_pb, actors))
    }
1628
1629    pub async fn get_root_fragment(
1630        &self,
1631        job_id: ObjectId,
1632    ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
1633        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1634        let root_fragment = root_fragments
1635            .remove(&job_id)
1636            .context(format!("root fragment for job {} not found", job_id))?;
1637        Ok((root_fragment, actors))
1638    }
1639
    /// Get the downstream fragments connected to the specified job.
    ///
    /// Returns each downstream fragment paired with the dispatcher type used to reach
    /// it, plus the `(actor id, worker id)` pairs forwarded from [`Self::get_root_fragment`].
    pub async fn get_downstream_fragments(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        // Edges whose source is the root fragment, i.e. its direct downstreams.
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&inner.db)
            .await?;
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        let mut downstream_fragments = vec![];
        for fragment_relation::Model {
            target_fragment_id: fragment_id,
            dispatcher_type,
            ..
        } in downstream_fragment_relations
        {
            // `find_by_id` + `find_with_related` yields at most one (fragment, actors) pair.
            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
                .find_with_related(Actor)
                .all(&inner.db)
                .await?;
            if fragment_actors.is_empty() {
                bail!("No fragment found for fragment id {}", fragment_id);
            }
            assert_eq!(fragment_actors.len(), 1);
            let (fragment, actors) = fragment_actors.pop().unwrap();
            let dispatch_type = PbDispatcherType::from(dispatcher_type);
            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
            downstream_fragments.push((dispatch_type, fragment));
        }

        Ok((downstream_fragments, actors))
    }
1682
1683    pub async fn load_source_fragment_ids(
1684        &self,
1685    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1686        let inner = self.inner.read().await;
1687        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1688            .select_only()
1689            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1690            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1691            .into_tuple()
1692            .all(&inner.db)
1693            .await?;
1694
1695        let mut source_fragment_ids = HashMap::new();
1696        for (fragment_id, stream_node) in fragments {
1697            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1698                source_fragment_ids
1699                    .entry(source_id as SourceId)
1700                    .or_insert_with(BTreeSet::new)
1701                    .insert(fragment_id);
1702            }
1703        }
1704        Ok(source_fragment_ids)
1705    }
1706
1707    pub async fn load_backfill_fragment_ids(
1708        &self,
1709    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1710        let inner = self.inner.read().await;
1711        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1712            .select_only()
1713            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1714            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1715            .into_tuple()
1716            .all(&inner.db)
1717            .await?;
1718
1719        let mut source_fragment_ids = HashMap::new();
1720        for (fragment_id, stream_node) in fragments {
1721            if let Some((source_id, upstream_source_fragment_id)) =
1722                stream_node.to_protobuf().find_source_backfill()
1723            {
1724                source_fragment_ids
1725                    .entry(source_id as SourceId)
1726                    .or_insert_with(BTreeSet::new)
1727                    .insert((fragment_id, upstream_source_fragment_id));
1728            }
1729        }
1730        Ok(source_fragment_ids)
1731    }
1732
1733    pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
1734        let inner = self.inner.read().await;
1735        let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
1736            .select_only()
1737            .columns([actor::Column::ActorId, actor::Column::Splits])
1738            .filter(actor::Column::Splits.is_not_null())
1739            .into_tuple()
1740            .all(&inner.db)
1741            .await?;
1742        Ok(splits.into_iter().collect())
1743    }
1744
    /// Get the actor count of `Materialize` or `Sink` fragment of the specified table.
    ///
    /// Returns `None` when no such fragment exists, and also when more than one
    /// matches (the `at_most_one().ok()` below swallows the multi-match case).
    pub async fn get_actual_job_fragment_parallelism(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<Option<usize>> {
        let inner = self.inner.read().await;
        // One row per matching fragment: (fragment id, number of actors in it).
        let fragments: Vec<(FragmentId, i64)> = FragmentModel::find()
            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
            .select_only()
            .columns([fragment::Column::FragmentId])
            .column_as(actor::Column::ActorId.count(), "count")
            .filter(
                fragment::Column::JobId
                    .eq(job_id)
                    .and(FragmentTypeMask::intersects_any([
                        FragmentTypeFlag::Mview,
                        FragmentTypeFlag::Sink,
                    ])),
            )
            .group_by(fragment::Column::FragmentId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        // `at_most_one()` errs on >1 rows; `.ok().flatten()` maps both the empty and
        // the ambiguous case to `None`, keeping only the actor count of a unique match.
        Ok(fragments
            .into_iter()
            .at_most_one()
            .ok()
            .flatten()
            .map(|(_, count)| count as usize))
    }
1776
1777    pub async fn get_all_upstream_sink_infos(
1778        &self,
1779        target_table: &PbTable,
1780        target_fragment_id: FragmentId,
1781    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1782        let incoming_sinks = self.get_table_incoming_sinks(target_table.id as _).await?;
1783
1784        let inner = self.inner.read().await;
1785        let txn = inner.db.begin().await?;
1786
1787        let sink_ids = incoming_sinks.iter().map(|s| s.id as SinkId).collect_vec();
1788        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;
1789
1790        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1791        for pb_sink in &incoming_sinks {
1792            let sink_fragment_id =
1793                sink_fragment_ids
1794                    .get(&(pb_sink.id as _))
1795                    .cloned()
1796                    .ok_or(anyhow::anyhow!(
1797                        "sink fragment not found for sink id {}",
1798                        pb_sink.id
1799                    ))?;
1800            let upstream_info = build_upstream_sink_info(
1801                pb_sink,
1802                sink_fragment_id,
1803                target_table,
1804                target_fragment_id,
1805            )?;
1806            upstream_sink_infos.push(upstream_info);
1807        }
1808
1809        Ok(upstream_sink_infos)
1810    }
1811
1812    pub async fn get_mview_fragment_by_id(&self, table_id: TableId) -> MetaResult<FragmentId> {
1813        let inner = self.inner.read().await;
1814        let txn = inner.db.begin().await?;
1815
1816        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1817            .select_only()
1818            .column(fragment::Column::FragmentId)
1819            .filter(
1820                fragment::Column::JobId
1821                    .eq(table_id)
1822                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1823            )
1824            .into_tuple()
1825            .all(&txn)
1826            .await?;
1827
1828        if mview_fragment.len() != 1 {
1829            return Err(anyhow::anyhow!(
1830                "expected exactly one mview fragment for table {}, found {}",
1831                table_id,
1832                mview_fragment.len()
1833            )
1834            .into());
1835        }
1836
1837        Ok(mview_fragment.into_iter().next().unwrap())
1838    }
1839
1840    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
1841        let inner = self.inner.read().await;
1842        let txn = inner.db.begin().await?;
1843        has_table_been_migrated(&txn, table_id).await
1844    }
1845
1846    pub async fn update_source_splits(
1847        &self,
1848        source_splits: &HashMap<SourceId, Vec<SplitImpl>>,
1849    ) -> MetaResult<()> {
1850        let inner = self.inner.read().await;
1851        let txn = inner.db.begin().await?;
1852
1853        let models: Vec<source_splits::ActiveModel> = source_splits
1854            .iter()
1855            .map(|(source_id, splits)| source_splits::ActiveModel {
1856                source_id: Set(*source_id as _),
1857                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1858                    splits: splits.iter().map(Into::into).collect_vec(),
1859                }))),
1860            })
1861            .collect();
1862
1863        SourceSplits::insert_many(models)
1864            .on_conflict(
1865                OnConflict::column(source_splits::Column::SourceId)
1866                    .update_column(source_splits::Column::Splits)
1867                    .to_owned(),
1868            )
1869            .exec(&txn)
1870            .await?;
1871
1872        txn.commit().await?;
1873
1874        Ok(())
1875    }
1876}
1877
1878#[cfg(test)]
1879mod tests {
1880    use std::collections::{BTreeMap, HashMap, HashSet};
1881
1882    use itertools::Itertools;
1883    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1884    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1885    use risingwave_common::util::iter_util::ZipEqDebug;
1886    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1887    use risingwave_meta_model::actor::ActorStatus;
1888    use risingwave_meta_model::fragment::DistributionType;
1889    use risingwave_meta_model::{
1890        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
1891        VnodeBitmap, actor, fragment,
1892    };
1893    use risingwave_pb::common::PbActorLocation;
1894    use risingwave_pb::meta::table_fragments::PbActorStatus;
1895    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
1896    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1897    use risingwave_pb::plan_common::PbExprContext;
1898    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1899    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1900    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1901
1902    use crate::MetaResult;
1903    use crate::controller::catalog::CatalogController;
1904    use crate::model::{Fragment, StreamActor};
1905
    // Map of upstream fragment id -> set of upstream actor ids an actor reads from.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    // Per-actor upstream map for a whole fragment, keyed by actor id.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    // Fixture ids shared by the tests below.
    const TEST_FRAGMENT_ID: FragmentId = 1;

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;

    const TEST_JOB_ID: ObjectId = 1;

    const TEST_STATE_TABLE_ID: TableId = 1000;
1917
1918    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1919        let mut upstream_actor_ids = BTreeMap::new();
1920        upstream_actor_ids.insert(
1921            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1922            HashSet::from_iter([(actor_id + 100)]),
1923        );
1924        upstream_actor_ids.insert(
1925            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1926            HashSet::from_iter([(actor_id + 200)]),
1927        );
1928        upstream_actor_ids
1929    }
1930
1931    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1932        let mut input = vec![];
1933        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1934            input.push(PbStreamNode {
1935                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1936                    upstream_fragment_id: *upstream_fragment_id as _,
1937                    ..Default::default()
1938                }))),
1939                ..Default::default()
1940            });
1941        }
1942
1943        PbStreamNode {
1944            input,
1945            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1946            ..Default::default()
1947        }
1948    }
1949
    // Round-trip check: `extract_fragment_and_actors_for_new_job` turns a protobuf
    // `Fragment` plus actor status/splits into model structs that `check_fragment` /
    // `check_actors` can verify field-by-field.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // Uniform vnode distribution across the actors.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // All actors of the fragment share the same merger topology.
        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        // All actors are reported as running on worker 0.
        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }
2025
    // Inverse round-trip check: build model-level `fragment::Model` / `actor::Model`
    // fixtures and verify `compose_fragment` produces matching protobuf structures.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // Mutable: bitmaps are moved out (`remove`) into each actor model below.
        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Each actor carries a dummy split so `pb_actor_splits` is populated.
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        // Derive the fragment's stream node from the first actor's upstream map.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }
2116
    /// Assert that model actors and their protobuf counterparts agree field-by-field,
    /// and that every `Merge` input of the stream node points at an upstream fragment
    /// recorded for the actor.
    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        // `zip_eq_debug` additionally asserts both sides have the same length.
        for (
            actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // Every merge input must reference a known upstream fragment of this actor.
            visit_stream_node_body(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(status, ActorStatus::Running);

            assert_eq!(
                splits,
                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }
2176
    /// Assert that a `fragment::Model` matches its composed protobuf `Fragment`
    /// (id, type mask, distribution, state table ids, and stream node).
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(
            pb_state_table_ids,
            fragment.state_table_ids.into_u32_array()
        );
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }
2201}