risingwave_meta/controller/
fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
25use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
26use risingwave_connector::source::SplitImpl;
27use risingwave_meta_model::actor::ActorStatus;
28use risingwave_meta_model::actor_dispatcher::DispatcherType;
29use risingwave_meta_model::fragment::DistributionType;
30use risingwave_meta_model::object::ObjectType;
31use risingwave_meta_model::prelude::{
32    Actor, Fragment as FragmentModel, FragmentRelation, Sink, StreamingJob,
33};
34use risingwave_meta_model::{
35    ActorId, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus, ObjectId,
36    SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId,
37    actor, database, fragment, fragment_relation, object, sink, source, streaming_job, table,
38};
39use risingwave_meta_model_migration::{Alias, SelectStatement};
40use risingwave_pb::common::PbActorLocation;
41use risingwave_pb::meta::subscribe_response::{
42    Info as NotificationInfo, Operation as NotificationOperation,
43};
44use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
45use risingwave_pb::meta::table_fragments::fragment::{
46    FragmentDistributionType, PbFragmentDistributionType,
47};
48use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
49use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
50use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
51use risingwave_pb::stream_plan::stream_node::NodeBody;
52use risingwave_pb::stream_plan::{
53    DispatchStrategy, PbDispatcherType, PbFragmentTypeFlag, PbStreamContext, PbStreamNode,
54    PbStreamScanType, StreamScanType,
55};
56use sea_orm::ActiveValue::Set;
57use sea_orm::sea_query::Expr;
58use sea_orm::{
59    ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
60    QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
61};
62use serde::{Deserialize, Serialize};
63use tracing::debug;
64
65use crate::barrier::SnapshotBackfillInfo;
66use crate::controller::catalog::{CatalogController, CatalogControllerInner};
67use crate::controller::scale::resolve_streaming_job_definition;
68use crate::controller::utils::{
69    FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
70    get_fragment_mappings, rebuild_fragment_mapping_from_actors,
71    resolve_no_shuffle_actor_dispatcher,
72};
73use crate::manager::LocalNotification;
74use crate::model::{
75    DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
76    StreamActor, StreamContext, StreamJobFragments, TableParallelism,
77};
78use crate::stream::{SplitAssignment, build_actor_split_impls};
79use crate::{MetaError, MetaResult, model};
80
/// Some information of running (inflight) actors.
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    /// Id of the worker associated with this actor.
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor, if it has one.
    // NOTE(review): presumably `None` for actors of singleton fragments -- confirm.
    pub vnode_bitmap: Option<Bitmap>,
}
87
/// Some information of a running (inflight) fragment, including its actors.
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    /// Id of the fragment.
    pub fragment_id: crate::model::FragmentId,
    /// Distribution type of the fragment.
    pub distribution_type: DistributionType,
    /// Stream node of the fragment, in protobuf form.
    pub nodes: PbStreamNode,
    /// Per-actor information, keyed by actor id.
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    /// Ids of the state tables associated with the fragment.
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}
96
/// Parallelism summary of a single fragment, as returned by
/// `CatalogController::running_fragment_parallelisms`.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    /// Distribution type of the fragment.
    pub distribution_type: FragmentDistributionType,
    /// Number of actor rows that belong to the fragment.
    pub actor_count: usize,
    /// Vnode count recorded for the fragment.
    pub vnode_count: usize,
}
103
/// One row of the result of `CatalogController::list_streaming_job_infos`.
///
/// Fields are serialized in camelCase because the struct is consumed by the dashboard.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // for dashboard
pub struct StreamingJobInfo {
    /// Id of the streaming job.
    pub job_id: ObjectId,
    /// Type of the catalog object backing the job.
    pub obj_type: ObjectType,
    /// Name of the job. `list_streaming_job_infos` fills it with the table name, then the
    /// source name, then the sink name, and finally `"<unknown>"` if none is available.
    pub name: String,
    /// Status of the job.
    pub job_status: JobStatus,
    /// Parallelism setting of the job.
    pub parallelism: StreamingParallelism,
    /// Max parallelism of the job.
    pub max_parallelism: i32,
    /// Resource group of the job. `list_streaming_job_infos` uses the job's specific resource
    /// group and falls back to the resource group of its database when that is null.
    pub resource_group: String,
}
115
116impl CatalogControllerInner {
117    /// List all fragment vnode mapping info for all CREATED streaming jobs.
118    pub async fn all_running_fragment_mappings(
119        &self,
120    ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
121        let txn = self.db.begin().await?;
122
123        let job_ids: Vec<ObjectId> = StreamingJob::find()
124            .select_only()
125            .column(streaming_job::Column::JobId)
126            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
127            .into_tuple()
128            .all(&txn)
129            .await?;
130
131        let mut result = vec![];
132        for job_id in job_ids {
133            let mappings = get_fragment_mappings(&txn, job_id).await?;
134
135            result.extend(mappings.into_iter());
136        }
137
138        Ok(result.into_iter())
139    }
140}
141
142impl CatalogController {
143    pub(crate) async fn notify_fragment_mapping(
144        &self,
145        operation: NotificationOperation,
146        fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
147    ) {
148        let fragment_ids = fragment_mappings
149            .iter()
150            .map(|mapping| mapping.fragment_id)
151            .collect_vec();
152        // notify all fragment mappings to frontend.
153        for fragment_mapping in fragment_mappings {
154            self.env
155                .notification_manager()
156                .notify_frontend(
157                    operation,
158                    NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
159                )
160                .await;
161        }
162
163        // update serving vnode mappings.
164        match operation {
165            NotificationOperation::Add | NotificationOperation::Update => {
166                self.env
167                    .notification_manager()
168                    .notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
169                        fragment_ids,
170                    ))
171                    .await;
172            }
173            NotificationOperation::Delete => {
174                self.env
175                    .notification_manager()
176                    .notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
177                        fragment_ids,
178                    ))
179                    .await;
180            }
181            op => {
182                tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
183            }
184        }
185    }
186
187    pub fn extract_fragment_and_actors_from_fragments(
188        stream_job_fragments: &StreamJobFragments,
189    ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
190        stream_job_fragments
191            .fragments
192            .values()
193            .map(|fragment| {
194                Self::extract_fragment_and_actors_for_new_job(
195                    stream_job_fragments.stream_job_id.table_id as _,
196                    fragment,
197                    &stream_job_fragments.actor_status,
198                    &stream_job_fragments.actor_splits,
199                )
200            })
201            .try_collect()
202    }
203
    /// Builds the `fragment::Model` of `fragment` and the `actor::Model`s of its actors for
    /// the streaming job `job_id`.
    ///
    /// `actor_status` supplies the status and worker of each actor; `actor_splits` optionally
    /// supplies the connector splits assigned to an actor.
    ///
    /// # Errors
    ///
    /// Returns an error if an actor of the fragment has no entry in `actor_status`.
    ///
    /// # Panics
    ///
    /// Panics if the fragment has no actors, if an actor has no expression context, if
    /// `get_state()` on an actor status fails, or if the fragment's distribution type cannot
    /// be converted to `PbFragmentDistributionType`.
    fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        // Work on a copy of the stream node in which the deprecated `upstream_actor_id` of
        // every `Merge` node is cleared; the caller's fragment is left untouched.
        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            // Splits are optional: only actors present in `actor_splits` get them.
            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            // A status, on the other hand, is mandatory for every actor.
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            // The deprecated `upstream_actor_ids` field is only filled with its default.
            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        // The deprecated `upstream_fragment_id` field is only filled with its default.
        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: *pb_fragment_type_mask as _,
            distribution_type,
            stream_node,
            state_table_ids,
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }
303
304    pub fn compose_table_fragments(
305        table_id: u32,
306        state: PbState,
307        ctx: Option<PbStreamContext>,
308        fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
309        parallelism: StreamingParallelism,
310        max_parallelism: usize,
311        job_definition: Option<String>,
312    ) -> MetaResult<StreamJobFragments> {
313        let mut pb_fragments = BTreeMap::new();
314        let mut pb_actor_splits = HashMap::new();
315        let mut pb_actor_status = BTreeMap::new();
316
317        for (fragment, actors) in fragments {
318            let (fragment, fragment_actor_status, fragment_actor_splits) =
319                Self::compose_fragment(fragment, actors, job_definition.clone())?;
320
321            pb_fragments.insert(fragment.fragment_id, fragment);
322
323            pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
324            pb_actor_status.extend(fragment_actor_status.into_iter());
325        }
326
327        let table_fragments = StreamJobFragments {
328            stream_job_id: table_id.into(),
329            state: state as _,
330            fragments: pb_fragments,
331            actor_status: pb_actor_status,
332            actor_splits: pb_actor_splits,
333            ctx: ctx
334                .as_ref()
335                .map(StreamContext::from_protobuf)
336                .unwrap_or_default(),
337            assigned_parallelism: match parallelism {
338                StreamingParallelism::Custom => TableParallelism::Custom,
339                StreamingParallelism::Adaptive => TableParallelism::Adaptive,
340                StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
341            },
342            max_parallelism,
343        };
344
345        Ok(table_fragments)
346    }
347
348    #[allow(clippy::type_complexity)]
349    pub(crate) fn compose_fragment(
350        fragment: fragment::Model,
351        actors: Vec<actor::Model>,
352        job_definition: Option<String>,
353    ) -> MetaResult<(
354        Fragment,
355        HashMap<crate::model::ActorId, PbActorStatus>,
356        HashMap<crate::model::ActorId, PbConnectorSplits>,
357    )> {
358        let fragment::Model {
359            fragment_id,
360            fragment_type_mask,
361            distribution_type,
362            stream_node,
363            state_table_ids,
364            vnode_count,
365            ..
366        } = fragment;
367
368        let stream_node = stream_node.to_protobuf();
369        let mut upstream_fragments = HashSet::new();
370        visit_stream_node(&stream_node, |body| {
371            if let NodeBody::Merge(m) = body {
372                assert!(
373                    upstream_fragments.insert(m.upstream_fragment_id),
374                    "non-duplicate upstream fragment"
375                );
376            }
377        });
378
379        let mut pb_actors = vec![];
380
381        let mut pb_actor_status = HashMap::new();
382        let mut pb_actor_splits = HashMap::new();
383
384        for actor in actors {
385            if actor.fragment_id != fragment_id {
386                bail!(
387                    "fragment id {} from actor {} is different from fragment {}",
388                    actor.fragment_id,
389                    actor.actor_id,
390                    fragment_id
391                )
392            }
393
394            let actor::Model {
395                actor_id,
396                fragment_id,
397                status,
398                worker_id,
399                splits,
400                vnode_bitmap,
401                expr_context,
402                ..
403            } = actor;
404
405            let vnode_bitmap =
406                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
407            let pb_expr_context = Some(expr_context.to_protobuf());
408
409            pb_actor_status.insert(
410                actor_id as _,
411                PbActorStatus {
412                    location: PbActorLocation::from_worker(worker_id as u32),
413                    state: PbActorState::from(status) as _,
414                },
415            );
416
417            if let Some(splits) = splits {
418                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
419            }
420
421            pb_actors.push(StreamActor {
422                actor_id: actor_id as _,
423                fragment_id: fragment_id as _,
424                vnode_bitmap,
425                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
426                expr_context: pb_expr_context,
427            })
428        }
429
430        let pb_state_table_ids = state_table_ids.into_u32_array();
431        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
432        let pb_fragment = Fragment {
433            fragment_id: fragment_id as _,
434            fragment_type_mask: fragment_type_mask as _,
435            distribution_type: pb_distribution_type,
436            actors: pb_actors,
437            state_table_ids: pb_state_table_ids,
438            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
439            nodes: stream_node,
440        };
441
442        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
443    }
444
445    pub async fn running_fragment_parallelisms(
446        &self,
447        id_filter: Option<HashSet<FragmentId>>,
448    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
449        let inner = self.inner.read().await;
450
451        let query_alias = Alias::new("fragment_actor_count");
452        let count_alias = Alias::new("count");
453
454        let mut query = SelectStatement::new()
455            .column(actor::Column::FragmentId)
456            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
457            .from(Actor)
458            .group_by_col(actor::Column::FragmentId)
459            .to_owned();
460
461        if let Some(id_filter) = id_filter {
462            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
463        }
464
465        let outer = SelectStatement::new()
466            .column((FragmentModel, fragment::Column::FragmentId))
467            .column(count_alias)
468            .column(fragment::Column::DistributionType)
469            .column(fragment::Column::VnodeCount)
470            .from_subquery(query.to_owned(), query_alias.clone())
471            .inner_join(
472                FragmentModel,
473                Expr::col((query_alias, actor::Column::FragmentId))
474                    .equals((FragmentModel, fragment::Column::FragmentId)),
475            )
476            .to_owned();
477
478        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
479            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
480                outer.to_owned(),
481            )
482            .all(&inner.db)
483            .await?;
484
485        Ok(fragment_parallelisms
486            .into_iter()
487            .map(|(fragment_id, count, distribution_type, vnode_count)| {
488                (
489                    fragment_id,
490                    FragmentParallelismInfo {
491                        distribution_type: distribution_type.into(),
492                        actor_count: count as usize,
493                        vnode_count: vnode_count as usize,
494                    },
495                )
496            })
497            .collect())
498    }
499
500    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
501        let inner = self.inner.read().await;
502        let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
503            .select_only()
504            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
505            .into_tuple()
506            .all(&inner.db)
507            .await?;
508        Ok(fragment_jobs.into_iter().collect())
509    }
510
511    pub async fn get_fragment_job_id(
512        &self,
513        fragment_ids: Vec<FragmentId>,
514    ) -> MetaResult<Vec<ObjectId>> {
515        let inner = self.inner.read().await;
516
517        let object_ids: Vec<ObjectId> = FragmentModel::find()
518            .select_only()
519            .column(fragment::Column::JobId)
520            .filter(fragment::Column::FragmentId.is_in(fragment_ids))
521            .into_tuple()
522            .all(&inner.db)
523            .await?;
524
525        Ok(object_ids)
526    }
527
528    pub async fn list_fragment_database_ids(
529        &self,
530        select_fragment_ids: Option<Vec<FragmentId>>,
531    ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
532        let inner = self.inner.read().await;
533        let select = FragmentModel::find()
534            .select_only()
535            .column(fragment::Column::FragmentId)
536            .column(object::Column::DatabaseId)
537            .join(JoinType::InnerJoin, fragment::Relation::Object.def());
538        let select = if let Some(select_fragment_ids) = select_fragment_ids {
539            select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
540        } else {
541            select
542        };
543        Ok(select.into_tuple().all(&inner.db).await?)
544    }
545
546    pub async fn get_job_fragments_by_id(
547        &self,
548        job_id: ObjectId,
549    ) -> MetaResult<StreamJobFragments> {
550        let inner = self.inner.read().await;
551        let fragment_actors = FragmentModel::find()
552            .find_with_related(Actor)
553            .filter(fragment::Column::JobId.eq(job_id))
554            .all(&inner.db)
555            .await?;
556
557        let job_info = StreamingJob::find_by_id(job_id)
558            .one(&inner.db)
559            .await?
560            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
561
562        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
563            .await?
564            .remove(&job_id);
565
566        Self::compose_table_fragments(
567            job_id as _,
568            job_info.job_status.into(),
569            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
570            fragment_actors,
571            job_info.parallelism.clone(),
572            job_info.max_parallelism as _,
573            job_definition,
574        )
575    }
576
577    pub async fn get_fragment_actor_dispatchers(
578        &self,
579        fragment_ids: Vec<FragmentId>,
580    ) -> MetaResult<FragmentActorDispatchers> {
581        let inner = self.inner.read().await;
582        get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
583    }
584
585    pub async fn get_fragment_downstream_relations(
586        &self,
587        fragment_ids: Vec<FragmentId>,
588    ) -> MetaResult<FragmentDownstreamRelation> {
589        let inner = self.inner.read().await;
590        let mut stream = FragmentRelation::find()
591            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
592            .stream(&inner.db)
593            .await?;
594        let mut relations = FragmentDownstreamRelation::new();
595        while let Some(relation) = stream.try_next().await? {
596            relations
597                .entry(relation.source_fragment_id as _)
598                .or_default()
599                .push(DownstreamFragmentRelation {
600                    downstream_fragment_id: relation.target_fragment_id as _,
601                    dispatcher_type: relation.dispatcher_type,
602                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
603                    output_indices: relation.output_indices.into_u32_array(),
604                });
605        }
606        Ok(relations)
607    }
608
609    pub async fn get_job_fragment_backfill_scan_type(
610        &self,
611        job_id: ObjectId,
612    ) -> MetaResult<HashMap<model::FragmentId, PbStreamScanType>> {
613        let inner = self.inner.read().await;
614        let fragments: Vec<_> = FragmentModel::find()
615            .filter(fragment::Column::JobId.eq(job_id))
616            .all(&inner.db)
617            .await?;
618
619        let mut result = HashMap::new();
620
621        for fragment::Model {
622            fragment_id,
623            stream_node,
624            ..
625        } in fragments
626        {
627            let stream_node = stream_node.to_protobuf();
628            visit_stream_node(&stream_node, |body| {
629                if let NodeBody::StreamScan(node) = body {
630                    match node.stream_scan_type() {
631                        StreamScanType::Unspecified => {}
632                        scan_type => {
633                            result.insert(fragment_id as model::FragmentId, scan_type);
634                        }
635                    }
636                }
637            });
638        }
639
640        Ok(result)
641    }
642
643    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
644        let inner = self.inner.read().await;
645        let job_states = StreamingJob::find()
646            .select_only()
647            .column(streaming_job::Column::JobId)
648            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
649            .join(JoinType::InnerJoin, object::Relation::Database2.def())
650            .column(object::Column::ObjType)
651            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
652            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
653            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
654            .column_as(
655                Expr::if_null(
656                    Expr::col((table::Entity, table::Column::Name)),
657                    Expr::if_null(
658                        Expr::col((source::Entity, source::Column::Name)),
659                        Expr::if_null(
660                            Expr::col((sink::Entity, sink::Column::Name)),
661                            Expr::val("<unknown>"),
662                        ),
663                    ),
664                ),
665                "name",
666            )
667            .columns([
668                streaming_job::Column::JobStatus,
669                streaming_job::Column::Parallelism,
670                streaming_job::Column::MaxParallelism,
671            ])
672            .column_as(
673                Expr::if_null(
674                    Expr::col((
675                        streaming_job::Entity,
676                        streaming_job::Column::SpecificResourceGroup,
677                    )),
678                    Expr::col((database::Entity, database::Column::ResourceGroup)),
679                ),
680                "resource_group",
681            )
682            .into_model()
683            .all(&inner.db)
684            .await?;
685        Ok(job_states)
686    }
687
688    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
689        let inner = self.inner.read().await;
690        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
691            .select_only()
692            .column(streaming_job::Column::MaxParallelism)
693            .into_tuple()
694            .one(&inner.db)
695            .await?
696            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
697        Ok(max_parallelism as usize)
698    }
699
700    /// Get all actor ids in the target streaming jobs.
701    pub async fn get_job_actor_mapping(
702        &self,
703        job_ids: Vec<ObjectId>,
704    ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
705        let inner = self.inner.read().await;
706        let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
707            .select_only()
708            .column(fragment::Column::JobId)
709            .column(actor::Column::ActorId)
710            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
711            .filter(fragment::Column::JobId.is_in(job_ids))
712            .into_tuple()
713            .all(&inner.db)
714            .await?;
715        Ok(job_actors.into_iter().into_group_map())
716    }
717
718    /// Try to get internal table ids of each streaming job, used by metrics collection.
719    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
720        if let Ok(inner) = self.inner.try_read() {
721            if let Ok(job_state_tables) = FragmentModel::find()
722                .select_only()
723                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
724                .into_tuple::<(ObjectId, I32Array)>()
725                .all(&inner.db)
726                .await
727            {
728                let mut job_internal_table_ids = HashMap::new();
729                for (job_id, state_table_ids) in job_state_tables {
730                    job_internal_table_ids
731                        .entry(job_id)
732                        .or_insert_with(Vec::new)
733                        .extend(state_table_ids.into_inner());
734                }
735                return Some(job_internal_table_ids.into_iter().collect());
736            }
737        }
738        None
739    }
740
741    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
742        let inner = self.inner.read().await;
743        let count = FragmentModel::find().count(&inner.db).await?;
744        Ok(count > 0)
745    }
746
747    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
748        let inner = self.inner.read().await;
749        let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
750            .select_only()
751            .column(actor::Column::WorkerId)
752            .column_as(actor::Column::ActorId.count(), "count")
753            .group_by(actor::Column::WorkerId)
754            .into_tuple()
755            .all(&inner.db)
756            .await?;
757
758        Ok(actor_cnt
759            .into_iter()
760            .map(|(worker_id, count)| (worker_id, count as usize))
761            .collect())
762    }
763
764    // TODO: This function is too heavy, we should avoid using it and implement others on demand.
765    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
766        let inner = self.inner.read().await;
767        let jobs = StreamingJob::find().all(&inner.db).await?;
768
769        let mut job_definition = resolve_streaming_job_definition(
770            &inner.db,
771            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
772        )
773        .await?;
774
775        let mut table_fragments = BTreeMap::new();
776        for job in jobs {
777            let fragment_actors = FragmentModel::find()
778                .find_with_related(Actor)
779                .filter(fragment::Column::JobId.eq(job.job_id))
780                .all(&inner.db)
781                .await?;
782
783            table_fragments.insert(
784                job.job_id as ObjectId,
785                Self::compose_table_fragments(
786                    job.job_id as _,
787                    job.job_status.into(),
788                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
789                    fragment_actors,
790                    job.parallelism.clone(),
791                    job.max_parallelism as _,
792                    job_definition.remove(&job.job_id),
793                )?,
794            );
795        }
796
797        Ok(table_fragments)
798    }
799
800    pub async fn upstream_fragments(
801        &self,
802        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
803    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
804        let inner = self.inner.read().await;
805        let mut stream = FragmentRelation::find()
806            .select_only()
807            .columns([
808                fragment_relation::Column::SourceFragmentId,
809                fragment_relation::Column::TargetFragmentId,
810            ])
811            .filter(
812                fragment_relation::Column::TargetFragmentId
813                    .is_in(fragment_ids.map(|id| id as FragmentId)),
814            )
815            .into_tuple::<(FragmentId, FragmentId)>()
816            .stream(&inner.db)
817            .await?;
818        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
819        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
820            upstream_fragments
821                .entry(downstream_fragment_id as crate::model::FragmentId)
822                .or_default()
823                .insert(upstream_fragment_id as crate::model::FragmentId);
824        }
825        Ok(upstream_fragments)
826    }
827
828    pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
829        let inner = self.inner.read().await;
830        let actor_locations: Vec<PartialActorLocation> =
831            Actor::find().into_partial_model().all(&inner.db).await?;
832        Ok(actor_locations)
833    }
834
835    pub async fn list_actor_info(
836        &self,
837    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
838        let inner = self.inner.read().await;
839        let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
840            Actor::find()
841                .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
842                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
843                .select_only()
844                .columns([actor::Column::ActorId, actor::Column::FragmentId])
845                .column_as(object::Column::Oid, "job_id")
846                .column_as(object::Column::SchemaId, "schema_id")
847                .column_as(object::Column::ObjType, "type")
848                .into_tuple()
849                .all(&inner.db)
850                .await?;
851        Ok(actor_locations)
852    }
853
854    pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
855        let inner = self.inner.read().await;
856
857        let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
858            .select_only()
859            .filter(actor::Column::Splits.is_not_null())
860            .columns([actor::Column::ActorId, actor::Column::FragmentId])
861            .into_tuple()
862            .all(&inner.db)
863            .await?;
864
865        Ok(source_actors)
866    }
867
868    pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
869        let inner = self.inner.read().await;
870        let mut result = Vec::new();
871        let fragments = FragmentModel::find()
872            .select_only()
873            .columns([
874                fragment::Column::FragmentId,
875                fragment::Column::JobId,
876                fragment::Column::FragmentTypeMask,
877                fragment::Column::DistributionType,
878                fragment::Column::StateTableIds,
879                fragment::Column::VnodeCount,
880                fragment::Column::StreamNode,
881            ])
882            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
883            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
884            .group_by(fragment::Column::FragmentId)
885            .into_model::<FragmentDesc>()
886            .all(&inner.db)
887            .await?;
888        for fragment in fragments {
889            let upstreams: Vec<FragmentId> = FragmentRelation::find()
890                .select_only()
891                .column(fragment_relation::Column::SourceFragmentId)
892                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
893                .into_tuple()
894                .all(&inner.db)
895                .await?;
896            result.push((fragment, upstreams));
897        }
898        Ok(result)
899    }
900
901    pub async fn list_sink_actor_mapping(
902        &self,
903    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
904        let inner = self.inner.read().await;
905        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
906            .select_only()
907            .columns([sink::Column::SinkId, sink::Column::Name])
908            .into_tuple()
909            .all(&inner.db)
910            .await?;
911        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
912        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
913
914        let actor_with_type: Vec<(ActorId, SinkId, i32)> = Actor::find()
915            .select_only()
916            .column(actor::Column::ActorId)
917            .columns([fragment::Column::JobId, fragment::Column::FragmentTypeMask])
918            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
919            .filter(fragment::Column::JobId.is_in(sink_ids))
920            .into_tuple()
921            .all(&inner.db)
922            .await?;
923
924        let mut sink_actor_mapping = HashMap::new();
925        for (actor_id, sink_id, type_mask) in actor_with_type {
926            if type_mask & PbFragmentTypeFlag::Sink as i32 != 0 {
927                sink_actor_mapping
928                    .entry(sink_id)
929                    .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
930                    .1
931                    .push(actor_id);
932            }
933        }
934
935        Ok(sink_actor_mapping)
936    }
937
938    pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
939        let inner = self.inner.read().await;
940        let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
941            .select_only()
942            .columns([
943                fragment::Column::FragmentId,
944                fragment::Column::JobId,
945                fragment::Column::StateTableIds,
946            ])
947            .into_partial_model()
948            .all(&inner.db)
949            .await?;
950        Ok(fragment_state_tables)
951    }
952
953    /// Used in [`crate::barrier::GlobalBarrierManager`], load all running actor that need to be sent or
954    /// collected
955    pub async fn load_all_actors(
956        &self,
957        database_id: Option<DatabaseId>,
958    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
959    {
960        let inner = self.inner.read().await;
961        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
962        let filter_condition = if let Some(database_id) = database_id {
963            filter_condition.and(object::Column::DatabaseId.eq(database_id))
964        } else {
965            filter_condition
966        };
967        #[expect(clippy::type_complexity)]
968        let mut actor_info_stream: BoxStream<
969            '_,
970            Result<
971                (
972                    ActorId,
973                    WorkerId,
974                    Option<VnodeBitmap>,
975                    FragmentId,
976                    StreamNode,
977                    I32Array,
978                    DistributionType,
979                    DatabaseId,
980                    ObjectId,
981                ),
982                _,
983            >,
984        > = Actor::find()
985            .select_only()
986            .column(actor::Column::ActorId)
987            .column(actor::Column::WorkerId)
988            .column(actor::Column::VnodeBitmap)
989            .column(fragment::Column::FragmentId)
990            .column(fragment::Column::StreamNode)
991            .column(fragment::Column::StateTableIds)
992            .column(fragment::Column::DistributionType)
993            .column(object::Column::DatabaseId)
994            .column(object::Column::Oid)
995            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
996            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
997            .filter(filter_condition)
998            .into_tuple()
999            .stream(&inner.db)
1000            .await?;
1001
1002        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
1003            HashMap::new();
1004
1005        while let Some((
1006            actor_id,
1007            worker_id,
1008            vnode_bitmap,
1009            fragment_id,
1010            node,
1011            state_table_ids,
1012            distribution_type,
1013            database_id,
1014            job_id,
1015        )) = actor_info_stream.try_next().await?
1016        {
1017            let fragment_infos = database_fragment_infos
1018                .entry(database_id)
1019                .or_default()
1020                .entry(job_id)
1021                .or_default();
1022            let state_table_ids = state_table_ids.into_inner();
1023            let state_table_ids = state_table_ids
1024                .into_iter()
1025                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
1026                .collect();
1027            let actor_info = InflightActorInfo {
1028                worker_id,
1029                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
1030            };
1031            match fragment_infos.entry(fragment_id) {
1032                Entry::Occupied(mut entry) => {
1033                    let info: &mut InflightFragmentInfo = entry.get_mut();
1034                    assert_eq!(info.state_table_ids, state_table_ids);
1035                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
1036                }
1037                Entry::Vacant(entry) => {
1038                    entry.insert(InflightFragmentInfo {
1039                        fragment_id: fragment_id as _,
1040                        distribution_type,
1041                        nodes: node.to_protobuf(),
1042                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
1043                        state_table_ids,
1044                    });
1045                }
1046            }
1047        }
1048
1049        debug!(?database_fragment_infos, "reload all actors");
1050
1051        Ok(database_fragment_infos)
1052    }
1053
1054    pub async fn migrate_actors(
1055        &self,
1056        plan: HashMap<WorkerSlotId, WorkerSlotId>,
1057    ) -> MetaResult<()> {
1058        let inner = self.inner.read().await;
1059        let txn = inner.db.begin().await?;
1060
1061        let actors: Vec<(
1062            FragmentId,
1063            DistributionType,
1064            ActorId,
1065            Option<VnodeBitmap>,
1066            WorkerId,
1067            ActorStatus,
1068        )> = Actor::find()
1069            .select_only()
1070            .columns([
1071                fragment::Column::FragmentId,
1072                fragment::Column::DistributionType,
1073            ])
1074            .columns([
1075                actor::Column::ActorId,
1076                actor::Column::VnodeBitmap,
1077                actor::Column::WorkerId,
1078                actor::Column::Status,
1079            ])
1080            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1081            .into_tuple()
1082            .all(&txn)
1083            .await?;
1084
1085        let mut actor_locations = HashMap::new();
1086
1087        for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
1088            if *status != ActorStatus::Running {
1089                tracing::warn!(
1090                    "skipping actor {} in fragment {} with status {:?}",
1091                    actor_id,
1092                    fragment_id,
1093                    status
1094                );
1095                continue;
1096            }
1097
1098            actor_locations
1099                .entry(*worker_id)
1100                .or_insert(HashMap::new())
1101                .entry(*fragment_id)
1102                .or_insert(BTreeSet::new())
1103                .insert(*actor_id);
1104        }
1105
1106        let expired_or_changed_workers: HashSet<_> =
1107            plan.keys().map(|k| k.worker_id() as WorkerId).collect();
1108
1109        let mut actor_migration_plan = HashMap::new();
1110        for (worker, fragment) in actor_locations {
1111            if expired_or_changed_workers.contains(&worker) {
1112                for (fragment_id, actors) in fragment {
1113                    debug!(
1114                        "worker {} expired or changed, migrating fragment {}",
1115                        worker, fragment_id
1116                    );
1117                    let worker_slot_to_actor: HashMap<_, _> = actors
1118                        .iter()
1119                        .enumerate()
1120                        .map(|(idx, actor_id)| {
1121                            (WorkerSlotId::new(worker as _, idx as _), *actor_id)
1122                        })
1123                        .collect();
1124
1125                    for (worker_slot, actor) in worker_slot_to_actor {
1126                        if let Some(target) = plan.get(&worker_slot) {
1127                            actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
1128                        }
1129                    }
1130                }
1131            }
1132        }
1133
1134        for (actor, worker) in actor_migration_plan {
1135            Actor::update_many()
1136                .col_expr(
1137                    actor::Column::WorkerId,
1138                    Expr::value(Value::Int(Some(worker))),
1139                )
1140                .filter(actor::Column::ActorId.eq(actor))
1141                .exec(&txn)
1142                .await?;
1143        }
1144
1145        txn.commit().await?;
1146
1147        self.notify_fragment_mapping(
1148            NotificationOperation::Update,
1149            rebuild_fragment_mapping_from_actors(actors),
1150        )
1151        .await;
1152
1153        Ok(())
1154    }
1155
1156    pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1157        let inner = self.inner.read().await;
1158
1159        let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1160            .select_only()
1161            .columns([fragment::Column::FragmentId])
1162            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1163            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1164            .into_tuple()
1165            .all(&inner.db)
1166            .await?;
1167
1168        let mut actor_locations = HashMap::new();
1169
1170        for (fragment_id, _, worker_id) in actors {
1171            *actor_locations
1172                .entry(worker_id)
1173                .or_insert(HashMap::new())
1174                .entry(fragment_id)
1175                .or_insert(0_usize) += 1;
1176        }
1177
1178        let mut result = HashSet::new();
1179        for (worker_id, mapping) in actor_locations {
1180            let max_fragment_len = mapping.values().max().unwrap();
1181
1182            result
1183                .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1184        }
1185
1186        Ok(result)
1187    }
1188
1189    pub async fn all_node_actors(
1190        &self,
1191        include_inactive: bool,
1192    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
1193        let inner = self.inner.read().await;
1194        let fragment_actors = if include_inactive {
1195            FragmentModel::find()
1196                .find_with_related(Actor)
1197                .all(&inner.db)
1198                .await?
1199        } else {
1200            FragmentModel::find()
1201                .find_with_related(Actor)
1202                .filter(actor::Column::Status.eq(ActorStatus::Running))
1203                .all(&inner.db)
1204                .await?
1205        };
1206
1207        let job_definitions = resolve_streaming_job_definition(
1208            &inner.db,
1209            &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
1210        )
1211        .await?;
1212
1213        let mut node_actors = HashMap::new();
1214        for (fragment, actors) in fragment_actors {
1215            let job_id = fragment.job_id;
1216            let (table_fragments, actor_status, _) = Self::compose_fragment(
1217                fragment,
1218                actors,
1219                job_definitions.get(&(job_id as _)).cloned(),
1220            )?;
1221            for actor in table_fragments.actors {
1222                let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
1223                node_actors
1224                    .entry(node_id)
1225                    .or_insert_with(Vec::new)
1226                    .push(actor);
1227            }
1228        }
1229
1230        Ok(node_actors)
1231    }
1232
1233    pub async fn get_worker_actor_ids(
1234        &self,
1235        job_ids: Vec<ObjectId>,
1236    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
1237        let inner = self.inner.read().await;
1238        let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
1239            .select_only()
1240            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1241            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1242            .filter(fragment::Column::JobId.is_in(job_ids))
1243            .into_tuple()
1244            .all(&inner.db)
1245            .await?;
1246
1247        let mut worker_actors = BTreeMap::new();
1248        for (actor_id, worker_id) in actor_workers {
1249            worker_actors
1250                .entry(worker_id)
1251                .or_insert_with(Vec::new)
1252                .push(actor_id);
1253        }
1254
1255        Ok(worker_actors)
1256    }
1257
1258    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
1259        let inner = self.inner.read().await;
1260        let txn = inner.db.begin().await?;
1261        for assignments in split_assignment.values() {
1262            for (actor_id, splits) in assignments {
1263                let actor_splits = splits.iter().map(Into::into).collect_vec();
1264                Actor::update(actor::ActiveModel {
1265                    actor_id: Set(*actor_id as _),
1266                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1267                        splits: actor_splits,
1268                    }))),
1269                    ..Default::default()
1270                })
1271                .exec(&txn)
1272                .await
1273                .map_err(|err| {
1274                    if err == DbErr::RecordNotUpdated {
1275                        MetaError::catalog_id_not_found("actor_id", actor_id)
1276                    } else {
1277                        err.into()
1278                    }
1279                })?;
1280            }
1281        }
1282        txn.commit().await?;
1283
1284        Ok(())
1285    }
1286
1287    pub async fn fill_snapshot_backfill_epoch(
1288        &self,
1289        fragment_ids: impl Iterator<Item = FragmentId>,
1290        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1291        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1292    ) -> MetaResult<()> {
1293        let inner = self.inner.write().await;
1294        let txn = inner.db.begin().await?;
1295        for fragment_id in fragment_ids {
1296            let fragment = FragmentModel::find_by_id(fragment_id)
1297                .one(&txn)
1298                .await?
1299                .context(format!("fragment {} not found", fragment_id))?;
1300            let mut node = fragment.stream_node.to_protobuf();
1301            if crate::stream::fill_snapshot_backfill_epoch(
1302                &mut node,
1303                snapshot_backfill_info,
1304                cross_db_snapshot_backfill_info,
1305            )? {
1306                let node = StreamNode::from(&node);
1307                FragmentModel::update(fragment::ActiveModel {
1308                    fragment_id: Set(fragment_id),
1309                    stream_node: Set(node),
1310                    ..Default::default()
1311                })
1312                .exec(&txn)
1313                .await?;
1314            }
1315        }
1316        txn.commit().await?;
1317        Ok(())
1318    }
1319
1320    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
1321    pub async fn get_running_actors_of_fragment(
1322        &self,
1323        fragment_id: FragmentId,
1324    ) -> MetaResult<Vec<ActorId>> {
1325        let inner = self.inner.read().await;
1326        let actors: Vec<ActorId> = Actor::find()
1327            .select_only()
1328            .column(actor::Column::ActorId)
1329            .filter(actor::Column::FragmentId.eq(fragment_id))
1330            .filter(actor::Column::Status.eq(ActorStatus::Running))
1331            .into_tuple()
1332            .all(&inner.db)
1333            .await?;
1334        Ok(actors)
1335    }
1336
1337    /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
1338    /// (`backfill_actor_id`, `upstream_source_actor_id`)
1339    pub async fn get_running_actors_for_source_backfill(
1340        &self,
1341        source_backfill_fragment_id: FragmentId,
1342        source_fragment_id: FragmentId,
1343    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
1344        let inner = self.inner.read().await;
1345        let txn = inner.db.begin().await?;
1346
1347        let fragment_relation: DispatcherType = FragmentRelation::find()
1348            .select_only()
1349            .column(fragment_relation::Column::DispatcherType)
1350            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
1351            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
1352            .into_tuple()
1353            .one(&txn)
1354            .await?
1355            .ok_or_else(|| {
1356                anyhow!(
1357                    "no fragment connection from source fragment {} to source backfill fragment {}",
1358                    source_fragment_id,
1359                    source_backfill_fragment_id
1360                )
1361            })?;
1362
1363        if fragment_relation != DispatcherType::NoShuffle {
1364            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
1365        }
1366
1367        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
1368            let result: MetaResult<DistributionType> = try {
1369                FragmentModel::find_by_id(fragment_id)
1370                    .select_only()
1371                    .column(fragment::Column::DistributionType)
1372                    .into_tuple()
1373                    .one(txn)
1374                    .await?
1375                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
1376            };
1377            result
1378        };
1379
1380        let source_backfill_distribution_type =
1381            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
1382        let source_distribution_type =
1383            load_fragment_distribution_type(&txn, source_fragment_id).await?;
1384
1385        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
1386            Actor::find()
1387                .select_only()
1388                .column(actor::Column::ActorId)
1389                .column(actor::Column::VnodeBitmap)
1390                .filter(actor::Column::FragmentId.eq(fragment_id))
1391                .into_tuple()
1392                .stream(txn)
1393                .await?
1394                .map(|result| {
1395                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
1396                        (
1397                            actor_id as _,
1398                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
1399                        )
1400                    })
1401                })
1402                .try_collect()
1403                .await
1404        };
1405
1406        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
1407            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;
1408
1409        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;
1410
1411        Ok(resolve_no_shuffle_actor_dispatcher(
1412            source_distribution_type,
1413            &source_actors,
1414            source_backfill_distribution_type,
1415            &source_backfill_actors,
1416        )
1417        .into_iter()
1418        .map(|(source_actor, source_backfill_actor)| {
1419            (source_backfill_actor as _, source_actor as _)
1420        })
1421        .collect())
1422    }
1423
1424    pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
1425        let inner = self.inner.read().await;
1426        let actors: Vec<ActorId> = Actor::find()
1427            .select_only()
1428            .column(actor::Column::ActorId)
1429            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1430            .filter(fragment::Column::JobId.is_in(job_ids))
1431            .into_tuple()
1432            .all(&inner.db)
1433            .await?;
1434        Ok(actors)
1435    }
1436
1437    /// Get and filter the "**root**" fragments of the specified jobs.
1438    /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
1439    ///
1440    /// Root fragment connects to downstream jobs.
1441    ///
1442    /// ## What can be the root fragment
1443    /// - For sink, it should have one `Sink` fragment.
1444    /// - For MV, it should have one `MView` fragment.
1445    /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
1446    /// - For source, it should have one `Source` fragment.
1447    ///
1448    /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
1449    pub async fn get_root_fragments(
1450        &self,
1451        job_ids: Vec<ObjectId>,
1452    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
1453        let inner = self.inner.read().await;
1454
1455        let job_definitions = resolve_streaming_job_definition(
1456            &inner.db,
1457            &HashSet::from_iter(job_ids.iter().copied()),
1458        )
1459        .await?;
1460
1461        let all_fragments = FragmentModel::find()
1462            .filter(fragment::Column::JobId.is_in(job_ids))
1463            .all(&inner.db)
1464            .await?;
1465        // job_id -> fragment
1466        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
1467        for fragment in all_fragments {
1468            if (fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32) != 0
1469                || (fragment.fragment_type_mask & PbFragmentTypeFlag::Sink as i32) != 0
1470            {
1471                _ = root_fragments.insert(fragment.job_id, fragment);
1472            } else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
1473                // look for Source fragment only if there's no MView fragment
1474                // (notice try_insert here vs insert above)
1475                _ = root_fragments.try_insert(fragment.job_id, fragment);
1476            }
1477        }
1478
1479        let mut root_fragments_pb = HashMap::new();
1480        for (_, fragment) in root_fragments {
1481            let actors = fragment.find_related(Actor).all(&inner.db).await?;
1482
1483            let job_id = fragment.job_id;
1484            root_fragments_pb.insert(
1485                fragment.job_id,
1486                Self::compose_fragment(
1487                    fragment,
1488                    actors,
1489                    job_definitions.get(&(job_id as _)).cloned(),
1490                )?
1491                .0,
1492            );
1493        }
1494
1495        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
1496            .select_only()
1497            .columns([actor::Column::ActorId, actor::Column::WorkerId])
1498            .into_tuple()
1499            .all(&inner.db)
1500            .await?;
1501
1502        Ok((root_fragments_pb, actors))
1503    }
1504
1505    pub async fn get_root_fragment(
1506        &self,
1507        job_id: ObjectId,
1508    ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
1509        let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1510        let root_fragment = root_fragments
1511            .remove(&job_id)
1512            .context(format!("root fragment for job {} not found", job_id))?;
1513        Ok((root_fragment, actors))
1514    }
1515
1516    /// Get the downstream fragments connected to the specified job.
1517    pub async fn get_downstream_fragments(
1518        &self,
1519        job_id: ObjectId,
1520    ) -> MetaResult<(Vec<(DispatchStrategy, Fragment)>, Vec<(ActorId, WorkerId)>)> {
1521        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;
1522
1523        let inner = self.inner.read().await;
1524        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1525            .filter(
1526                fragment_relation::Column::SourceFragmentId
1527                    .eq(root_fragment.fragment_id as FragmentId),
1528            )
1529            .all(&inner.db)
1530            .await?;
1531        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
1532            .await?
1533            .remove(&job_id);
1534
1535        let mut downstream_fragments = vec![];
1536        for fragment_relation::Model {
1537            target_fragment_id: fragment_id,
1538            dispatcher_type,
1539            dist_key_indices,
1540            output_indices,
1541            ..
1542        } in downstream_fragment_relations
1543        {
1544            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
1545                .find_with_related(Actor)
1546                .all(&inner.db)
1547                .await?;
1548            if fragment_actors.is_empty() {
1549                bail!("No fragment found for fragment id {}", fragment_id);
1550            }
1551            assert_eq!(fragment_actors.len(), 1);
1552            let (fragment, actors) = fragment_actors.pop().unwrap();
1553            let dispatch_strategy = DispatchStrategy {
1554                r#type: PbDispatcherType::from(dispatcher_type) as _,
1555                dist_key_indices: dist_key_indices.into_u32_array(),
1556                output_indices: output_indices.into_u32_array(),
1557            };
1558            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
1559            downstream_fragments.push((dispatch_strategy, fragment));
1560        }
1561
1562        Ok((downstream_fragments, actors))
1563    }
1564
1565    pub async fn load_source_fragment_ids(
1566        &self,
1567    ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1568        let inner = self.inner.read().await;
1569        let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1570            .select_only()
1571            .columns([
1572                fragment::Column::FragmentId,
1573                fragment::Column::FragmentTypeMask,
1574                fragment::Column::StreamNode,
1575            ])
1576            .into_tuple()
1577            .all(&inner.db)
1578            .await?;
1579        fragments.retain(|(_, mask, _)| *mask & PbFragmentTypeFlag::Source as i32 != 0);
1580
1581        let mut source_fragment_ids = HashMap::new();
1582        for (fragment_id, _, stream_node) in fragments {
1583            if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1584                source_fragment_ids
1585                    .entry(source_id as SourceId)
1586                    .or_insert_with(BTreeSet::new)
1587                    .insert(fragment_id);
1588            }
1589        }
1590        Ok(source_fragment_ids)
1591    }
1592
1593    pub async fn load_backfill_fragment_ids(
1594        &self,
1595    ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1596        let inner = self.inner.read().await;
1597        let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1598            .select_only()
1599            .columns([
1600                fragment::Column::FragmentId,
1601                fragment::Column::FragmentTypeMask,
1602                fragment::Column::StreamNode,
1603            ])
1604            .into_tuple()
1605            .all(&inner.db)
1606            .await?;
1607        fragments.retain(|(_, mask, _)| *mask & PbFragmentTypeFlag::SourceScan as i32 != 0);
1608
1609        let mut source_fragment_ids = HashMap::new();
1610        for (fragment_id, _, stream_node) in fragments {
1611            if let Some((source_id, upstream_source_fragment_id)) =
1612                stream_node.to_protobuf().find_source_backfill()
1613            {
1614                source_fragment_ids
1615                    .entry(source_id as SourceId)
1616                    .or_insert_with(BTreeSet::new)
1617                    .insert((fragment_id, upstream_source_fragment_id));
1618            }
1619        }
1620        Ok(source_fragment_ids)
1621    }
1622
1623    pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
1624        let inner = self.inner.read().await;
1625        let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
1626            .select_only()
1627            .columns([actor::Column::ActorId, actor::Column::Splits])
1628            .filter(actor::Column::Splits.is_not_null())
1629            .into_tuple()
1630            .all(&inner.db)
1631            .await?;
1632        Ok(splits.into_iter().collect())
1633    }
1634
1635    /// Get the actor count of `Materialize` or `Sink` fragment of the specified table.
1636    pub async fn get_actual_job_fragment_parallelism(
1637        &self,
1638        job_id: ObjectId,
1639    ) -> MetaResult<Option<usize>> {
1640        let inner = self.inner.read().await;
1641        let mut fragments: Vec<(FragmentId, i32, i64)> = FragmentModel::find()
1642            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
1643            .select_only()
1644            .columns([
1645                fragment::Column::FragmentId,
1646                fragment::Column::FragmentTypeMask,
1647            ])
1648            .column_as(actor::Column::ActorId.count(), "count")
1649            .filter(fragment::Column::JobId.eq(job_id))
1650            .group_by(fragment::Column::FragmentId)
1651            .into_tuple()
1652            .all(&inner.db)
1653            .await?;
1654
1655        fragments.retain(|(_, mask, _)| {
1656            *mask & PbFragmentTypeFlag::Mview as i32 != 0
1657                || *mask & PbFragmentTypeFlag::Sink as i32 != 0
1658        });
1659
1660        Ok(fragments
1661            .into_iter()
1662            .at_most_one()
1663            .ok()
1664            .flatten()
1665            .map(|(_, _, count)| count as usize))
1666    }
1667}
1668
1669#[cfg(test)]
1670mod tests {
1671    use std::collections::{BTreeMap, HashMap, HashSet};
1672
1673    use itertools::Itertools;
1674    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1675    use risingwave_common::util::iter_util::ZipEqDebug;
1676    use risingwave_common::util::stream_graph_visitor::visit_stream_node;
1677    use risingwave_meta_model::actor::ActorStatus;
1678    use risingwave_meta_model::fragment::DistributionType;
1679    use risingwave_meta_model::{
1680        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
1681        VnodeBitmap, actor, fragment,
1682    };
1683    use risingwave_pb::common::PbActorLocation;
1684    use risingwave_pb::meta::table_fragments::PbActorStatus;
1685    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
1686    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1687    use risingwave_pb::plan_common::PbExprContext;
1688    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1689    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1690    use risingwave_pb::stream_plan::{MergeNode, PbFragmentTypeFlag, PbStreamNode, PbUnionNode};
1691
1692    use crate::MetaResult;
1693    use crate::controller::catalog::CatalogController;
1694    use crate::model::{ActorUpstreams, Fragment, FragmentActorUpstreams, StreamActor};
1695
1696    const TEST_FRAGMENT_ID: FragmentId = 1;
1697
1698    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;
1699
1700    const TEST_JOB_ID: ObjectId = 1;
1701
1702    const TEST_STATE_TABLE_ID: TableId = 1000;
1703
1704    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1705        let mut upstream_actor_ids = BTreeMap::new();
1706        upstream_actor_ids.insert(
1707            TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1708            HashSet::from_iter([(actor_id + 100)]),
1709        );
1710        upstream_actor_ids.insert(
1711            (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1712            HashSet::from_iter([(actor_id + 200)]),
1713        );
1714        upstream_actor_ids
1715    }
1716
1717    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1718        let mut input = vec![];
1719        for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1720            input.push(PbStreamNode {
1721                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1722                    upstream_fragment_id: *upstream_fragment_id as _,
1723                    ..Default::default()
1724                }))),
1725                ..Default::default()
1726            });
1727        }
1728
1729        PbStreamNode {
1730            input,
1731            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1732            ..Default::default()
1733        }
1734    }
1735
1736    #[tokio::test]
1737    async fn test_extract_fragment() -> MetaResult<()> {
1738        let actor_count = 3u32;
1739        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
1740            .map(|actor_id| {
1741                (
1742                    actor_id as _,
1743                    generate_upstream_actor_ids_for_actor(actor_id),
1744                )
1745            })
1746            .collect();
1747
1748        let actor_bitmaps = ActorMapping::new_uniform(
1749            (0..actor_count).map(|i| i as _),
1750            VirtualNode::COUNT_FOR_TEST,
1751        )
1752        .to_bitmaps();
1753
1754        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());
1755
1756        let pb_actors = (0..actor_count)
1757            .map(|actor_id| StreamActor {
1758                actor_id: actor_id as _,
1759                fragment_id: TEST_FRAGMENT_ID as _,
1760                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
1761                mview_definition: "".to_owned(),
1762                expr_context: Some(PbExprContext {
1763                    time_zone: String::from("America/New_York"),
1764                    strict_mode: false,
1765                }),
1766            })
1767            .collect_vec();
1768
1769        let pb_fragment = Fragment {
1770            fragment_id: TEST_FRAGMENT_ID as _,
1771            fragment_type_mask: PbFragmentTypeFlag::Source as _,
1772            distribution_type: PbFragmentDistributionType::Hash as _,
1773            actors: pb_actors.clone(),
1774            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
1775            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
1776            nodes: stream_node.clone(),
1777        };
1778
1779        let pb_actor_status = (0..actor_count)
1780            .map(|actor_id| {
1781                (
1782                    actor_id,
1783                    PbActorStatus {
1784                        location: PbActorLocation::from_worker(0),
1785                        state: PbActorState::Running as _,
1786                    },
1787                )
1788            })
1789            .collect();
1790
1791        let pb_actor_splits = Default::default();
1792
1793        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
1794            TEST_JOB_ID,
1795            &pb_fragment,
1796            &pb_actor_status,
1797            &pb_actor_splits,
1798        )?;
1799
1800        check_fragment(fragment, pb_fragment);
1801        check_actors(
1802            actors,
1803            &upstream_actor_ids,
1804            pb_actors,
1805            Default::default(),
1806            &stream_node,
1807        );
1808
1809        Ok(())
1810    }
1811
1812    #[tokio::test]
1813    async fn test_compose_fragment() -> MetaResult<()> {
1814        let actor_count = 3u32;
1815
1816        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
1817            .map(|actor_id| {
1818                (
1819                    actor_id as _,
1820                    generate_upstream_actor_ids_for_actor(actor_id),
1821                )
1822            })
1823            .collect();
1824
1825        let mut actor_bitmaps = ActorMapping::new_uniform(
1826            (0..actor_count).map(|i| i as _),
1827            VirtualNode::COUNT_FOR_TEST,
1828        )
1829        .to_bitmaps();
1830
1831        let actors = (0..actor_count)
1832            .map(|actor_id| {
1833                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
1834                    splits: vec![PbConnectorSplit {
1835                        split_type: "dummy".to_owned(),
1836                        ..Default::default()
1837                    }],
1838                }));
1839
1840                #[expect(deprecated)]
1841                actor::Model {
1842                    actor_id: actor_id as ActorId,
1843                    fragment_id: TEST_FRAGMENT_ID,
1844                    status: ActorStatus::Running,
1845                    splits: actor_splits,
1846                    worker_id: 0,
1847                    upstream_actor_ids: Default::default(),
1848                    vnode_bitmap: actor_bitmaps
1849                        .remove(&actor_id)
1850                        .map(|bitmap| bitmap.to_protobuf())
1851                        .as_ref()
1852                        .map(VnodeBitmap::from),
1853                    expr_context: ExprContext::from(&PbExprContext {
1854                        time_zone: String::from("America/New_York"),
1855                        strict_mode: false,
1856                    }),
1857                }
1858            })
1859            .collect_vec();
1860
1861        let stream_node = {
1862            let template_actor = actors.first().cloned().unwrap();
1863
1864            let template_upstream_actor_ids = upstream_actor_ids
1865                .get(&(template_actor.actor_id as _))
1866                .unwrap();
1867
1868            generate_merger_stream_node(template_upstream_actor_ids)
1869        };
1870
1871        #[expect(deprecated)]
1872        let fragment = fragment::Model {
1873            fragment_id: TEST_FRAGMENT_ID,
1874            job_id: TEST_JOB_ID,
1875            fragment_type_mask: 0,
1876            distribution_type: DistributionType::Hash,
1877            stream_node: StreamNode::from(&stream_node),
1878            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
1879            upstream_fragment_id: Default::default(),
1880            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
1881        };
1882
1883        let (pb_fragment, pb_actor_status, pb_actor_splits) =
1884            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();
1885
1886        assert_eq!(pb_actor_status.len(), actor_count as usize);
1887        assert_eq!(pb_actor_splits.len(), actor_count as usize);
1888
1889        let pb_actors = pb_fragment.actors.clone();
1890
1891        check_fragment(fragment, pb_fragment);
1892        check_actors(
1893            actors,
1894            &upstream_actor_ids,
1895            pb_actors,
1896            pb_actor_splits,
1897            &stream_node,
1898        );
1899
1900        Ok(())
1901    }
1902
1903    fn check_actors(
1904        actors: Vec<actor::Model>,
1905        actor_upstreams: &FragmentActorUpstreams,
1906        pb_actors: Vec<StreamActor>,
1907        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
1908        stream_node: &PbStreamNode,
1909    ) {
1910        for (
1911            actor::Model {
1912                actor_id,
1913                fragment_id,
1914                status,
1915                splits,
1916                worker_id: _,
1917                vnode_bitmap,
1918                expr_context,
1919                ..
1920            },
1921            StreamActor {
1922                actor_id: pb_actor_id,
1923                fragment_id: pb_fragment_id,
1924                vnode_bitmap: pb_vnode_bitmap,
1925                mview_definition,
1926                expr_context: pb_expr_context,
1927                ..
1928            },
1929        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
1930        {
1931            assert_eq!(actor_id, pb_actor_id as ActorId);
1932            assert_eq!(fragment_id, pb_fragment_id as FragmentId);
1933
1934            assert_eq!(
1935                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
1936                pb_vnode_bitmap,
1937            );
1938
1939            assert_eq!(mview_definition, "");
1940
1941            visit_stream_node(stream_node, |body| {
1942                if let PbNodeBody::Merge(m) = body {
1943                    assert!(
1944                        actor_upstreams
1945                            .get(&(actor_id as _))
1946                            .unwrap()
1947                            .contains_key(&m.upstream_fragment_id)
1948                    );
1949                }
1950            });
1951
1952            assert_eq!(status, ActorStatus::Running);
1953
1954            assert_eq!(
1955                splits,
1956                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
1957            );
1958
1959            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
1960        }
1961    }
1962
1963    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
1964        let Fragment {
1965            fragment_id,
1966            fragment_type_mask,
1967            distribution_type: pb_distribution_type,
1968            actors: _,
1969            state_table_ids: pb_state_table_ids,
1970            maybe_vnode_count: _,
1971            nodes,
1972        } = pb_fragment;
1973
1974        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
1975        assert_eq!(fragment_type_mask, fragment.fragment_type_mask as u32);
1976        assert_eq!(
1977            pb_distribution_type,
1978            PbFragmentDistributionType::from(fragment.distribution_type)
1979        );
1980
1981        assert_eq!(
1982            pb_state_table_ids,
1983            fragment.state_table_ids.into_u32_array()
1984        );
1985        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
1986    }
1987}