risingwave_meta/controller/
fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::TryStreamExt;
20use futures::stream::BoxStream;
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
26use risingwave_common::util::stream_graph_visitor::{
27    visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_connector::source::SplitImpl;
30use risingwave_meta_model::actor::ActorStatus;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34    Actor, Fragment as FragmentModel, FragmentRelation, FragmentSplits, Sink, StreamingJob,
35};
36use risingwave_meta_model::{
37    ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39    VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, fragment_splits, object,
40    sink, source, streaming_job, table,
41};
42use risingwave_meta_model_migration::{ExprTrait, OnConflict, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
49use risingwave_pb::meta::table_fragments::fragment::{
50    FragmentDistributionType, PbFragmentDistributionType,
51};
52use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
53use risingwave_pb::meta::{FragmentDistribution, PbFragmentWorkerSlotMapping};
54use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
55use risingwave_pb::stream_plan;
56use risingwave_pb::stream_plan::stream_node::NodeBody;
57use risingwave_pb::stream_plan::{
58    PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
59    StreamScanType,
60};
61use sea_orm::ActiveValue::Set;
62use sea_orm::sea_query::Expr;
63use sea_orm::{
64    ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, PaginatorTrait, QueryFilter,
65    QuerySelect, RelationTrait, TransactionTrait, Value,
66};
67use serde::{Deserialize, Serialize};
68use tracing::debug;
69
70use crate::barrier::{SharedActorInfos, SharedFragmentInfo, SnapshotBackfillInfo};
71use crate::controller::catalog::CatalogController;
72use crate::controller::scale::resolve_streaming_job_definition;
73use crate::controller::utils::{
74    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
75    get_sink_fragment_by_ids, has_table_been_migrated, rebuild_fragment_mapping,
76    resolve_no_shuffle_actor_dispatcher,
77};
78use crate::manager::{LocalNotification, NotificationManager};
79use crate::model::{
80    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
81    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
82};
83use crate::rpc::ddl_controller::build_upstream_sink_info;
84use crate::stream::{SplitAssignment, UpstreamSinkInfo};
85use crate::{MetaError, MetaResult, model};
86
87/// Some information of running (inflight) actors.
88#[derive(Clone, Debug)]
89pub struct InflightActorInfo {
90    pub worker_id: WorkerId,
91    pub vnode_bitmap: Option<Bitmap>,
92    pub splits: Vec<SplitImpl>,
93}
94
95#[derive(Clone, Debug)]
96pub struct InflightFragmentInfo {
97    pub fragment_id: crate::model::FragmentId,
98    pub distribution_type: DistributionType,
99    pub fragment_type_mask: FragmentTypeMask,
100    pub vnode_count: usize,
101    pub nodes: PbStreamNode,
102    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
103    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
104}
105
106#[derive(Clone, Debug)]
107pub struct FragmentParallelismInfo {
108    pub distribution_type: FragmentDistributionType,
109    pub actor_count: usize,
110    pub vnode_count: usize,
111}
112
113#[easy_ext::ext(FragmentTypeMaskExt)]
114pub impl FragmentTypeMask {
115    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
116        Expr::col(fragment::Column::FragmentTypeMask)
117            .bit_and(Expr::value(flag as i32))
118            .ne(0)
119    }
120
121    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
122        Expr::col(fragment::Column::FragmentTypeMask)
123            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
124            .ne(0)
125    }
126
127    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
128        Expr::col(fragment::Column::FragmentTypeMask)
129            .bit_and(Expr::value(flag as i32))
130            .eq(0)
131    }
132}
133
134#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
135#[serde(rename_all = "camelCase")] // for dashboard
136pub struct StreamingJobInfo {
137    pub job_id: ObjectId,
138    pub obj_type: ObjectType,
139    pub name: String,
140    pub job_status: JobStatus,
141    pub parallelism: StreamingParallelism,
142    pub max_parallelism: i32,
143    pub resource_group: String,
144    pub database_id: DatabaseId,
145    pub schema_id: SchemaId,
146}
147
148impl NotificationManager {
149    pub(crate) fn notify_fragment_mapping(
150        &self,
151        operation: NotificationOperation,
152        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
153    ) {
154        let fragment_ids = fragment_mappings
155            .iter()
156            .map(|mapping| mapping.fragment_id)
157            .collect_vec();
158        if fragment_ids.is_empty() {
159            return;
160        }
161        // notify all fragment mappings to frontend.
162        for fragment_mapping in fragment_mappings {
163            self.notify_frontend_without_version(
164                operation,
165                NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
166            );
167        }
168
169        // update serving vnode mappings.
170        match operation {
171            NotificationOperation::Add | NotificationOperation::Update => {
172                self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
173                    fragment_ids,
174                ));
175            }
176            NotificationOperation::Delete => {
177                self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
178                    fragment_ids,
179                ));
180            }
181            op => {
182                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
183            }
184        }
185    }
186}
187
188impl CatalogController {
189    pub fn extract_fragment_and_actors_from_fragments(
190        job_id: ObjectId,
191        fragments: impl Iterator<Item = &Fragment>,
192        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
193        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
194    ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
195        fragments
196            .map(|fragment| {
197                Self::extract_fragment_and_actors_for_new_job(
198                    job_id,
199                    fragment,
200                    actor_status,
201                    actor_splits,
202                )
203            })
204            .try_collect()
205    }
206
207    pub fn extract_fragment_and_actors_for_new_job(
208        job_id: ObjectId,
209        fragment: &Fragment,
210        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
211        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
212    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
213        let vnode_count = fragment.vnode_count();
214        let Fragment {
215            fragment_id: pb_fragment_id,
216            fragment_type_mask: pb_fragment_type_mask,
217            distribution_type: pb_distribution_type,
218            actors: pb_actors,
219            state_table_ids: pb_state_table_ids,
220            nodes,
221            ..
222        } = fragment;
223
224        let state_table_ids = pb_state_table_ids.clone().into();
225
226        assert!(!pb_actors.is_empty());
227
228        let stream_node = {
229            let mut stream_node = nodes.clone();
230            visit_stream_node_mut(&mut stream_node, |body| {
231                #[expect(deprecated)]
232                if let NodeBody::Merge(m) = body {
233                    m.upstream_actor_id = vec![];
234                }
235            });
236
237            stream_node
238        };
239
240        let mut actors = vec![];
241
242        for actor in pb_actors {
243            let StreamActor {
244                actor_id,
245                fragment_id,
246                vnode_bitmap,
247                mview_definition: _,
248                expr_context: pb_expr_context,
249                ..
250            } = actor;
251
252            let splits = actor_splits.get(actor_id).map(|splits| {
253                ConnectorSplits::from(&PbConnectorSplits {
254                    splits: splits.iter().map(ConnectorSplit::from).collect(),
255                })
256            });
257            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
258                anyhow::anyhow!(
259                    "actor {} in fragment {} has no actor_status",
260                    actor_id,
261                    fragment_id
262                )
263            })?;
264
265            let worker_id = status.worker_id() as _;
266
267            let pb_expr_context = pb_expr_context
268                .as_ref()
269                .expect("no expression context found");
270
271            #[expect(deprecated)]
272            actors.push(actor::Model {
273                actor_id: *actor_id as _,
274                fragment_id: *fragment_id as _,
275                status: status.get_state().unwrap().into(),
276                splits,
277                worker_id,
278                upstream_actor_ids: Default::default(),
279                vnode_bitmap: vnode_bitmap
280                    .as_ref()
281                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
282                expr_context: ExprContext::from(pb_expr_context),
283            });
284        }
285
286        let stream_node = StreamNode::from(&stream_node);
287
288        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
289            .unwrap()
290            .into();
291
292        #[expect(deprecated)]
293        let fragment = fragment::Model {
294            fragment_id: *pb_fragment_id as _,
295            job_id,
296            fragment_type_mask: (*pb_fragment_type_mask).into(),
297            distribution_type,
298            stream_node,
299            state_table_ids,
300            upstream_fragment_id: Default::default(),
301            vnode_count: vnode_count as _,
302        };
303
304        Ok((fragment, actors))
305    }
306
307    pub fn compose_table_fragments(
308        table_id: u32,
309        state: PbState,
310        ctx: Option<PbStreamContext>,
311        fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
312        parallelism: StreamingParallelism,
313        max_parallelism: usize,
314        job_definition: Option<String>,
315    ) -> MetaResult<StreamJobFragments> {
316        let mut pb_fragments = BTreeMap::new();
317        let mut pb_actor_status = BTreeMap::new();
318
319        for (fragment, actors) in fragments {
320            let (fragment, fragment_actor_status, _) =
321                Self::compose_fragment(fragment, actors, job_definition.clone())?;
322
323            pb_fragments.insert(fragment.fragment_id, fragment);
324            pb_actor_status.extend(fragment_actor_status.into_iter());
325        }
326
327        let table_fragments = StreamJobFragments {
328            stream_job_id: table_id.into(),
329            state: state as _,
330            fragments: pb_fragments,
331            actor_status: pb_actor_status,
332            ctx: ctx
333                .as_ref()
334                .map(StreamContext::from_protobuf)
335                .unwrap_or_default(),
336            assigned_parallelism: match parallelism {
337                StreamingParallelism::Custom => TableParallelism::Custom,
338                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
339                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
340            },
341            max_parallelism,
342        };
343
344        Ok(table_fragments)
345    }
346
347    #[allow(clippy::type_complexity)]
348    pub(crate) fn compose_fragment(
349        fragment: fragment::Model,
350        actors: Vec<actor::Model>,
351        job_definition: Option<String>,
352    ) -> MetaResult<(
353        Fragment,
354        HashMap<crate::model::ActorId, PbActorStatus>,
355        HashMap<crate::model::ActorId, PbConnectorSplits>,
356    )> {
357        let fragment::Model {
358            fragment_id,
359            fragment_type_mask,
360            distribution_type,
361            stream_node,
362            state_table_ids,
363            vnode_count,
364            ..
365        } = fragment;
366
367        let stream_node = stream_node.to_protobuf();
368        let mut upstream_fragments = HashSet::new();
369        visit_stream_node_body(&stream_node, |body| {
370            if let NodeBody::Merge(m) = body {
371                assert!(
372                    upstream_fragments.insert(m.upstream_fragment_id),
373                    "non-duplicate upstream fragment"
374                );
375            }
376        });
377
378        let mut pb_actors = vec![];
379
380        let mut pb_actor_status = HashMap::new();
381        let mut pb_actor_splits = HashMap::new();
382
383        for actor in actors {
384            if actor.fragment_id != fragment_id {
385                bail!(
386                    "fragment id {} from actor {} is different from fragment {}",
387                    actor.fragment_id,
388                    actor.actor_id,
389                    fragment_id
390                )
391            }
392
393            let actor::Model {
394                actor_id,
395                fragment_id,
396                status,
397                worker_id,
398                splits,
399                vnode_bitmap,
400                expr_context,
401                ..
402            } = actor;
403
404            let vnode_bitmap =
405                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
406            let pb_expr_context = Some(expr_context.to_protobuf());
407
408            pb_actor_status.insert(
409                actor_id as _,
410                PbActorStatus {
411                    location: PbActorLocation::from_worker(worker_id as u32),
412                    state: PbActorState::from(status) as _,
413                },
414            );
415
416            if let Some(splits) = splits {
417                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
418            }
419
420            pb_actors.push(StreamActor {
421                actor_id: actor_id as _,
422                fragment_id: fragment_id as _,
423                vnode_bitmap,
424                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
425                expr_context: pb_expr_context,
426            })
427        }
428
429        let pb_state_table_ids = state_table_ids.into_u32_array();
430        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
431        let pb_fragment = Fragment {
432            fragment_id: fragment_id as _,
433            fragment_type_mask: fragment_type_mask.into(),
434            distribution_type: pb_distribution_type,
435            actors: pb_actors,
436            state_table_ids: pb_state_table_ids,
437            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
438            nodes: stream_node,
439        };
440
441        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
442    }
443
444    pub fn running_fragment_parallelisms(
445        &self,
446        id_filter: Option<HashSet<FragmentId>>,
447    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
448        let info = self.env.shared_actor_infos().read_guard();
449
450        let mut result = HashMap::new();
451        for (fragment_id, fragment) in info.iter_over_fragments() {
452            if let Some(id_filter) = &id_filter
453                && !id_filter.contains(&(*fragment_id as _))
454            {
455                continue; // Skip fragments not in the filter
456            }
457
458            result.insert(
459                *fragment_id as _,
460                FragmentParallelismInfo {
461                    distribution_type: fragment.distribution_type.into(),
462                    actor_count: fragment.actors.len() as _,
463                    vnode_count: fragment.vnode_count,
464                },
465            );
466        }
467
468        Ok(result)
469    }
470
471    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
472        let inner = self.inner.read().await;
473        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
474            .select_only()
475            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
476            .into_tuple()
477            .all(&inner.db)
478            .await?;
479        Ok(fragment_jobs.into_iter().collect())
480    }
481
482    pub async fn get_fragment_job_id(
483        &self,
484        fragment_ids: Vec<FragmentId>,
485    ) -> MetaResult<Vec<ObjectId>> {
486        let inner = self.inner.read().await;
487
488        let object_ids: Vec<ObjectId> = FragmentModel::find()
489            .select_only()
490            .column(fragment::Column::JobId)
491            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
492            .into_tuple()
493            .all(&inner.db)
494            .await?;
495
496        Ok(object_ids)
497    }
498
499    pub async fn get_fragment_desc_by_id(
500        &self,
501        fragment_id: FragmentId,
502    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
503        let inner = self.inner.read().await;
504
505        let fragment_model_opt = FragmentModel::find_by_id(fragment_id)
506            .one(&inner.db)
507            .await?;
508
509        let fragment = fragment_model_opt.map(|fragment| {
510            let info = self.env.shared_actor_infos().read_guard();
511
512            let SharedFragmentInfo { actors, .. } = info
513                .get_fragment(fragment.fragment_id as _)
514                .unwrap_or_else(|| {
515                    panic!(
516                        "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
517                        fragment.fragment_id,
518                        fragment.job_id
519                    )
520                });
521
522            FragmentDesc {
523                fragment_id: fragment.fragment_id,
524                job_id: fragment.job_id,
525                fragment_type_mask: fragment.fragment_type_mask,
526                distribution_type: fragment.distribution_type,
527                state_table_ids: fragment.state_table_ids.clone(),
528                vnode_count: fragment.vnode_count,
529                stream_node: fragment.stream_node.clone(),
530                parallelism: actors.len() as _,
531            }
532        });
533
534        let Some(fragment) = fragment else {
535            return Ok(None); // Fragment not found
536        };
537
538        // Get upstream fragments
539        let upstreams: Vec<FragmentId> = FragmentRelation::find()
540            .select_only()
541            .column(fragment_relation::Column::SourceFragmentId)
542            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
543            .into_tuple()
544            .all(&inner.db)
545            .await?;
546
547        Ok(Some((fragment, upstreams)))
548    }
549
550    pub async fn list_fragment_database_ids(
551        &self,
552        select_fragment_ids: Option<Vec<FragmentId>>,
553    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
554        let inner = self.inner.read().await;
555        let select = FragmentModel::find()
556            .select_only()
557            .column(fragment::Column::FragmentId)
558            .column(object::Column::DatabaseId)
559            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
560        let select = if let Some(select_fragment_ids) = select_fragment_ids {
561            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
562        } else {
563            select
564        };
565        Ok(select.into_tuple().all(&inner.db).await?)
566    }
567
568    pub async fn get_job_fragments_by_id(
569        &self,
570        job_id: ObjectId,
571    ) -> MetaResult<StreamJobFragments> {
572        let inner = self.inner.read().await;
573        let fragment_actors = FragmentModel::find()
574            .find_with_related(Actor)
575            .filter(fragment::Column::JobId.eq(job_id))
576            .all(&inner.db)
577            .await?;
578
579        let job_info = StreamingJob::find_by_id(job_id)
580            .one(&inner.db)
581            .await?
582            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
583
584        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
585            .await?
586            .remove(&job_id);
587
588        Self::compose_table_fragments(
589            job_id as _,
590            job_info.job_status.into(),
591            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
592            fragment_actors,
593            job_info.parallelism.clone(),
594            job_info.max_parallelism as _,
595            job_definition,
596        )
597    }
598
599    pub async fn get_fragment_actor_dispatchers(
600        &self,
601        fragment_ids: Vec<FragmentId>,
602    ) -> MetaResult<FragmentActorDispatchers> {
603        let inner = self.inner.read().await;
604        get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
605    }
606
607    pub async fn get_fragment_downstream_relations(
608        &self,
609        fragment_ids: Vec<FragmentId>,
610    ) -> MetaResult<FragmentDownstreamRelation> {
611        let inner = self.inner.read().await;
612        let mut stream = FragmentRelation::find()
613            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
614            .stream(&inner.db)
615            .await?;
616        let mut relations = FragmentDownstreamRelation::new();
617        while let Some(relation) = stream.try_next().await? {
618            relations
619                .entry(relation.source_fragment_id as _)
620                .or_default()
621                .push(DownstreamFragmentRelation {
622                    downstream_fragment_id: relation.target_fragment_id as _,
623                    dispatcher_type: relation.dispatcher_type,
624                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
625                    output_mapping: PbDispatchOutputMapping {
626                        indices: relation.output_indices.into_u32_array(),
627                        types: relation
628                            .output_type_mapping
629                            .unwrap_or_default()
630                            .to_protobuf(),
631                    },
632                });
633        }
634        Ok(relations)
635    }
636
637    pub async fn get_job_fragment_backfill_scan_type(
638        &self,
639        job_id: ObjectId,
640    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
641        let inner = self.inner.read().await;
642        let fragments: Vec<_> = FragmentModel::find()
643            .filter(fragment::Column::JobId.eq(job_id))
644            .all(&inner.db)
645            .await?;
646
647        let mut result = HashMap::new();
648
649        for fragment::Model {
650            fragment_id,
651            stream_node,
652            ..
653        } in fragments
654        {
655            let stream_node = stream_node.to_protobuf();
656            visit_stream_node_body(&stream_node, |body| {
657                if let NodeBody::StreamScan(node) = body {
658                    match node.stream_scan_type() {
659                        StreamScanType::Unspecified => {}
660                        scan_type => {
661                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
662                        }
663                    }
664                }
665            });
666        }
667
668        Ok(result)
669    }
670
671    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
672        let inner = self.inner.read().await;
673        let job_states = StreamingJob::find()
674            .select_only()
675            .column(streaming_job::Column::JobId)
676            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
677            .join(JoinType::InnerJoin, object::Relation::Database2.def())
678            .column(object::Column::ObjType)
679            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
680            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
681            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
682            .column_as(
683                Expr::if_null(
684                    Expr::col((table::Entity, table::Column::Name)),
685                    Expr::if_null(
686                        Expr::col((source::Entity, source::Column::Name)),
687                        Expr::if_null(
688                            Expr::col((sink::Entity, sink::Column::Name)),
689                            Expr::val("<unknown>"),
690                        ),
691                    ),
692                ),
693                "name",
694            )
695            .columns([
696                streaming_job::Column::JobStatus,
697                streaming_job::Column::Parallelism,
698                streaming_job::Column::MaxParallelism,
699            ])
700            .column_as(
701                Expr::if_null(
702                    Expr::col((
703                        streaming_job::Entity,
704                        streaming_job::Column::SpecificResourceGroup,
705                    )),
706                    Expr::col((database::Entity, database::Column::ResourceGroup)),
707                ),
708                "resource_group",
709            )
710            .column(object::Column::DatabaseId)
711            .column(object::Column::SchemaId)
712            .into_model()
713            .all(&inner.db)
714            .await?;
715        Ok(job_states)
716    }
717
718    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
719        let inner = self.inner.read().await;
720        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
721            .select_only()
722            .column(streaming_job::Column::MaxParallelism)
723            .into_tuple()
724            .one(&inner.db)
725            .await?
726            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
727        Ok(max_parallelism as usize)
728    }
729
730    /// Try to get internal table ids of each streaming job, used by metrics collection.
731    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
732        if let Ok(inner) = self.inner.try_read()
733            && let Ok(job_state_tables) = FragmentModel::find()
734                .select_only()
735                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
736                .into_tuple::<(ObjectId, I32Array)>()
737                .all(&inner.db)
738                .await
739        {
740            let mut job_internal_table_ids = HashMap::new();
741            for (job_id, state_table_ids) in job_state_tables {
742                job_internal_table_ids
743                    .entry(job_id)
744                    .or_insert_with(Vec::new)
745                    .extend(state_table_ids.into_inner());
746            }
747            return Some(job_internal_table_ids.into_iter().collect());
748        }
749        None
750    }
751
752    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
753        let inner = self.inner.read().await;
754        let count = FragmentModel::find().count(&inner.db).await?;
755        Ok(count > 0)
756    }
757
758    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
759        let read_guard = self.env.shared_actor_infos().read_guard();
760        let actor_cnt: HashMap<WorkerId, _> = read_guard
761            .iter_over_fragments()
762            .flat_map(|(_, fragment)| {
763                fragment
764                    .actors
765                    .iter()
766                    .map(|(actor_id, actor)| (actor.worker_id, *actor_id))
767            })
768            .into_group_map()
769            .into_iter()
770            .map(|(k, v)| (k, v.len()))
771            .collect();
772
773        Ok(actor_cnt)
774    }
775
776    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
777    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
778        let inner = self.inner.read().await;
779        let jobs = StreamingJob::find().all(&inner.db).await?;
780
781        let mut job_definition = resolve_streaming_job_definition(
782            &inner.db,
783            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
784        )
785        .await?;
786
787        let mut table_fragments = BTreeMap::new();
788        for job in jobs {
789            let fragment_actors = FragmentModel::find()
790                .find_with_related(Actor)
791                .filter(fragment::Column::JobId.eq(job.job_id))
792                .all(&inner.db)
793                .await?;
794
795            table_fragments.insert(
796                job.job_id as ObjectId,
797                Self::compose_table_fragments(
798                    job.job_id as _,
799                    job.job_status.into(),
800                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
801                    fragment_actors,
802                    job.parallelism.clone(),
803                    job.max_parallelism as _,
804                    job_definition.remove(&job.job_id),
805                )?,
806            );
807        }
808
809        Ok(table_fragments)
810    }
811
812    /// Returns pairs of (job id, actor ids), where actors belong to CDC table backfill fragment of the job.
813    pub async fn cdc_table_backfill_actor_ids(&self) -> MetaResult<HashMap<u32, HashSet<u32>>> {
814        let inner = self.inner.read().await;
815        let mut job_id_actor_ids = HashMap::default();
816        let stream_cdc_scan_flag = FragmentTypeFlag::StreamCdcScan as i32;
817        let fragment_type_mask = stream_cdc_scan_flag;
818        let fragment_actors: Vec<(fragment::Model, Vec<actor::Model>)> = FragmentModel::find()
819            .find_with_related(Actor)
820            .filter(fragment::Column::FragmentTypeMask.eq(fragment_type_mask))
821            .all(&inner.db)
822            .await?;
823        for (fragment, actors) in fragment_actors {
824            let e: &mut HashSet<u32> = job_id_actor_ids.entry(fragment.job_id as _).or_default();
825            e.extend(actors.iter().map(|a| a.actor_id as u32));
826        }
827        Ok(job_id_actor_ids)
828    }
829
830    pub async fn upstream_fragments(
831        &self,
832        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
833    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
834        let inner = self.inner.read().await;
835        let mut stream = FragmentRelation::find()
836            .select_only()
837            .columns([
838                fragment_relation::Column::SourceFragmentId,
839                fragment_relation::Column::TargetFragmentId,
840            ])
841            .filter(
842                fragment_relation::Column::TargetFragmentId
843                    .is_in(fragment_ids.map(|id| id as FragmentId)),
844            )
845            .into_tuple::<(FragmentId, FragmentId)>()
846            .stream(&inner.db)
847            .await?;
848        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
849        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
850            upstream_fragments
851                .entry(downstream_fragment_id as crate::model::FragmentId)
852                .or_default()
853                .insert(upstream_fragment_id as crate::model::FragmentId);
854        }
855        Ok(upstream_fragments)
856    }
857
858    pub fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
859        let info = self.env.shared_actor_infos().read_guard();
860
861        let actor_locations = info
862            .iter_over_fragments()
863            .flat_map(|(fragment_id, fragment)| {
864                fragment
865                    .actors
866                    .iter()
867                    .map(|(actor_id, actor)| PartialActorLocation {
868                        actor_id: *actor_id as _,
869                        fragment_id: *fragment_id as _,
870                        worker_id: actor.worker_id,
871                        status: ActorStatus::Running,
872                    })
873            })
874            .collect_vec();
875
876        Ok(actor_locations)
877    }
878
879    pub async fn list_actor_info(
880        &self,
881    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
882        let inner = self.inner.read().await;
883
884        let fragment_objects: Vec<(FragmentId, ObjectId, SchemaId, ObjectType)> =
885            FragmentModel::find()
886                .select_only()
887                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
888                .column_as(object::Column::Oid, "job_id")
889                .column_as(object::Column::SchemaId, "schema_id")
890                .column_as(object::Column::ObjType, "type")
891                .into_tuple()
892                .all(&inner.db)
893                .await?;
894
895        let actor_infos = {
896            let info = self.env.shared_actor_infos().read_guard();
897
898            fragment_objects
899                .into_iter()
900                .flat_map(|(fragment_id, object_id, schema_id, object_type)| {
901                    let SharedFragmentInfo {
902                        fragment_id,
903                        actors,
904                        ..
905                    } = info.get_fragment(fragment_id as _).unwrap();
906                    actors.keys().map(move |actor_id| {
907                        (
908                            *actor_id as _,
909                            *fragment_id as _,
910                            object_id,
911                            schema_id,
912                            object_type,
913                        )
914                    })
915                })
916                .collect_vec()
917        };
918
919        Ok(actor_infos)
920    }
921
922    pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
923        let inner = self.inner.read().await;
924
925        let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
926            .select_only()
927            .filter(actor::Column::Splits.is_not_null())
928            .columns([actor::Column::ActorId, actor::Column::FragmentId])
929            .into_tuple()
930            .all(&inner.db)
931            .await?;
932
933        Ok(source_actors)
934    }
935
936    pub fn get_worker_slot_mappings(&self) -> Vec<PbFragmentWorkerSlotMapping> {
937        let guard = self.env.shared_actor_info.read_guard();
938        guard
939            .iter_over_fragments()
940            .map(|(_, fragment)| rebuild_fragment_mapping(fragment))
941            .collect_vec()
942    }
943
944    pub async fn list_fragment_descs(
945        &self,
946        is_creating: bool,
947    ) -> MetaResult<Vec<(FragmentDistribution, Vec<FragmentId>)>> {
948        let inner = self.inner.read().await;
949
950        let txn = inner.db.begin().await?;
951
952        let fragments: Vec<_> = if is_creating {
953            FragmentModel::find()
954                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
955                .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
956                .filter(
957                    streaming_job::Column::JobStatus
958                        .eq(JobStatus::Initial)
959                        .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
960                )
961                .all(&txn)
962                .await?
963        } else {
964            FragmentModel::find().all(&txn).await?
965        };
966
967        let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();
968
969        let upstreams: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
970            .select_only()
971            .columns([
972                fragment_relation::Column::TargetFragmentId,
973                fragment_relation::Column::SourceFragmentId,
974            ])
975            .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids))
976            .into_tuple()
977            .all(&txn)
978            .await?;
979
980        let guard = self.env.shared_actor_info.read_guard();
981
982        let mut result = Vec::new();
983
984        let mut all_upstreams = upstreams.into_iter().into_group_map();
985
986        for fragment_desc in fragments {
987            // note: here sometimes fragment is not found in the cache, fallback to 0
988            let parallelism = guard
989                .get_fragment(fragment_desc.fragment_id as _)
990                .map(|fragment| fragment.actors.len())
991                .unwrap_or_default();
992
993            let upstreams = all_upstreams
994                .remove(&fragment_desc.fragment_id)
995                .unwrap_or_default();
996
997            let fragment = FragmentDistribution {
998                fragment_id: fragment_desc.fragment_id as _,
999                table_id: fragment_desc.job_id as _,
1000                distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type)
1001                    as _,
1002                state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
1003                upstream_fragment_ids: upstreams.iter().map(|id| *id as _).collect(),
1004                fragment_type_mask: fragment_desc.fragment_type_mask as _,
1005                parallelism: parallelism as _,
1006                vnode_count: fragment_desc.vnode_count as _,
1007                node: Some(fragment_desc.stream_node.to_protobuf()),
1008            };
1009
1010            result.push((fragment, upstreams));
1011        }
1012        Ok(result)
1013    }
1014
1015    pub async fn list_sink_actor_mapping(
1016        &self,
1017    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1018        let inner = self.inner.read().await;
1019        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1020            .select_only()
1021            .columns([sink::Column::SinkId, sink::Column::Name])
1022            .into_tuple()
1023            .all(&inner.db)
1024            .await?;
1025        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1026        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1027
1028        let actor_with_type: Vec<(ActorId, SinkId)> = Actor::find()
1029            .select_only()
1030            .column(actor::Column::ActorId)
1031            .column(fragment::Column::JobId)
1032            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1033            .filter(
1034                fragment::Column::JobId
1035                    .is_in(sink_ids)
1036                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
1037            )
1038            .into_tuple()
1039            .all(&inner.db)
1040            .await?;
1041
1042        let mut sink_actor_mapping = HashMap::new();
1043        for (actor_id, sink_id) in actor_with_type {
1044            sink_actor_mapping
1045                .entry(sink_id)
1046                .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1047                .1
1048                .push(actor_id);
1049        }
1050
1051        Ok(sink_actor_mapping)
1052    }
1053
1054    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1055        let inner = self.inner.read().await;
1056        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1057            .select_only()
1058            .columns([
1059                fragment::Column::FragmentId,
1060                fragment::Column::JobId,
1061                fragment::Column::StateTableIds,
1062            ])
1063            .into_partial_model()
1064            .all(&inner.db)
1065            .await?;
1066        Ok(fragment_state_tables)
1067    }
1068
1069    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
1070    /// collected
1071    pub async fn load_all_actors(
1072        &self,
1073        database_id: Option<DatabaseId>,
1074    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
1075    {
1076        let inner = self.inner.read().await;
1077        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
1078        let filter_condition = if let Some(database_id) = database_id {
1079            filter_condition.and(object::Column::DatabaseId.eq(database_id))
1080        } else {
1081            filter_condition
1082        };
1083        #[expect(clippy::type_complexity)]
1084        let mut actor_info_stream: BoxStream<
1085            '_,
1086            Result<
1087                (
1088                    ActorId,
1089                    WorkerId,
1090                    Option<VnodeBitmap>,
1091                    Option<ConnectorSplits>,
1092                    FragmentId,
1093                    StreamNode,
1094                    I32Array,
1095                    DistributionType,
1096                    i32, // for vnode_count
1097                    i32, // for fragment_type_mask
1098                    DatabaseId,
1099                    ObjectId,
1100                ),
1101                _,
1102            >,
1103        > = Actor::find()
1104            .select_only()
1105            .column(actor::Column::ActorId)
1106            .column(actor::Column::WorkerId)
1107            .column(actor::Column::VnodeBitmap)
1108            .column(actor::Column::Splits)
1109            .column(fragment::Column::FragmentId)
1110            .column(fragment::Column::StreamNode)
1111            .column(fragment::Column::StateTableIds)
1112            .column(fragment::Column::DistributionType)
1113            .column(fragment::Column::VnodeCount)
1114            .column(fragment::Column::FragmentTypeMask)
1115            .column(object::Column::DatabaseId)
1116            .column(object::Column::Oid)
1117            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1118            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
1119            .filter(filter_condition)
1120            .into_tuple()
1121            .stream(&inner.db)
1122            .await?;
1123
1124        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
1125            HashMap::new();
1126
1127        while let Some((
1128            actor_id,
1129            worker_id,
1130            vnode_bitmap,
1131            splits,
1132            fragment_id,
1133            node,
1134            state_table_ids,
1135            distribution_type,
1136            vnode_count,
1137            fragment_type_mask,
1138            database_id,
1139            job_id,
1140        )) = actor_info_stream.try_next().await?
1141        {
1142            let fragment_infos = database_fragment_infos
1143                .entry(database_id)
1144                .or_default()
1145                .entry(job_id)
1146                .or_default();
1147            let state_table_ids = state_table_ids.into_inner();
1148            let state_table_ids = state_table_ids
1149                .into_iter()
1150                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
1151                .collect();
1152
1153            let actor_info = InflightActorInfo {
1154                worker_id,
1155                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
1156                splits: splits
1157                    .map(|connector_splits| {
1158                        connector_splits
1159                            .to_protobuf()
1160                            .splits
1161                            .iter()
1162                            .map(|connector_split| SplitImpl::try_from(connector_split).unwrap())
1163                            .collect_vec()
1164                    })
1165                    .unwrap_or_default(),
1166            };
1167            match fragment_infos.entry(fragment_id) {
1168                Entry::Occupied(mut entry) => {
1169                    let info: &mut InflightFragmentInfo = entry.get_mut();
1170                    assert_eq!(info.state_table_ids, state_table_ids);
1171                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
1172                }
1173                Entry::Vacant(entry) => {
1174                    entry.insert(InflightFragmentInfo {
1175                        fragment_id: fragment_id as _,
1176                        distribution_type,
1177                        fragment_type_mask: FragmentTypeMask::from(fragment_type_mask),
1178                        vnode_count: vnode_count as _,
1179                        nodes: node.to_protobuf(),
1180                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
1181                        state_table_ids,
1182                    });
1183                }
1184            }
1185        }
1186
1187        debug!(?database_fragment_infos, "reload all actors");
1188
1189        Ok(database_fragment_infos)
1190    }
1191
1192    pub async fn migrate_actors(
1193        &self,
1194        plan: HashMap<WorkerSlotId, WorkerSlotId>,
1195    ) -> MetaResult<()> {
1196        let inner = self.inner.read().await;
1197        let txn = inner.db.begin().await?;
1198
1199        let actors: Vec<(
1200            FragmentId,
1201            DistributionType,
1202            ActorId,
1203            Option<VnodeBitmap>,
1204            WorkerId,
1205            ActorStatus,
1206        )> = Actor::find()
1207            .select_only()
1208            .columns([
1209                fragment::Column::FragmentId,
1210                fragment::Column::DistributionType,
1211            ])
1212            .columns([
1213                actor::Column::ActorId,
1214                actor::Column::VnodeBitmap,
1215                actor::Column::WorkerId,
1216                actor::Column::Status,
1217            ])
1218            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1219            .into_tuple()
1220            .all(&txn)
1221            .await?;
1222
1223        let mut actor_locations = HashMap::new();
1224
1225        for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
1226            if *status != ActorStatus::Running {
1227                tracing::warn!(
1228                    "skipping actor {} in fragment {} with status {:?}",
1229                    actor_id,
1230                    fragment_id,
1231                    status
1232                );
1233                continue;
1234            }
1235
1236            actor_locations
1237                .entry(*worker_id)
1238                .or_insert(HashMap::new())
1239                .entry(*fragment_id)
1240                .or_insert(BTreeSet::new())
1241                .insert(*actor_id);
1242        }
1243
1244        let expired_or_changed_workers: HashSet<_> =
1245            plan.keys().map(|k| k.worker_id() as WorkerId).collect();
1246
1247        let mut actor_migration_plan = HashMap::new();
1248        for (worker, fragment) in actor_locations {
1249            if expired_or_changed_workers.contains(&worker) {
1250                for (fragment_id, actors) in fragment {
1251                    debug!(
1252                        "worker {} expired or changed, migrating fragment {}",
1253                        worker, fragment_id
1254                    );
1255                    let worker_slot_to_actor: HashMap<_, _> = actors
1256                        .iter()
1257                        .enumerate()
1258                        .map(|(idx, actor_id)| {
1259                            (WorkerSlotId::new(worker as _, idx as _), *actor_id)
1260                        })
1261                        .collect();
1262
1263                    for (worker_slot, actor) in worker_slot_to_actor {
1264                        if let Some(target) = plan.get(&worker_slot) {
1265                            actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
1266                        }
1267                    }
1268                }
1269            }
1270        }
1271
1272        for (actor, worker) in actor_migration_plan {
1273            Actor::update_many()
1274                .col_expr(
1275                    actor::Column::WorkerId,
1276                    Expr::value(Value::Int(Some(worker))),
1277                )
1278                .filter(actor::Column::ActorId.eq(actor))
1279                .exec(&txn)
1280                .await?;
1281        }
1282
1283        txn.commit().await?;
1284
1285        Ok(())
1286    }
1287
1288    pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1289        let inner = self.inner.read().await;
1290
1291        let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1292            .select_only()
1293            .columns([fragment::Column::FragmentId])
1294            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1295            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1296            .into_tuple()
1297            .all(&inner.db)
1298            .await?;
1299
1300        let mut actor_locations = HashMap::new();
1301
1302        for (fragment_id, _, worker_id) in actors {
1303            *actor_locations
1304                .entry(worker_id)
1305                .or_insert(HashMap::new())
1306                .entry(fragment_id)
1307                .or_insert(0_usize) += 1;
1308        }
1309
1310        let mut result = HashSet::new();
1311        for (worker_id, mapping) in actor_locations {
1312            let max_fragment_len = mapping.values().max().unwrap();
1313
1314            result
1315                .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1316        }
1317
1318        Ok(result)
1319    }
1320
1321    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
1322        let inner = self.inner.read().await;
1323        let txn = inner.db.begin().await?;
1324        for assignments in split_assignment.values() {
1325            for (actor_id, splits) in assignments {
1326                let actor_splits = splits.iter().map(Into::into).collect_vec();
1327                Actor::update(actor::ActiveModel {
1328                    actor_id: Set(*actor_id as _),
1329                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1330                        splits: actor_splits,
1331                    }))),
1332                    ..Default::default()
1333                })
1334                .exec(&txn)
1335                .await
1336                .map_err(|err| {
1337                    if err == DbErr::RecordNotUpdated {
1338                        MetaError::catalog_id_not_found("actor_id", actor_id)
1339                    } else {
1340                        err.into()
1341                    }
1342                })?;
1343            }
1344        }
1345        txn.commit().await?;
1346
1347        Ok(())
1348    }
1349
1350    #[await_tree::instrument]
1351    pub async fn fill_snapshot_backfill_epoch(
1352        &self,
1353        fragment_ids: impl Iterator<Item = FragmentId>,
1354        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1355        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1356    ) -> MetaResult<()> {
1357        let inner = self.inner.write().await;
1358        let txn = inner.db.begin().await?;
1359        for fragment_id in fragment_ids {
1360            let fragment = FragmentModel::find_by_id(fragment_id)
1361                .one(&txn)
1362                .await?
1363                .context(format!("fragment {} not found", fragment_id))?;
1364            let mut node = fragment.stream_node.to_protobuf();
1365            if crate::stream::fill_snapshot_backfill_epoch(
1366                &mut node,
1367                snapshot_backfill_info,
1368                cross_db_snapshot_backfill_info,
1369            )? {
1370                let node = StreamNode::from(&node);
1371                FragmentModel::update(fragment::ActiveModel {
1372                    fragment_id: Set(fragment_id),
1373                    stream_node: Set(node),
1374                    ..Default::default()
1375                })
1376                .exec(&txn)
1377                .await?;
1378            }
1379        }
1380        txn.commit().await?;
1381        Ok(())
1382    }
1383
1384    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1385    pub fn get_running_actors_of_fragment(
1386        &self,
1387        fragment_id: FragmentId,
1388    ) -> MetaResult<HashSet<model::ActorId>> {
1389        let info = self.env.shared_actor_infos().read_guard();
1390
1391        let actors = info
1392            .get_fragment(fragment_id as _)
1393            .map(|SharedFragmentInfo { actors, .. }| actors.keys().copied().collect())
1394            .unwrap_or_default();
1395
1396        Ok(actors)
1397    }
1398
1399    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
1400    /// (`backfill_actor_id`, `upstream_source_actor_id`)
1401    pub async fn get_running_actors_for_source_backfill(
1402        &self,
1403        source_backfill_fragment_id: FragmentId,
1404        source_fragment_id: FragmentId,
1405    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
1406        let inner = self.inner.read().await;
1407        let txn = inner.db.begin().await?;
1408
1409        let fragment_relation: DispatcherType = FragmentRelation::find()
1410            .select_only()
1411            .column(fragment_relation::Column::DispatcherType)
1412            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
1413            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
1414            .into_tuple()
1415            .one(&txn)
1416            .await?
1417            .ok_or_else(|| {
1418                anyhow!(
1419                    "no fragment connection from source fragment {} to source backfill fragment {}",
1420                    source_fragment_id,
1421                    source_backfill_fragment_id
1422                )
1423            })?;
1424
1425        if fragment_relation != DispatcherType::NoShuffle {
1426            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
1427        }
1428
1429        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
1430            let result: MetaResult<DistributionType> = try {
1431                FragmentModel::find_by_id(fragment_id)
1432                    .select_only()
1433                    .column(fragment::Column::DistributionType)
1434                    .into_tuple()
1435                    .one(txn)
1436                    .await?
1437                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
1438            };
1439            result
1440        };
1441
1442        let source_backfill_distribution_type =
1443            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
1444        let source_distribution_type =
1445            load_fragment_distribution_type(&txn, source_fragment_id).await?;
1446
1447        let load_fragment_actor_distribution =
1448            |actor_info: &SharedActorInfos,
1449             fragment_id: FragmentId|
1450             -> HashMap<crate::model::ActorId, Option<Bitmap>> {
1451                let guard = actor_info.read_guard();
1452
1453                guard
1454                    .get_fragment(fragment_id as _)
1455                    .map(|fragment| {
1456                        fragment
1457                            .actors
1458                            .iter()
1459                            .map(|(actor_id, actor)| {
1460                                (
1461                                    *actor_id as _,
1462                                    actor
1463                                        .vnode_bitmap
1464                                        .as_ref()
1465                                        .map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
1466                                )
1467                            })
1468                            .collect()
1469                    })
1470                    .unwrap_or_default()
1471            };
1472
1473        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
1474            load_fragment_actor_distribution(
1475                self.env.shared_actor_infos(),
1476                source_backfill_fragment_id,
1477            );
1478
1479        let source_actors =
1480            load_fragment_actor_distribution(self.env.shared_actor_infos(), source_fragment_id);
1481
1482        Ok(resolve_no_shuffle_actor_dispatcher(
1483            source_distribution_type,
1484            &source_actors,
1485            source_backfill_distribution_type,
1486            &source_backfill_actors,
1487        )
1488        .into_iter()
1489        .map(|(source_actor, source_backfill_actor)| {
1490            (source_backfill_actor as _, source_actor as _)
1491        })
1492        .collect())
1493    }
1494
1495    /// Get and filter the "**root**" fragments of the specified jobs.
1496    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1497    ///
1498    /// Root fragment connects to downstream jobs.
1499    ///
1500    /// ## What can be the root fragment
1501    /// - For sink, it should have one `Sink` fragment.
1502    /// - For MV, it should have one `MView` fragment.
1503    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1504    /// - For source, it should have one `Source` fragment.
1505    ///
1506    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1507    pub async fn get_root_fragments(
1508        &self,
1509        job_ids: Vec<ObjectId>,
1510    ) -> MetaResult<(
1511        HashMap<ObjectId, (SharedFragmentInfo, PbStreamNode)>,
1512        HashMap<ActorId, WorkerId>,
1513    )> {
1514        let inner = self.inner.read().await;
1515
1516        let all_fragments = FragmentModel::find()
1517            .filter(fragment::Column::JobId.is_in(job_ids))
1518            .all(&inner.db)
1519            .await?;
1520        // job_id -> fragment
1521        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
1522        for fragment in all_fragments {
1523            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
1524            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
1525                _ = root_fragments.insert(fragment.job_id, fragment);
1526            } else if mask.contains(FragmentTypeFlag::Source) {
1527                // look for Source fragment only if there's no MView fragment
1528                // (notice try_insert here vs insert above)
1529                _ = root_fragments.try_insert(fragment.job_id, fragment);
1530            }
1531        }
1532
1533        let mut root_fragments_pb = HashMap::new();
1534
1535        let info = self.env.shared_actor_infos().read_guard();
1536
1537        let root_fragment_to_jobs: HashMap<_, _> = root_fragments
1538            .iter()
1539            .map(|(job_id, fragment)| (fragment.fragment_id as u32, *job_id))
1540            .collect();
1541
1542        for fragment in root_fragment_to_jobs.keys() {
1543            let fragment_info = info.get_fragment(*fragment).context(format!(
1544                "fragment {} not found in shared actor info",
1545                fragment
1546            ))?;
1547
1548            let job_id = root_fragment_to_jobs[&fragment_info.fragment_id];
1549            let fragment = root_fragments
1550                .get(&job_id)
1551                .context(format!("root fragment for job {} not found", job_id))?;
1552
1553            root_fragments_pb.insert(
1554                job_id,
1555                (fragment_info.clone(), fragment.stream_node.to_protobuf()),
1556            );
1557        }
1558
1559        let mut all_actor_locations = HashMap::new();
1560
1561        for (_, SharedFragmentInfo { actors, .. }) in info.iter_over_fragments() {
1562            for (actor_id, actor_info) in actors {
1563                all_actor_locations.insert(*actor_id as ActorId, actor_info.worker_id);
1564            }
1565        }
1566
1567        Ok((root_fragments_pb, all_actor_locations))
1568    }
1569
1570    pub async fn get_root_fragment(
1571        &self,
1572        job_id: ObjectId,
1573    ) -> MetaResult<(SharedFragmentInfo, HashMap<ActorId, WorkerId>)> {
1574        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1575        let (root_fragment, _) = root_fragments
1576            .remove(&job_id)
1577            .context(format!("root fragment for job {} not found", job_id))?;
1578
1579        Ok((root_fragment, actors))
1580    }
1581
1582    /// Get the downstream fragments connected to the specified job.
1583    pub async fn get_downstream_fragments(
1584        &self,
1585        job_id: ObjectId,
1586    ) -> MetaResult<(
1587        Vec<(
1588            stream_plan::DispatcherType,
1589            SharedFragmentInfo,
1590            PbStreamNode,
1591        )>,
1592        HashMap<ActorId, WorkerId>,
1593    )> {
1594        let (root_fragment, actor_locations) = self.get_root_fragment(job_id).await?;
1595
1596        let inner = self.inner.read().await;
1597        let txn = inner.db.begin().await?;
1598        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1599            .filter(
1600                fragment_relation::Column::SourceFragmentId
1601                    .eq(root_fragment.fragment_id as FragmentId),
1602            )
1603            .all(&txn)
1604            .await?;
1605
1606        let downstream_fragment_ids = downstream_fragment_relations
1607            .iter()
1608            .map(|model| model.target_fragment_id as FragmentId)
1609            .collect::<HashSet<_>>();
1610
1611        let downstream_fragment_nodes: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1612            .select_only()
1613            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1614            .filter(fragment::Column::FragmentId.is_in(downstream_fragment_ids))
1615            .into_tuple()
1616            .all(&txn)
1617            .await?;
1618
1619        let downstream_fragment_nodes: HashMap<_, _> = downstream_fragment_nodes
1620            .into_iter()
1621            .map(|(id, node)| (id as u32, node))
1622            .collect();
1623
1624        let mut downstream_fragments = vec![];
1625
1626        let info = self.env.shared_actor_infos().read_guard();
1627
1628        let fragment_map: HashMap<_, _> = downstream_fragment_relations
1629            .iter()
1630            .map(|model| (model.target_fragment_id as u32, model.dispatcher_type))
1631            .collect();
1632
1633        for fragment_id in fragment_map.keys() {
1634            let fragment_info @ SharedFragmentInfo { actors, .. } =
1635                info.get_fragment(*fragment_id).unwrap();
1636
1637            let dispatcher_type = fragment_map[fragment_id];
1638
1639            if actors.is_empty() {
1640                bail!("No fragment found for fragment id {}", fragment_id);
1641            }
1642
1643            let dispatch_type = PbDispatcherType::from(dispatcher_type);
1644
1645            let nodes = downstream_fragment_nodes
1646                .get(fragment_id)
1647                .context(format!(
1648                    "downstream fragment node for id {} not found",
1649                    fragment_id
1650                ))?
1651                .to_protobuf();
1652
1653            downstream_fragments.push((dispatch_type, fragment_info.clone(), nodes));
1654        }
1655        Ok((downstream_fragments, actor_locations))
1656    }
1657
1658    pub async fn load_source_fragment_ids(
1659        &self,
1660    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1661        let inner = self.inner.read().await;
1662        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1663            .select_only()
1664            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1665            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1666            .into_tuple()
1667            .all(&inner.db)
1668            .await?;
1669
1670        let mut source_fragment_ids = HashMap::new();
1671        for (fragment_id, stream_node) in fragments {
1672            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1673                source_fragment_ids
1674                    .entry(source_id as SourceId)
1675                    .or_insert_with(BTreeSet::new)
1676                    .insert(fragment_id);
1677            }
1678        }
1679        Ok(source_fragment_ids)
1680    }
1681
1682    pub async fn load_backfill_fragment_ids(
1683        &self,
1684    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1685        let inner = self.inner.read().await;
1686        let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1687            .select_only()
1688            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1689            .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1690            .into_tuple()
1691            .all(&inner.db)
1692            .await?;
1693
1694        let mut source_fragment_ids = HashMap::new();
1695        for (fragment_id, stream_node) in fragments {
1696            if let Some((source_id, upstream_source_fragment_id)) =
1697                stream_node.to_protobuf().find_source_backfill()
1698            {
1699                source_fragment_ids
1700                    .entry(source_id as SourceId)
1701                    .or_insert_with(BTreeSet::new)
1702                    .insert((fragment_id, upstream_source_fragment_id));
1703            }
1704        }
1705        Ok(source_fragment_ids)
1706    }
1707
1708    /// Get the actor count of `Materialize` or `Sink` fragment of the specified table.
1709    pub async fn get_actual_job_fragment_parallelism(
1710        &self,
1711        job_id: ObjectId,
1712    ) -> MetaResult<Option<usize>> {
1713        let inner = self.inner.read().await;
1714        let fragments: Vec<(FragmentId, i64)> = FragmentModel::find()
1715            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
1716            .select_only()
1717            .columns([fragment::Column::FragmentId])
1718            .column_as(actor::Column::ActorId.count(), "count")
1719            .filter(
1720                fragment::Column::JobId
1721                    .eq(job_id)
1722                    .and(FragmentTypeMask::intersects_any([
1723                        FragmentTypeFlag::Mview,
1724                        FragmentTypeFlag::Sink,
1725                    ])),
1726            )
1727            .group_by(fragment::Column::FragmentId)
1728            .into_tuple()
1729            .all(&inner.db)
1730            .await?;
1731
1732        Ok(fragments
1733            .into_iter()
1734            .at_most_one()
1735            .ok()
1736            .flatten()
1737            .map(|(_, count)| count as usize))
1738    }
1739
1740    pub async fn get_all_upstream_sink_infos(
1741        &self,
1742        target_table: &PbTable,
1743        target_fragment_id: FragmentId,
1744    ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1745        let incoming_sinks = self.get_table_incoming_sinks(target_table.id as _).await?;
1746
1747        let inner = self.inner.read().await;
1748        let txn = inner.db.begin().await?;
1749
1750        let sink_ids = incoming_sinks.iter().map(|s| s.id as SinkId).collect_vec();
1751        let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;
1752
1753        let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1754        for pb_sink in &incoming_sinks {
1755            let sink_fragment_id =
1756                sink_fragment_ids
1757                    .get(&(pb_sink.id as _))
1758                    .cloned()
1759                    .ok_or(anyhow::anyhow!(
1760                        "sink fragment not found for sink id {}",
1761                        pb_sink.id
1762                    ))?;
1763            let upstream_info = build_upstream_sink_info(
1764                pb_sink,
1765                sink_fragment_id,
1766                target_table,
1767                target_fragment_id,
1768            )?;
1769            upstream_sink_infos.push(upstream_info);
1770        }
1771
1772        Ok(upstream_sink_infos)
1773    }
1774
1775    pub async fn get_mview_fragment_by_id(&self, table_id: TableId) -> MetaResult<FragmentId> {
1776        let inner = self.inner.read().await;
1777        let txn = inner.db.begin().await?;
1778
1779        let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1780            .select_only()
1781            .column(fragment::Column::FragmentId)
1782            .filter(
1783                fragment::Column::JobId
1784                    .eq(table_id)
1785                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1786            )
1787            .into_tuple()
1788            .all(&txn)
1789            .await?;
1790
1791        if mview_fragment.len() != 1 {
1792            return Err(anyhow::anyhow!(
1793                "expected exactly one mview fragment for table {}, found {}",
1794                table_id,
1795                mview_fragment.len()
1796            )
1797            .into());
1798        }
1799
1800        Ok(mview_fragment.into_iter().next().unwrap())
1801    }
1802
1803    pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
1804        let inner = self.inner.read().await;
1805        let txn = inner.db.begin().await?;
1806        has_table_been_migrated(&txn, table_id).await
1807    }
1808
1809    pub async fn update_fragment_splits(
1810        &self,
1811        fragment_splits: &HashMap<FragmentId, Vec<SplitImpl>>,
1812    ) -> MetaResult<()> {
1813        if fragment_splits.is_empty() {
1814            return Ok(());
1815        }
1816
1817        let inner = self.inner.write().await;
1818        let txn = inner.db.begin().await?;
1819
1820        let models: Vec<fragment_splits::ActiveModel> = fragment_splits
1821            .iter()
1822            .map(|(fragment_id, splits)| fragment_splits::ActiveModel {
1823                fragment_id: Set(*fragment_id as _),
1824                splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1825                    splits: splits.iter().map(Into::into).collect_vec(),
1826                }))),
1827            })
1828            .collect();
1829
1830        FragmentSplits::insert_many(models)
1831            .on_conflict(
1832                OnConflict::column(fragment_splits::Column::FragmentId)
1833                    .update_column(fragment_splits::Column::Splits)
1834                    .to_owned(),
1835            )
1836            .exec(&txn)
1837            .await?;
1838
1839        txn.commit().await?;
1840
1841        Ok(())
1842    }
1843}
1844
1845#[cfg(test)]
1846mod tests {
1847    use std::collections::{BTreeMap, HashMap, HashSet};
1848
1849    use itertools::Itertools;
1850    use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1851    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1852    use risingwave_common::util::iter_util::ZipEqDebug;
1853    use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1854    use risingwave_meta_model::actor::ActorStatus;
1855    use risingwave_meta_model::fragment::DistributionType;
1856    use risingwave_meta_model::{
1857        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
1858        VnodeBitmap, actor, fragment,
1859    };
1860    use risingwave_pb::common::PbActorLocation;
1861    use risingwave_pb::meta::table_fragments::PbActorStatus;
1862    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
1863    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1864    use risingwave_pb::plan_common::PbExprContext;
1865    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1866    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1867    use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1868
1869    use crate::MetaResult;
1870    use crate::controller::catalog::CatalogController;
1871    use crate::model::{Fragment, StreamActor};
1872
1873    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;
1874
1875    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;
1876
1877    const TEST_FRAGMENT_ID: FragmentId = 1;
1878
1879    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;
1880
1881    const TEST_JOB_ID: ObjectId = 1;
1882
1883    const TEST_STATE_TABLE_ID: TableId = 1000;
1884
1885    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1886        let mut upstream_actor_ids = BTreeMap::new();
1887        upstream_actor_ids.insert(
1888            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1889            HashSet::from_iter([(actor_id + 100)]),
1890        );
1891        upstream_actor_ids.insert(
1892            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1893            HashSet::from_iter([(actor_id + 200)]),
1894        );
1895        upstream_actor_ids
1896    }
1897
1898    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1899        let mut input = vec![];
1900        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1901            input.push(PbStreamNode {
1902                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1903                    upstream_fragment_id: *upstream_fragment_id as _,
1904                    ..Default::default()
1905                }))),
1906                ..Default::default()
1907            });
1908        }
1909
1910        PbStreamNode {
1911            input,
1912            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1913            ..Default::default()
1914        }
1915    }
1916
1917    #[tokio::test]
1918    async fn test_extract_fragment() -> MetaResult<()> {
1919        let actor_count = 3u32;
1920        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
1921            .map(|actor_id| {
1922                (
1923                    actor_id as _,
1924                    generate_upstream_actor_ids_for_actor(actor_id),
1925                )
1926            })
1927            .collect();
1928
1929        let actor_bitmaps = ActorMapping::new_uniform(
1930            (0..actor_count).map(|i| i as _),
1931            VirtualNode::COUNT_FOR_TEST,
1932        )
1933        .to_bitmaps();
1934
1935        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());
1936
1937        let pb_actors = (0..actor_count)
1938            .map(|actor_id| StreamActor {
1939                actor_id: actor_id as _,
1940                fragment_id: TEST_FRAGMENT_ID as _,
1941                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
1942                mview_definition: "".to_owned(),
1943                expr_context: Some(PbExprContext {
1944                    time_zone: String::from("America/New_York"),
1945                    strict_mode: false,
1946                }),
1947            })
1948            .collect_vec();
1949
1950        let pb_fragment = Fragment {
1951            fragment_id: TEST_FRAGMENT_ID as _,
1952            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
1953            distribution_type: PbFragmentDistributionType::Hash as _,
1954            actors: pb_actors.clone(),
1955            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
1956            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
1957            nodes: stream_node.clone(),
1958        };
1959
1960        let pb_actor_status = (0..actor_count)
1961            .map(|actor_id| {
1962                (
1963                    actor_id,
1964                    PbActorStatus {
1965                        location: PbActorLocation::from_worker(0),
1966                        state: PbActorState::Running as _,
1967                    },
1968                )
1969            })
1970            .collect();
1971
1972        let pb_actor_splits = Default::default();
1973
1974        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
1975            TEST_JOB_ID,
1976            &pb_fragment,
1977            &pb_actor_status,
1978            &pb_actor_splits,
1979        )?;
1980
1981        check_fragment(fragment, pb_fragment);
1982        check_actors(
1983            actors,
1984            &upstream_actor_ids,
1985            pb_actors,
1986            Default::default(),
1987            &stream_node,
1988        );
1989
1990        Ok(())
1991    }
1992
1993    #[tokio::test]
1994    async fn test_compose_fragment() -> MetaResult<()> {
1995        let actor_count = 3u32;
1996
1997        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
1998            .map(|actor_id| {
1999                (
2000                    actor_id as _,
2001                    generate_upstream_actor_ids_for_actor(actor_id),
2002                )
2003            })
2004            .collect();
2005
2006        let mut actor_bitmaps = ActorMapping::new_uniform(
2007            (0..actor_count).map(|i| i as _),
2008            VirtualNode::COUNT_FOR_TEST,
2009        )
2010        .to_bitmaps();
2011
2012        let actors = (0..actor_count)
2013            .map(|actor_id| {
2014                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
2015                    splits: vec![PbConnectorSplit {
2016                        split_type: "dummy".to_owned(),
2017                        ..Default::default()
2018                    }],
2019                }));
2020
2021                #[expect(deprecated)]
2022                actor::Model {
2023                    actor_id: actor_id as ActorId,
2024                    fragment_id: TEST_FRAGMENT_ID,
2025                    status: ActorStatus::Running,
2026                    splits: actor_splits,
2027                    worker_id: 0,
2028                    upstream_actor_ids: Default::default(),
2029                    vnode_bitmap: actor_bitmaps
2030                        .remove(&actor_id)
2031                        .map(|bitmap| bitmap.to_protobuf())
2032                        .as_ref()
2033                        .map(VnodeBitmap::from),
2034                    expr_context: ExprContext::from(&PbExprContext {
2035                        time_zone: String::from("America/New_York"),
2036                        strict_mode: false,
2037                    }),
2038                }
2039            })
2040            .collect_vec();
2041
2042        let stream_node = {
2043            let template_actor = actors.first().cloned().unwrap();
2044
2045            let template_upstream_actor_ids = upstream_actor_ids
2046                .get(&(template_actor.actor_id as _))
2047                .unwrap();
2048
2049            generate_merger_stream_node(template_upstream_actor_ids)
2050        };
2051
2052        #[expect(deprecated)]
2053        let fragment = fragment::Model {
2054            fragment_id: TEST_FRAGMENT_ID,
2055            job_id: TEST_JOB_ID,
2056            fragment_type_mask: 0,
2057            distribution_type: DistributionType::Hash,
2058            stream_node: StreamNode::from(&stream_node),
2059            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
2060            upstream_fragment_id: Default::default(),
2061            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
2062        };
2063
2064        let (pb_fragment, pb_actor_status, pb_actor_splits) =
2065            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();
2066
2067        assert_eq!(pb_actor_status.len(), actor_count as usize);
2068        assert_eq!(pb_actor_splits.len(), actor_count as usize);
2069
2070        let pb_actors = pb_fragment.actors.clone();
2071
2072        check_fragment(fragment, pb_fragment);
2073        check_actors(
2074            actors,
2075            &upstream_actor_ids,
2076            pb_actors,
2077            pb_actor_splits,
2078            &stream_node,
2079        );
2080
2081        Ok(())
2082    }
2083
2084    fn check_actors(
2085        actors: Vec<actor::Model>,
2086        actor_upstreams: &FragmentActorUpstreams,
2087        pb_actors: Vec<StreamActor>,
2088        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
2089        stream_node: &PbStreamNode,
2090    ) {
2091        for (
2092            actor::Model {
2093                actor_id,
2094                fragment_id,
2095                status,
2096                splits,
2097                worker_id: _,
2098                vnode_bitmap,
2099                expr_context,
2100                ..
2101            },
2102            StreamActor {
2103                actor_id: pb_actor_id,
2104                fragment_id: pb_fragment_id,
2105                vnode_bitmap: pb_vnode_bitmap,
2106                mview_definition,
2107                expr_context: pb_expr_context,
2108                ..
2109            },
2110        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
2111        {
2112            assert_eq!(actor_id, pb_actor_id as ActorId);
2113            assert_eq!(fragment_id, pb_fragment_id as FragmentId);
2114
2115            assert_eq!(
2116                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
2117                pb_vnode_bitmap,
2118            );
2119
2120            assert_eq!(mview_definition, "");
2121
2122            visit_stream_node_body(stream_node, |body| {
2123                if let PbNodeBody::Merge(m) = body {
2124                    assert!(
2125                        actor_upstreams
2126                            .get(&(actor_id as _))
2127                            .unwrap()
2128                            .contains_key(&m.upstream_fragment_id)
2129                    );
2130                }
2131            });
2132
2133            assert_eq!(status, ActorStatus::Running);
2134
2135            assert_eq!(
2136                splits,
2137                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
2138            );
2139
2140            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
2141        }
2142    }
2143
2144    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2145        let Fragment {
2146            fragment_id,
2147            fragment_type_mask,
2148            distribution_type: pb_distribution_type,
2149            actors: _,
2150            state_table_ids: pb_state_table_ids,
2151            maybe_vnode_count: _,
2152            nodes,
2153        } = pb_fragment;
2154
2155        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
2156        assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2157        assert_eq!(
2158            pb_distribution_type,
2159            PbFragmentDistributionType::from(fragment.distribution_type)
2160        );
2161
2162        assert_eq!(
2163            pb_state_table_ids,
2164            fragment.state_table_ids.into_u32_array()
2165        );
2166        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2167    }
2168}