risingwave_meta/model/
stream.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{AddAssign, Deref};
17
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
21use risingwave_common::hash::{
22    ActorAlignmentId, IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat,
23};
24use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
25use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
26use risingwave_pb::catalog::Table;
27use risingwave_pb::common::{ActorInfo, PbActorLocation};
28use risingwave_pb::meta::table_fragments::actor_status::ActorState;
29use risingwave_pb::meta::table_fragments::fragment::{
30    FragmentDistributionType, PbFragmentDistributionType,
31};
32use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
33use risingwave_pb::meta::table_parallelism::{
34    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
35    PbParallelism,
36};
37use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
38use risingwave_pb::plan_common::PbExprContext;
39use risingwave_pb::stream_plan::stream_node::NodeBody;
40use risingwave_pb::stream_plan::{
41    DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
42    PbStreamContext, StreamNode,
43};
44
45use super::{ActorId, FragmentId};
46
/// How the parallelism of a `TableFragments` (streaming job) is determined.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TableParallelism {
    /// The system picks the parallelism itself, based on the available worker parallelisms.
    Adaptive,
    /// Set when the parallelism of the `TableFragments` is changed explicitly.
    /// Every fragment that belongs to the `TableFragments` shares this parallelism.
    Fixed(usize),
    /// Set when the parallelisms of the individual `Fragments` inside one
    /// `TableFragments` are allowed to differ.
    ///
    /// This is what `risectl` produces: its low-level interface scales single
    /// `Fragments` of a `TableFragments`. Once that happens the `TableFragments`
    /// no longer has one consistent parallelism, and this variant records that.
    Custom,
}
63
64impl From<PbTableParallelism> for TableParallelism {
65    fn from(value: PbTableParallelism) -> Self {
66        use Parallelism::*;
67        match &value.parallelism {
68            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
69            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
70            Some(Custom(_)) => Self::Custom,
71            _ => unreachable!(),
72        }
73    }
74}
75
76impl From<TableParallelism> for PbTableParallelism {
77    fn from(value: TableParallelism) -> Self {
78        use TableParallelism::*;
79
80        let parallelism = match value {
81            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
82            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
83                parallelism: n as u32,
84            }),
85            Custom => PbParallelism::Custom(PbCustomParallelism {}),
86        };
87
88        Self {
89            parallelism: Some(parallelism),
90        }
91    }
92}
93
94impl From<StreamingParallelism> for TableParallelism {
95    fn from(value: StreamingParallelism) -> Self {
96        match value {
97            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
98            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
99            StreamingParallelism::Custom => TableParallelism::Custom,
100        }
101    }
102}
103
104impl From<TableParallelism> for StreamingParallelism {
105    fn from(value: TableParallelism) -> Self {
106        match value {
107            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
108            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
109            TableParallelism::Custom => StreamingParallelism::Custom,
110        }
111    }
112}
113
/// `fragment_id` -> `actor_id` -> [`ActorInfo`].
/// NOTE(review): judging by the name, the keys are the upstream fragments/actors of an actor —
/// confirm against callers.
pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
/// A [`StreamActor`] together with a list of [`PbDispatcher`]s.
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
/// A [`StreamActor`] together with an [`ActorUpstreams`] map and a list of [`PbDispatcher`]s.
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
/// `fragment_id` -> `actor_id` -> the [`PbDispatcher`]s of that actor.
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

