risingwave_meta/model/
stream.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{AddAssign, Deref};
17
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::TableId;
21use risingwave_common::hash::{
22    IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat, WorkerSlotId,
23};
24use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node};
25use risingwave_connector::source::SplitImpl;
26use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
27use risingwave_pb::catalog::Table;
28use risingwave_pb::common::{ActorInfo, PbActorLocation};
29use risingwave_pb::meta::table_fragments::actor_status::ActorState;
30use risingwave_pb::meta::table_fragments::fragment::{
31    FragmentDistributionType, PbFragmentDistributionType,
32};
33use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
34use risingwave_pb::meta::table_parallelism::{
35    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
36    PbParallelism,
37};
38use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
39use risingwave_pb::plan_common::PbExprContext;
40use risingwave_pb::stream_plan::stream_node::NodeBody;
41use risingwave_pb::stream_plan::{
42    DispatchStrategy, Dispatcher, FragmentTypeFlag, PbDispatcher, PbStreamActor, PbStreamContext,
43    StreamNode,
44};
45
46use super::{ActorId, FragmentId};
47use crate::model::MetadataModelResult;
48use crate::stream::{SplitAssignment, build_actor_connector_splits};
49
/// The parallelism assigned to the fragments of a streaming job (`StreamJobFragments`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TableParallelism {
    /// The system chooses the parallelism from the parallelism available on the workers.
    Adaptive,
    /// A fixed parallelism, set when the parallelism of the `StreamJobFragments` is changed.
    /// Every fragment of the job shares this same parallelism.
    Fixed(usize),
    /// The fragments of the job may have different parallelisms.
    ///
    /// This is set for `risectl`, whose low-level interface can scale individual fragments of a
    /// job. Once that happens, the job no longer has one consistent parallelism, and this
    /// variant records that fact.
    Custom,
}
66
67impl From<PbTableParallelism> for TableParallelism {
68    fn from(value: PbTableParallelism) -> Self {
69        use Parallelism::*;
70        match &value.parallelism {
71            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
72            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
73            Some(Custom(_)) => Self::Custom,
74            _ => unreachable!(),
75        }
76    }
77}
78
79impl From<TableParallelism> for PbTableParallelism {
80    fn from(value: TableParallelism) -> Self {
81        use TableParallelism::*;
82
83        let parallelism = match value {
84            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
85            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
86                parallelism: n as u32,
87            }),
88            Custom => PbParallelism::Custom(PbCustomParallelism {}),
89        };
90
91        Self {
92            parallelism: Some(parallelism),
93        }
94    }
95}
96
97impl From<StreamingParallelism> for TableParallelism {
98    fn from(value: StreamingParallelism) -> Self {
99        match value {
100            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
101            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
102            StreamingParallelism::Custom => TableParallelism::Custom,
103        }
104    }
105}
106
107impl From<TableParallelism> for StreamingParallelism {
108    fn from(value: TableParallelism) -> Self {
109        match value {
110            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
111            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
112            TableParallelism::Custom => StreamingParallelism::Custom,
113        }
114    }
115}
116
117pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
118pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
119pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
120pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;
121
122pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
123/// downstream `fragment_id` -> original upstream `fragment_id` -> new upstream `fragment_id`
124pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
125/// The newly added no-shuffle actor dispatcher from upstream fragment to downstream fragment
126/// upstream `fragment_id` -> downstream `fragment_id` -> upstream `actor_id` -> downstream `actor_id`
127pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;
128
129#[derive(Debug, Clone)]
130pub struct DownstreamFragmentRelation {
131    pub downstream_fragment_id: FragmentId,
132    pub dispatcher_type: DispatcherType,
133    pub dist_key_indices: Vec<u32>,
134    pub output_indices: Vec<u32>,
135}
136
137impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
138    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
139        Self {
140            downstream_fragment_id: fragment_id,
141            dispatcher_type: dispatch.get_type().unwrap().into(),
142            dist_key_indices: dispatch.dist_key_indices,
143            output_indices: dispatch.output_indices,
144        }
145    }
146}
147
148#[derive(Debug, Clone)]
149pub struct StreamJobFragmentsToCreate {
150    pub inner: StreamJobFragments,
151    pub downstreams: FragmentDownstreamRelation,
152}
153
154impl Deref for StreamJobFragmentsToCreate {
155    type Target = StreamJobFragments;
156
157    fn deref(&self) -> &Self::Target {
158        &self.inner
159    }
160}
161
162#[derive(Clone, Debug)]
163pub struct StreamActor {
164    pub actor_id: u32,
165    pub fragment_id: u32,
166    pub vnode_bitmap: Option<Bitmap>,
167    pub mview_definition: String,
168    pub expr_context: Option<PbExprContext>,
169}
170
171impl StreamActor {
172    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
173        PbStreamActor {
174            actor_id: self.actor_id,
175            fragment_id: self.fragment_id,
176            dispatcher: dispatchers.collect(),
177            vnode_bitmap: self
178                .vnode_bitmap
179                .as_ref()
180                .map(|bitmap| bitmap.to_protobuf()),
181            mview_definition: self.mview_definition.clone(),
182            expr_context: self.expr_context.clone(),
183        }
184    }
185}
186
187#[derive(Clone, Debug, Default)]
188pub struct Fragment {
189    pub fragment_id: FragmentId,
190    pub fragment_type_mask: u32,
191    pub distribution_type: PbFragmentDistributionType,
192    pub actors: Vec<StreamActor>,
193    pub state_table_ids: Vec<u32>,
194    pub maybe_vnode_count: Option<u32>,
195    pub nodes: StreamNode,
196}
197
198impl Fragment {
199    pub fn to_protobuf(
200        &self,
201        upstream_fragments: impl Iterator<Item = FragmentId>,
202        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
203    ) -> PbFragment {
204        PbFragment {
205            fragment_id: self.fragment_id,
206            fragment_type_mask: self.fragment_type_mask,
207            distribution_type: self.distribution_type as _,
208            actors: self
209                .actors
210                .iter()
211                .map(|actor| {
212                    actor.to_protobuf(
213                        dispatchers
214                            .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
215                            .into_iter()
216                            .flatten()
217                            .cloned(),
218                    )
219                })
220                .collect(),
221            state_table_ids: self.state_table_ids.clone(),
222            upstream_fragment_ids: upstream_fragments.collect(),
223            maybe_vnode_count: self.maybe_vnode_count,
224            nodes: Some(self.nodes.clone()),
225        }
226    }
227}
228
229impl VnodeCountCompat for Fragment {
230    fn vnode_count_inner(&self) -> VnodeCount {
231        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
232    }
233}
234
235impl IsSingleton for Fragment {
236    fn is_singleton(&self) -> bool {
237        matches!(self.distribution_type, FragmentDistributionType::Single)
238    }
239}
240
241/// Fragments of a streaming job. Corresponds to [`PbTableFragments`].
242/// (It was previously called `TableFragments` due to historical reasons.)
243///
244/// We store whole fragments in a single column family as follow:
245/// `stream_job_id` => `StreamJobFragments`.
246#[derive(Debug, Clone)]
247pub struct StreamJobFragments {
248    /// The table id.
249    pub stream_job_id: TableId,
250
251    /// The state of the table fragments.
252    pub state: State,
253
254    /// The table fragments.
255    pub fragments: BTreeMap<FragmentId, Fragment>,
256
257    /// The status of actors
258    pub actor_status: BTreeMap<ActorId, ActorStatus>,
259
260    /// The splits of actors,
261    /// incl. both `Source` and `SourceBackfill` actors.
262    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
263
264    /// The streaming context associated with this stream plan and its fragments
265    pub ctx: StreamContext,
266
267    /// The parallelism assigned to this table fragments
268    pub assigned_parallelism: TableParallelism,
269
270    /// The max parallelism specified when the streaming job was created, i.e., expected vnode count.
271    ///
272    /// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
273    /// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
274    /// the streaming job.
275    ///
276    /// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
277    /// from this value (see `StreamFragmentGraph.max_parallelism` for more details.). As a result,
278    /// checking the parallelism change with this value can be inaccurate in some cases. However,
279    /// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
280    pub max_parallelism: usize,
281}
282
/// Context shared by a streaming plan and all of its fragments.
#[derive(Clone, Debug, Default)]
pub struct StreamContext {
    /// Timezone used when converting timestamps and dates; `None` when unset.
    pub timezone: Option<String>,
}
288
289impl StreamContext {
290    pub fn to_protobuf(&self) -> PbStreamContext {
291        PbStreamContext {
292            timezone: self.timezone.clone().unwrap_or("".into()),
293        }
294    }
295
296    pub fn to_expr_context(&self) -> PbExprContext {
297        PbExprContext {
298            // `self.timezone` must always be set; an invalid value is used here for debugging if it's not.
299            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
300            strict_mode: false,
301        }
302    }
303
304    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
305        Self {
306            timezone: if prost.get_timezone().is_empty() {
307                None
308            } else {
309                Some(prost.get_timezone().clone())
310            },
311        }
312    }
313}
314
315impl StreamJobFragments {
316    pub fn to_protobuf(
317        &self,
318        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
319        fragment_dispatchers: &FragmentActorDispatchers,
320    ) -> PbTableFragments {
321        PbTableFragments {
322            table_id: self.stream_job_id.table_id(),
323            state: self.state as _,
324            fragments: self
325                .fragments
326                .iter()
327                .map(|(id, fragment)| {
328                    (
329                        *id,
330                        fragment.to_protobuf(
331                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
332                            fragment_dispatchers.get(&(*id as _)),
333                        ),
334                    )
335                })
336                .collect(),
337            actor_status: self.actor_status.clone().into_iter().collect(),
338            actor_splits: build_actor_connector_splits(&self.actor_splits),
339            ctx: Some(self.ctx.to_protobuf()),
340            parallelism: Some(self.assigned_parallelism.into()),
341            node_label: "".to_owned(),
342            backfill_done: true,
343            max_parallelism: Some(self.max_parallelism as _),
344        }
345    }
346}
347
348pub type StreamJobActorsToCreate =
349    HashMap<WorkerId, HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>)>>;
350
351impl StreamJobFragments {
352    /// Create a new `TableFragments` with state of `Initial`, with other fields empty.
353    pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
354        Self::new(
355            table_id,
356            fragments,
357            &BTreeMap::new(),
358            StreamContext::default(),
359            TableParallelism::Adaptive,
360            VirtualNode::COUNT_FOR_TEST,
361        )
362    }
363
364    /// Create a new `TableFragments` with state of `Initial`, with the status of actors set to
365    /// `Inactive` on the given workers.
366    pub fn new(
367        stream_job_id: TableId,
368        fragments: BTreeMap<FragmentId, Fragment>,
369        actor_locations: &BTreeMap<ActorId, WorkerSlotId>,
370        ctx: StreamContext,
371        table_parallelism: TableParallelism,
372        max_parallelism: usize,
373    ) -> Self {
374        let actor_status = actor_locations
375            .iter()
376            .map(|(&actor_id, worker_slot_id)| {
377                (
378                    actor_id,
379                    ActorStatus {
380                        location: PbActorLocation::from_worker(worker_slot_id.worker_id()),
381                        state: ActorState::Inactive as i32,
382                    },
383                )
384            })
385            .collect();
386
387        Self {
388            stream_job_id,
389            state: State::Initial,
390            fragments,
391            actor_status,
392            actor_splits: HashMap::default(),
393            ctx,
394            assigned_parallelism: table_parallelism,
395            max_parallelism,
396        }
397    }
398
399    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
400        self.fragments.keys().cloned()
401    }
402
403    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
404        self.fragments.values()
405    }
406
407    pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
408        self.fragments
409            .get(&fragment_id)
410            .map(|f| f.actors.as_slice())
411            .unwrap_or_default()
412    }
413
414    /// Returns the table id.
415    pub fn stream_job_id(&self) -> TableId {
416        self.stream_job_id
417    }
418
419    /// Returns the state of the table fragments.
420    pub fn state(&self) -> State {
421        self.state
422    }
423
424    /// Returns the timezone of the table
425    pub fn timezone(&self) -> Option<String> {
426        self.ctx.timezone.clone()
427    }
428
429    /// Returns whether the table fragments is in `Created` state.
430    pub fn is_created(&self) -> bool {
431        self.state == State::Created
432    }
433
434    /// Returns whether the table fragments is in `Initial` state.
435    pub fn is_initial(&self) -> bool {
436        self.state == State::Initial
437    }
438
439    /// Set the state of the table fragments.
440    pub fn set_state(&mut self, state: State) {
441        self.state = state;
442    }
443
444    /// Update state of all actors
445    pub fn update_actors_state(&mut self, state: ActorState) {
446        for actor_status in self.actor_status.values_mut() {
447            actor_status.set_state(state);
448        }
449    }
450
451    pub fn set_actor_splits_by_split_assignment(&mut self, split_assignment: SplitAssignment) {
452        self.actor_splits = split_assignment.into_values().flatten().collect();
453    }
454
455    /// Returns actor ids associated with this table.
456    pub fn actor_ids(&self) -> Vec<ActorId> {
457        self.fragments
458            .values()
459            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
460            .collect()
461    }
462
463    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
464        self.fragments
465            .values()
466            .flat_map(|fragment| {
467                fragment
468                    .actors
469                    .iter()
470                    .map(|actor| (actor.actor_id, fragment.fragment_id))
471            })
472            .collect()
473    }
474
475    /// Returns actors associated with this table.
476    #[cfg(test)]
477    pub fn actors(&self) -> Vec<StreamActor> {
478        self.fragments
479            .values()
480            .flat_map(|fragment| fragment.actors.clone())
481            .collect()
482    }
483
484    /// Returns the actor ids with the given fragment type.
485    pub fn filter_actor_ids(
486        &self,
487        check_type: impl Fn(u32) -> bool + 'static,
488    ) -> impl Iterator<Item = ActorId> + '_ {
489        self.fragments
490            .values()
491            .filter(move |fragment| check_type(fragment.fragment_type_mask))
492            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
493    }
494
495    /// Returns mview actor ids.
496    pub fn mview_actor_ids(&self) -> Vec<ActorId> {
497        Self::filter_actor_ids(self, |fragment_type_mask| {
498            (fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0
499        })
500        .collect()
501    }
502
503    /// Returns actor ids that need to be tracked when creating MV.
504    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
505        let mut actor_ids = vec![];
506        for fragment in self.fragments.values() {
507            if fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32 != 0 {
508                // Note: CDC table job contains a StreamScan fragment (StreamCdcScan node) and a CdcFilter fragment.
509                // We don't track any fragments' progress.
510                return vec![];
511            }
512            if (fragment.fragment_type_mask
513                & (FragmentTypeFlag::Values as u32
514                    | FragmentTypeFlag::StreamScan as u32
515                    | FragmentTypeFlag::SourceScan as u32))
516                != 0
517            {
518                actor_ids.extend(fragment.actors.iter().map(|actor| {
519                    (
520                        actor.actor_id,
521                        BackfillUpstreamType::from_fragment_type_mask(fragment.fragment_type_mask),
522                    )
523                }));
524            }
525        }
526        actor_ids
527    }
528
529    pub fn root_fragment(&self) -> Option<Fragment> {
530        self.mview_fragment()
531            .or_else(|| self.sink_fragment())
532            .or_else(|| self.source_fragment())
533    }
534
535    /// Returns the fragment with the `Mview` type flag.
536    pub fn mview_fragment(&self) -> Option<Fragment> {
537        self.fragments
538            .values()
539            .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0)
540            .cloned()
541    }
542
543    pub fn source_fragment(&self) -> Option<Fragment> {
544        self.fragments
545            .values()
546            .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Source as u32) != 0)
547            .cloned()
548    }
549
550    pub fn sink_fragment(&self) -> Option<Fragment> {
551        self.fragments
552            .values()
553            .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Sink as u32) != 0)
554            .cloned()
555    }
556
557    pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId> {
558        Self::filter_actor_ids(self, |mask| {
559            (mask & FragmentTypeFlag::SnapshotBackfillStreamScan as u32) != 0
560        })
561        .collect()
562    }
563
564    /// Extract the fragments that include source executors that contains an external stream source,
565    /// grouping by source id.
566    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
567        let mut source_fragments = HashMap::new();
568
569        for fragment in self.fragments() {
570            {
571                if let Some(source_id) = fragment.nodes.find_stream_source() {
572                    source_fragments
573                        .entry(source_id as SourceId)
574                        .or_insert(BTreeSet::new())
575                        .insert(fragment.fragment_id as FragmentId);
576                }
577            }
578        }
579        source_fragments
580    }
581
582    /// Returns (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`)).
583    ///
584    /// Note: the fragment `source_backfill_fragment_id` may actually have multiple upstream fragments,
585    /// but only one of them is the upstream source fragment, which is what we return.
586    pub fn source_backfill_fragments(
587        &self,
588    ) -> MetadataModelResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
589        let mut source_backfill_fragments = HashMap::new();
590
591        for fragment in self.fragments() {
592            {
593                if let Some((source_id, upstream_source_fragment_id)) =
594                    fragment.nodes.find_source_backfill()
595                {
596                    source_backfill_fragments
597                        .entry(source_id as SourceId)
598                        .or_insert(BTreeSet::new())
599                        .insert((fragment.fragment_id, upstream_source_fragment_id));
600                }
601            }
602        }
603        Ok(source_backfill_fragments)
604    }
605
606    /// Find the table job's `Union` fragment.
607    /// Panics if not found.
608    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
609        let mut union_fragment_id = None;
610        for (fragment_id, fragment) in &self.fragments {
611            {
612                {
613                    visit_stream_node(&fragment.nodes, |body| {
614                        if let NodeBody::Union(_) = body {
615                            if let Some(union_fragment_id) = union_fragment_id.as_mut() {
616                                // The union fragment should be unique.
617                                assert_eq!(*union_fragment_id, *fragment_id);
618                            } else {
619                                union_fragment_id = Some(*fragment_id);
620                            }
621                        }
622                    })
623                }
624            }
625        }
626
627        let union_fragment_id =
628            union_fragment_id.expect("fragment of placeholder merger not found");
629
630        (self
631            .fragments
632            .get_mut(&union_fragment_id)
633            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
634    }
635
636    /// Resolve dependent table
637    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
638        let table_id = match stream_node.node_body.as_ref() {
639            Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
640            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
641            _ => None,
642        };
643        if let Some(table_id) = table_id {
644            table_ids.entry(table_id).or_default().add_assign(1);
645        }
646
647        for child in &stream_node.input {
648            Self::resolve_dependent_table(child, table_ids);
649        }
650    }
651
652    /// Returns upstream table counts.
653    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
654        let mut table_ids = HashMap::new();
655        self.fragments.values().for_each(|fragment| {
656            Self::resolve_dependent_table(&fragment.nodes, &mut table_ids);
657        });
658
659        table_ids
660    }
661
662    /// Returns states of actors group by worker id.
663    pub fn worker_actor_states(&self) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
664        let mut map = BTreeMap::default();
665        for (&actor_id, actor_status) in &self.actor_status {
666            let node_id = actor_status.worker_id() as WorkerId;
667            map.entry(node_id)
668                .or_insert_with(Vec::new)
669                .push((actor_id, actor_status.state()));
670        }
671        map
672    }
673
674    /// Returns actor locations group by worker id.
675    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
676        let mut map = BTreeMap::default();
677        for (&actor_id, actor_status) in &self.actor_status {
678            let node_id = actor_status.worker_id() as WorkerId;
679            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
680        }
681        map
682    }
683
684    /// Returns the status of actors group by worker id.
685    pub fn active_actors(&self) -> Vec<StreamActor> {
686        let mut actors = vec![];
687        for fragment in self.fragments.values() {
688            for actor in &fragment.actors {
689                if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
690                    continue;
691                }
692                actors.push(actor.clone());
693            }
694        }
695        actors
696    }
697
698    pub fn actors_to_create(
699        &self,
700    ) -> impl Iterator<
701        Item = (
702            FragmentId,
703            &StreamNode,
704            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
705        ),
706    > + '_ {
707        self.fragments.values().map(move |fragment| {
708            (
709                fragment.fragment_id,
710                &fragment.nodes,
711                fragment.actors.iter().map(move |actor| {
712                    let worker_id = self
713                        .actor_status
714                        .get(&actor.actor_id)
715                        .expect("should exist")
716                        .worker_id() as WorkerId;
717                    (actor, worker_id)
718                }),
719            )
720        })
721    }
722
723    pub fn mv_table_id(&self) -> Option<u32> {
724        if self
725            .fragments
726            .values()
727            .flat_map(|f| f.state_table_ids.iter())
728            .any(|table_id| *table_id == self.stream_job_id.table_id)
729        {
730            Some(self.stream_job_id.table_id)
731        } else {
732            None
733        }
734    }
735
736    /// Retrieve the **complete** internal tables map of the whole graph.
737    ///
738    /// Compared to [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`],
739    /// the table catalogs returned here are complete, with all fields filled.
740    pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
741        self.collect_tables_inner(true)
742    }
743
744    /// `internal_tables()` with additional table in `Materialize` node.
745    pub fn all_tables(&self) -> BTreeMap<u32, Table> {
746        self.collect_tables_inner(false)
747    }
748
749    fn collect_tables_inner(&self, internal_tables_only: bool) -> BTreeMap<u32, Table> {
750        let mut tables = BTreeMap::new();
751        for fragment in self.fragments.values() {
752            stream_graph_visitor::visit_stream_node_tables_inner(
753                &mut fragment.nodes.clone(),
754                internal_tables_only,
755                true,
756                |table, _| {
757                    let table_id = table.id;
758                    tables
759                        .try_insert(table_id, table.clone())
760                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
761                },
762            );
763        }
764        tables
765    }
766
767    /// Returns the internal table ids without the mview table.
768    pub fn internal_table_ids(&self) -> Vec<u32> {
769        self.fragments
770            .values()
771            .flat_map(|f| f.state_table_ids.clone())
772            .filter(|&t| t != self.stream_job_id.table_id)
773            .collect_vec()
774    }
775
776    /// Returns all internal table ids including the mview table.
777    pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
778        self.fragments
779            .values()
780            .flat_map(|f| f.state_table_ids.clone())
781    }
782
783    /// Fill the `expr_context` in `StreamActor`. Used for compatibility.
784    pub fn fill_expr_context(mut self) -> Self {
785        self.fragments.values_mut().for_each(|fragment| {
786            fragment.actors.iter_mut().for_each(|actor| {
787                if actor.expr_context.is_none() {
788                    actor.expr_context = Some(self.ctx.to_expr_context());
789                }
790            });
791        });
792        self
793    }
794}
795
/// What a backfill fragment reads its snapshot from, as derived from its fragment type mask.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BackfillUpstreamType {
    /// A `StreamScan` fragment backfilling from a materialized view or table.
    MView,
    /// A `Values` fragment.
    Values,
    /// A `SourceScan` fragment backfilling from a source.
    Source,
}
802
803impl BackfillUpstreamType {
804    pub fn from_fragment_type_mask(mask: u32) -> Self {
805        let is_mview = (mask & FragmentTypeFlag::StreamScan as u32) != 0;
806        let is_values = (mask & FragmentTypeFlag::Values as u32) != 0;
807        let is_source = (mask & FragmentTypeFlag::SourceScan as u32) != 0;
808
809        // Note: in theory we can have multiple backfill executors in one fragment, but currently it's not possible.
810        // See <https://github.com/risingwavelabs/risingwave/issues/6236>.
811        debug_assert!(
812            is_mview as u8 + is_values as u8 + is_source as u8 == 1,
813            "a backfill fragment should either be mview, value or source, found {:?}",
814            mask
815        );
816
817        if is_mview {
818            BackfillUpstreamType::MView
819        } else if is_values {
820            BackfillUpstreamType::Values
821        } else if is_source {
822            BackfillUpstreamType::Source
823        } else {
824            unreachable!("invalid fragment type mask: {}", mask);
825        }
826    }
827}