/// `fragment_id` -> the [`DownstreamFragmentRelation`]s recorded for that fragment.
pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// downstream `fragment_id` -> original upstream `fragment_id` -> new upstream `fragment_id`
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// The newly added no-shuffle actor dispatcher from upstream fragment to downstream fragment
/// upstream `fragment_id` -> downstream `fragment_id` -> upstream `actor_id` -> downstream `actor_id`
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;
125
126#[derive(Debug, Clone)]
127pub struct DownstreamFragmentRelation {
128    pub downstream_fragment_id: FragmentId,
129    pub dispatcher_type: DispatcherType,
130    pub dist_key_indices: Vec<u32>,
131    pub output_mapping: PbDispatchOutputMapping,
132}
133
134impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
135    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
136        Self {
137            downstream_fragment_id: fragment_id,
138            dispatcher_type: dispatch.get_type().unwrap().into(),
139            dist_key_indices: dispatch.dist_key_indices,
140            output_mapping: dispatch.output_mapping.unwrap(),
141        }
142    }
143}
144
145#[derive(Debug, Clone)]
146pub struct StreamJobFragmentsToCreate {
147    pub inner: StreamJobFragments,
148    pub downstreams: FragmentDownstreamRelation,
149}
150
151impl Deref for StreamJobFragmentsToCreate {
152    type Target = StreamJobFragments;
153
154    fn deref(&self) -> &Self::Target {
155        &self.inner
156    }
157}
158
159#[derive(Clone, Debug)]
160pub struct StreamActor {
161    pub actor_id: u32,
162    pub fragment_id: u32,
163    pub vnode_bitmap: Option<Bitmap>,
164    pub mview_definition: String,
165    pub expr_context: Option<PbExprContext>,
166}
167
168impl StreamActor {
169    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
170        PbStreamActor {
171            actor_id: self.actor_id,
172            fragment_id: self.fragment_id,
173            dispatcher: dispatchers.collect(),
174            vnode_bitmap: self
175                .vnode_bitmap
176                .as_ref()
177                .map(|bitmap| bitmap.to_protobuf()),
178            mview_definition: self.mview_definition.clone(),
179            expr_context: self.expr_context.clone(),
180        }
181    }
182}
183
184#[derive(Clone, Debug, Default)]
185pub struct Fragment {
186    pub fragment_id: FragmentId,
187    pub fragment_type_mask: FragmentTypeMask,
188    pub distribution_type: PbFragmentDistributionType,
189    pub actors: Vec<StreamActor>,
190    pub state_table_ids: Vec<u32>,
191    pub maybe_vnode_count: Option<u32>,
192    pub nodes: StreamNode,
193}
194
195impl Fragment {
196    pub fn to_protobuf(
197        &self,
198        upstream_fragments: impl Iterator<Item = FragmentId>,
199        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
200    ) -> PbFragment {
201        PbFragment {
202            fragment_id: self.fragment_id,
203            fragment_type_mask: self.fragment_type_mask.into(),
204            distribution_type: self.distribution_type as _,
205            actors: self
206                .actors
207                .iter()
208                .map(|actor| {
209                    actor.to_protobuf(
210                        dispatchers
211                            .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
212                            .into_iter()
213                            .flatten()
214                            .cloned(),
215                    )
216                })
217                .collect(),
218            state_table_ids: self.state_table_ids.clone(),
219            upstream_fragment_ids: upstream_fragments.collect(),
220            maybe_vnode_count: self.maybe_vnode_count,
221            nodes: Some(self.nodes.clone()),
222        }
223    }
224}
225
226impl VnodeCountCompat for Fragment {
227    fn vnode_count_inner(&self) -> VnodeCount {
228        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
229    }
230}
231
232impl IsSingleton for Fragment {
233    fn is_singleton(&self) -> bool {
234        matches!(self.distribution_type, FragmentDistributionType::Single)
235    }
236}
237
238/// Fragments of a streaming job. Corresponds to [`PbTableFragments`].
239/// (It was previously called `TableFragments` due to historical reasons.)
240///
241/// We store whole fragments in a single column family as follow:
242/// `stream_job_id` => `StreamJobFragments`.
243#[derive(Debug, Clone)]
244pub struct StreamJobFragments {
245    /// The table id.
246    pub stream_job_id: TableId,
247
248    /// The state of the table fragments.
249    pub state: State,
250
251    /// The table fragments.
252    pub fragments: BTreeMap<FragmentId, Fragment>,
253
254    /// The status of actors
255    pub actor_status: BTreeMap<ActorId, ActorStatus>,
256
257    /// The streaming context associated with this stream plan and its fragments
258    pub ctx: StreamContext,
259
260    /// The parallelism assigned to this table fragments
261    pub assigned_parallelism: TableParallelism,
262
263    /// The max parallelism specified when the streaming job was created, i.e., expected vnode count.
264    ///
265    /// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
266    /// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
267    /// the streaming job.
268    ///
269    /// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
270    /// from this value (see `StreamFragmentGraph.max_parallelism` for more details.). As a result,
271    /// checking the parallelism change with this value can be inaccurate in some cases. However,
272    /// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
273    pub max_parallelism: usize,
274}
275
/// Context shared by a stream plan and its fragments.
#[derive(Clone, Debug, Default)]
pub struct StreamContext {
    /// Timezone used to interpret timestamps and dates during conversion; `None` when unset.
    pub timezone: Option<String>,
}
281
282impl StreamContext {
283    pub fn to_protobuf(&self) -> PbStreamContext {
284        PbStreamContext {
285            timezone: self.timezone.clone().unwrap_or("".into()),
286        }
287    }
288
289    pub fn to_expr_context(&self) -> PbExprContext {
290        PbExprContext {
291            // `self.timezone` must always be set; an invalid value is used here for debugging if it's not.
292            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
293            strict_mode: false,
294        }
295    }
296
297    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
298        Self {
299            timezone: if prost.get_timezone().is_empty() {
300                None
301            } else {
302                Some(prost.get_timezone().clone())
303            },
304        }
305    }
306}
307
308impl StreamJobFragments {
309    pub fn to_protobuf(
310        &self,
311        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
312        fragment_dispatchers: &FragmentActorDispatchers,
313    ) -> PbTableFragments {
314        PbTableFragments {
315            table_id: self.stream_job_id.table_id(),
316            state: self.state as _,
317            fragments: self
318                .fragments
319                .iter()
320                .map(|(id, fragment)| {
321                    (
322                        *id,
323                        fragment.to_protobuf(
324                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
325                            fragment_dispatchers.get(&(*id as _)),
326                        ),
327                    )
328                })
329                .collect(),
330            actor_status: self.actor_status.clone().into_iter().collect(),
331            ctx: Some(self.ctx.to_protobuf()),
332            parallelism: Some(self.assigned_parallelism.into()),
333            node_label: "".to_owned(),
334            backfill_done: true,
335            max_parallelism: Some(self.max_parallelism as _),
336        }
337    }
338}
339
/// `worker_id` -> `fragment_id` -> (the fragment's [`StreamNode`], its actors with their
/// [`ActorUpstreams`] and dispatchers).
pub type StreamJobActorsToCreate =
    HashMap<WorkerId, HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>)>>;
342
343impl StreamJobFragments {
344    /// Create a new `TableFragments` with state of `Initial`, with other fields empty.
345    pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
346        Self::new(
347            table_id,
348            fragments,
349            &BTreeMap::new(),
350            StreamContext::default(),
351            TableParallelism::Adaptive,
352            VirtualNode::COUNT_FOR_TEST,
353        )
354    }
355
356    /// Create a new `TableFragments` with state of `Initial`, with the status of actors set to
357    /// `Inactive` on the given workers.
358    pub fn new(
359        stream_job_id: TableId,
360        fragments: BTreeMap<FragmentId, Fragment>,
361        actor_locations: &BTreeMap<ActorId, ActorAlignmentId>,
362        ctx: StreamContext,
363        table_parallelism: TableParallelism,
364        max_parallelism: usize,
365    ) -> Self {
366        let actor_status = actor_locations
367            .iter()
368            .map(|(&actor_id, alignment_id)| {
369                (
370                    actor_id,
371                    ActorStatus {
372                        location: PbActorLocation::from_worker(alignment_id.worker_id()),
373                        state: ActorState::Inactive as i32,
374                    },
375                )
376            })
377            .collect();
378
379        Self {
380            stream_job_id,
381            state: State::Initial,
382            fragments,
383            actor_status,
384            ctx,
385            assigned_parallelism: table_parallelism,
386            max_parallelism,
387        }
388    }
389
390    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
391        self.fragments.keys().cloned()
392    }
393
394    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
395        self.fragments.values()
396    }
397
398    pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
399        self.fragments
400            .get(&fragment_id)
401            .map(|f| f.actors.as_slice())
402            .unwrap_or_default()
403    }
404
405    /// Returns the table id.
406    pub fn stream_job_id(&self) -> TableId {
407        self.stream_job_id
408    }
409
410    /// Returns the state of the table fragments.
411    pub fn state(&self) -> State {
412        self.state
413    }
414
415    /// Returns the timezone of the table
416    pub fn timezone(&self) -> Option<String> {
417        self.ctx.timezone.clone()
418    }
419
420    /// Returns whether the table fragments is in `Created` state.
421    pub fn is_created(&self) -> bool {
422        self.state == State::Created
423    }
424
425    /// Returns whether the table fragments is in `Initial` state.
426    pub fn is_initial(&self) -> bool {
427        self.state == State::Initial
428    }
429
430    /// Set the state of the table fragments.
431    pub fn set_state(&mut self, state: State) {
432        self.state = state;
433    }
434
435    /// Update state of all actors
436    pub fn update_actors_state(&mut self, state: ActorState) {
437        for actor_status in self.actor_status.values_mut() {
438            actor_status.set_state(state);
439        }
440    }
441
442    /// Returns actor ids associated with this table.
443    pub fn actor_ids(&self) -> Vec<ActorId> {
444        self.fragments
445            .values()
446            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
447            .collect()
448    }
449
450    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
451        self.fragments
452            .values()
453            .flat_map(|fragment| {
454                fragment
455                    .actors
456                    .iter()
457                    .map(|actor| (actor.actor_id, fragment.fragment_id))
458            })
459            .collect()
460    }
461
462    /// Returns actors associated with this table.
463    #[cfg(test)]
464    pub fn actors(&self) -> Vec<StreamActor> {
465        self.fragments
466            .values()
467            .flat_map(|fragment| fragment.actors.clone())
468            .collect()
469    }
470
471    /// Returns the actor ids with the given fragment type.
472    pub fn filter_actor_ids(
473        &self,
474        check_type: impl Fn(FragmentTypeMask) -> bool + 'static,
475    ) -> impl Iterator<Item = ActorId> + '_ {
476        self.fragments
477            .values()
478            .filter(move |fragment| check_type(fragment.fragment_type_mask))
479            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
480    }
481
482    /// Returns mview actor ids.
483    pub fn mview_actor_ids(&self) -> Vec<ActorId> {
484        Self::filter_actor_ids(self, |fragment_type_mask| {
485            fragment_type_mask.contains(FragmentTypeFlag::Mview)
486        })
487        .collect()
488    }
489
490    /// Returns actor ids that need to be tracked when creating MV.
491    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
492        let mut actor_ids = vec![];
493        for fragment in self.fragments.values() {
494            if fragment
495                .fragment_type_mask
496                .contains(FragmentTypeFlag::CdcFilter)
497            {
498                // Note: CDC table job contains a StreamScan fragment (StreamCdcScan node) and a CdcFilter fragment.
499                // We don't track any fragments' progress.
500                return vec![];
501            }
502            if fragment.fragment_type_mask.contains_any([
503                FragmentTypeFlag::Values,
504                FragmentTypeFlag::StreamScan,
505                FragmentTypeFlag::SourceScan,
506            ]) {
507                actor_ids.extend(fragment.actors.iter().map(|actor| {
508                    (
509                        actor.actor_id,
510                        BackfillUpstreamType::from_fragment_type_mask(fragment.fragment_type_mask),
511                    )
512                }));
513            }
514        }
515        actor_ids
516    }
517
518    pub fn root_fragment(&self) -> Option<Fragment> {
519        self.mview_fragment()
520            .or_else(|| self.sink_fragment())
521            .or_else(|| self.source_fragment())
522    }
523
524    /// Returns the fragment with the `Mview` type flag.
525    pub fn mview_fragment(&self) -> Option<Fragment> {
526        self.fragments
527            .values()
528            .find(|fragment| {
529                fragment
530                    .fragment_type_mask
531                    .contains(FragmentTypeFlag::Mview)
532            })
533            .cloned()
534    }
535
536    pub fn source_fragment(&self) -> Option<Fragment> {
537        self.fragments
538            .values()
539            .find(|fragment| {
540                fragment
541                    .fragment_type_mask
542                    .contains(FragmentTypeFlag::Source)
543            })
544            .cloned()
545    }
546
547    pub fn sink_fragment(&self) -> Option<Fragment> {
548        self.fragments
549            .values()
550            .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
551            .cloned()
552    }
553
554    pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId> {
555        Self::filter_actor_ids(self, |mask| {
556            mask.contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
557        })
558        .collect()
559    }
560
561    /// Extract the fragments that include source executors that contains an external stream source,
562    /// grouping by source id.
563    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
564        let mut source_fragments = HashMap::new();
565
566        for fragment in self.fragments() {
567            {
568                if let Some(source_id) = fragment.nodes.find_stream_source() {
569                    source_fragments
570                        .entry(source_id as SourceId)
571                        .or_insert(BTreeSet::new())
572                        .insert(fragment.fragment_id as FragmentId);
573                }
574            }
575        }
576        source_fragments
577    }
578
579    /// Returns (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`)).
580    ///
581    /// Note: the fragment `source_backfill_fragment_id` may actually have multiple upstream fragments,
582    /// but only one of them is the upstream source fragment, which is what we return.
583    pub fn source_backfill_fragments(
584        &self,
585    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
586        let mut source_backfill_fragments = HashMap::new();
587
588        for fragment in self.fragments() {
589            {
590                if let Some((source_id, upstream_source_fragment_id)) =
591                    fragment.nodes.find_source_backfill()
592                {
593                    source_backfill_fragments
594                        .entry(source_id as SourceId)
595                        .or_insert(BTreeSet::new())
596                        .insert((fragment.fragment_id, upstream_source_fragment_id));
597                }
598            }
599        }
600        source_backfill_fragments
601    }
602
603    /// Find the table job's `Union` fragment.
604    /// Panics if not found.
605    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
606        let mut union_fragment_id = None;
607        for (fragment_id, fragment) in &self.fragments {
608            {
609                {
610                    visit_stream_node_body(&fragment.nodes, |body| {
611                        if let NodeBody::Union(_) = body {
612                            if let Some(union_fragment_id) = union_fragment_id.as_mut() {
613                                // The union fragment should be unique.
614                                assert_eq!(*union_fragment_id, *fragment_id);
615                            } else {
616                                union_fragment_id = Some(*fragment_id);
617                            }
618                        }
619                    })
620                }
621            }
622        }
623
624        let union_fragment_id =
625            union_fragment_id.expect("fragment of placeholder merger not found");
626
627        (self
628            .fragments
629            .get_mut(&union_fragment_id)
630            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
631    }
632
633    /// Resolve dependent table
634    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
635        let table_id = match stream_node.node_body.as_ref() {
636            Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
637            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
638            _ => None,
639        };
640        if let Some(table_id) = table_id {
641            table_ids.entry(table_id).or_default().add_assign(1);
642        }
643
644        for child in &stream_node.input {
645            Self::resolve_dependent_table(child, table_ids);
646        }
647    }
648
649    /// Returns upstream table counts.
650    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
651        let mut table_ids = HashMap::new();
652        self.fragments.values().for_each(|fragment| {
653            Self::resolve_dependent_table(&fragment.nodes, &mut table_ids);
654        });
655
656        table_ids
657    }
658
659    /// Returns states of actors group by worker id.
660    pub fn worker_actor_states(&self) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
661        let mut map = BTreeMap::default();
662        for (&actor_id, actor_status) in &self.actor_status {
663            let node_id = actor_status.worker_id() as WorkerId;
664            map.entry(node_id)
665                .or_insert_with(Vec::new)
666                .push((actor_id, actor_status.state()));
667        }
668        map
669    }
670
671    /// Returns actor locations group by worker id.
672    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
673        let mut map = BTreeMap::default();
674        for (&actor_id, actor_status) in &self.actor_status {
675            let node_id = actor_status.worker_id() as WorkerId;
676            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
677        }
678        map
679    }
680
681    /// Returns the status of actors group by worker id.
682    pub fn active_actors(&self) -> Vec<StreamActor> {
683        let mut actors = vec![];
684        for fragment in self.fragments.values() {
685            for actor in &fragment.actors {
686                if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
687                    continue;
688                }
689                actors.push(actor.clone());
690            }
691        }
692        actors
693    }
694
695    pub fn actors_to_create(
696        &self,
697    ) -> impl Iterator<
698        Item = (
699            FragmentId,
700            &StreamNode,
701            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
702        ),
703    > + '_ {
704        self.fragments.values().map(move |fragment| {
705            (
706                fragment.fragment_id,
707                &fragment.nodes,
708                fragment.actors.iter().map(move |actor| {
709                    let worker_id = self
710                        .actor_status
711                        .get(&actor.actor_id)
712                        .expect("should exist")
713                        .worker_id() as WorkerId;
714                    (actor, worker_id)
715                }),
716            )
717        })
718    }
719
720    pub fn mv_table_id(&self) -> Option<u32> {
721        if self
722            .fragments
723            .values()
724            .flat_map(|f| f.state_table_ids.iter())
725            .any(|table_id| *table_id == self.stream_job_id.table_id)
726        {
727            Some(self.stream_job_id.table_id)
728        } else {
729            None
730        }
731    }
732
733    pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<u32, Table> {
734        let mut tables = BTreeMap::new();
735        for fragment in fragments {
736            stream_graph_visitor::visit_stream_node_tables_inner(
737                &mut fragment.nodes.clone(),
738                false,
739                true,
740                |table, _| {
741                    let table_id = table.id;
742                    tables
743                        .try_insert(table_id, table.clone())
744                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
745                },
746            );
747        }
748        tables
749    }
750
751    /// Returns the internal table ids without the mview table.
752    pub fn internal_table_ids(&self) -> Vec<u32> {
753        self.fragments
754            .values()
755            .flat_map(|f| f.state_table_ids.clone())
756            .filter(|&t| t != self.stream_job_id.table_id)
757            .collect_vec()
758    }
759
760    /// Returns all internal table ids including the mview table.
761    pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
762        self.fragments
763            .values()
764            .flat_map(|f| f.state_table_ids.clone())
765    }
766
767    /// Fill the `expr_context` in `StreamActor`. Used for compatibility.
768    pub fn fill_expr_context(mut self) -> Self {
769        self.fragments.values_mut().for_each(|fragment| {
770            fragment.actors.iter_mut().for_each(|actor| {
771                if actor.expr_context.is_none() {
772                    actor.expr_context = Some(self.ctx.to_expr_context());
773                }
774            });
775        });
776        self
777    }
778}
779
/// The kind of upstream a backfill fragment reads from, as derived from its
/// fragment type mask by `BackfillUpstreamType::from_fragment_type_mask`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BackfillUpstreamType {
    /// Fragment flagged `StreamScan`.
    MView,
    /// Fragment flagged `Values`.
    Values,
    /// Fragment flagged `SourceScan`.
    Source,
}
786
787impl BackfillUpstreamType {
788    pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
789        let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
790        let is_values = mask.contains(FragmentTypeFlag::Values);
791        let is_source = mask.contains(FragmentTypeFlag::SourceScan);
792
793        // Note: in theory we can have multiple backfill executors in one fragment, but currently it's not possible.
794        // See <https://github.com/risingwavelabs/risingwave/issues/6236>.
795        debug_assert!(
796            is_mview as u8 + is_values as u8 + is_source as u8 == 1,
797            "a backfill fragment should either be mview, value or source, found {:?}",
798            mask
799        );
800
801        if is_mview {
802            BackfillUpstreamType::MView
803        } else if is_values {
804            BackfillUpstreamType::Values
805        } else if is_source {
806            BackfillUpstreamType::Source
807        } else {
808            unreachable!("invalid fragment type mask: {:?}", mask);
809        }
810    }
811}