risingwave_meta/model/
stream.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{AddAssign, Deref};
17
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
21use risingwave_common::hash::{
22    ActorAlignmentId, IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat,
23};
24use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
25use risingwave_connector::source::SplitImpl;
26use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
27use risingwave_pb::catalog::Table;
28use risingwave_pb::common::{ActorInfo, PbActorLocation};
29use risingwave_pb::meta::table_fragments::actor_status::ActorState;
30use risingwave_pb::meta::table_fragments::fragment::{
31    FragmentDistributionType, PbFragmentDistributionType,
32};
33use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
34use risingwave_pb::meta::table_parallelism::{
35    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
36    PbParallelism,
37};
38use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
39use risingwave_pb::plan_common::PbExprContext;
40use risingwave_pb::stream_plan::stream_node::NodeBody;
41use risingwave_pb::stream_plan::{
42    DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
43    PbStreamContext, StreamNode,
44};
45
46use super::{ActorId, FragmentId};
47use crate::stream::{SplitAssignment, build_actor_connector_splits};
48
/// The parallelism for a `TableFragments`.
///
/// Converts to and from both `PbTableParallelism` and `StreamingParallelism`
/// through the `From` impls in this file.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// This is when the system decides the parallelism, based on the available worker parallelisms.
    Adaptive,
    /// We set this when the `TableFragments` parallelism is changed.
    /// All fragments which are part of the `TableFragment` will have the same parallelism as this.
    Fixed(usize),
    /// We set this when the individual parallelisms of the `Fragments`
    /// can differ within a `TableFragments`.
    /// This is set for `risectl`, whose low-level interface can
    /// scale individual `Fragments` within a `TableFragments`.
    /// When that happens, the `TableFragments` no longer has a consistent
    /// parallelism, so we set this to indicate that.
    Custom,
}
65
66impl From<PbTableParallelism> for TableParallelism {
67    fn from(value: PbTableParallelism) -> Self {
68        use Parallelism::*;
69        match &value.parallelism {
70            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
71            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
72            Some(Custom(_)) => Self::Custom,
73            _ => unreachable!(),
74        }
75    }
76}
77
78impl From<TableParallelism> for PbTableParallelism {
79    fn from(value: TableParallelism) -> Self {
80        use TableParallelism::*;
81
82        let parallelism = match value {
83            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
84            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
85                parallelism: n as u32,
86            }),
87            Custom => PbParallelism::Custom(PbCustomParallelism {}),
88        };
89
90        Self {
91            parallelism: Some(parallelism),
92        }
93    }
94}
95
96impl From<StreamingParallelism> for TableParallelism {
97    fn from(value: StreamingParallelism) -> Self {
98        match value {
99            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
100            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
101            StreamingParallelism::Custom => TableParallelism::Custom,
102        }
103    }
104}
105
106impl From<TableParallelism> for StreamingParallelism {
107    fn from(value: TableParallelism) -> Self {
108        match value {
109            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
110            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
111            TableParallelism::Custom => StreamingParallelism::Custom,
112        }
113    }
114}
115
/// Actor infos keyed first by fragment id, then by actor id.
/// NOTE(review): presumably the keys are the *upstream* fragment and actor ids — confirm against callers.
pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
/// A stream actor paired with its dispatchers.
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
/// A stream actor paired with its [`ActorUpstreams`] and its dispatchers.
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
/// `fragment_id` -> `actor_id` -> dispatchers of that actor
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

/// `fragment_id` -> relations to its downstream fragments
/// NOTE(review): presumably keyed by the upstream fragment id — confirm against callers.
pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// downstream `fragment_id` -> original upstream `fragment_id` -> new upstream `fragment_id`
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// The newly added no-shuffle actor dispatcher from upstream fragment to downstream fragment
/// upstream `fragment_id` -> downstream `fragment_id` -> upstream `actor_id` -> downstream `actor_id`
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;
127
/// Describes how data is dispatched to one downstream fragment.
///
/// Can be built from a `(FragmentId, DispatchStrategy)` pair via `From`.
#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    /// Id of the downstream fragment.
    pub downstream_fragment_id: FragmentId,
    /// The dispatcher type, converted from the dispatch strategy's type.
    pub dispatcher_type: DispatcherType,
    /// Taken verbatim from `DispatchStrategy::dist_key_indices`.
    pub dist_key_indices: Vec<u32>,
    /// Taken from `DispatchStrategy::output_mapping`.
    pub output_mapping: PbDispatchOutputMapping,
}
135
136impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
137    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
138        Self {
139            downstream_fragment_id: fragment_id,
140            dispatcher_type: dispatch.get_type().unwrap().into(),
141            dist_key_indices: dispatch.dist_key_indices,
142            output_mapping: dispatch.output_mapping.unwrap(),
143        }
144    }
145}
146
/// A [`StreamJobFragments`] bundled with a [`FragmentDownstreamRelation`] map.
///
/// Dereferences to the inner [`StreamJobFragments`].
#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    /// The fragments of the streaming job.
    pub inner: StreamJobFragments,
    /// Downstream relations, keyed by fragment id.
    pub downstreams: FragmentDownstreamRelation,
}
152
153impl Deref for StreamJobFragmentsToCreate {
154    type Target = StreamJobFragments;
155
156    fn deref(&self) -> &Self::Target {
157        &self.inner
158    }
159}
160
/// In-memory model of a stream actor; `to_protobuf` turns it into a `PbStreamActor`.
#[derive(Clone, Debug)]
pub struct StreamActor {
    /// Id of this actor.
    pub actor_id: u32,
    /// Id of the fragment this actor belongs to.
    pub fragment_id: u32,
    /// Optional vnode bitmap of this actor.
    pub vnode_bitmap: Option<Bitmap>,
    /// NOTE(review): presumably the definition (SQL) of the owning materialized view — confirm.
    pub mview_definition: String,
    /// Optional expression context; `StreamJobFragments::fill_expr_context` fills it when `None`.
    pub expr_context: Option<PbExprContext>,
}
169
170impl StreamActor {
171    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
172        PbStreamActor {
173            actor_id: self.actor_id,
174            fragment_id: self.fragment_id,
175            dispatcher: dispatchers.collect(),
176            vnode_bitmap: self
177                .vnode_bitmap
178                .as_ref()
179                .map(|bitmap| bitmap.to_protobuf()),
180            mview_definition: self.mview_definition.clone(),
181            expr_context: self.expr_context.clone(),
182        }
183    }
184}
185
/// In-memory model of a fragment; `to_protobuf` turns it into a `PbFragment`.
#[derive(Clone, Debug, Default)]
pub struct Fragment {
    /// Id of this fragment.
    pub fragment_id: FragmentId,
    /// Set of type flags describing this fragment.
    pub fragment_type_mask: FragmentTypeMask,
    /// Distribution type; `Single` means the fragment is a singleton (see `IsSingleton`).
    pub distribution_type: PbFragmentDistributionType,
    /// Actors of this fragment.
    pub actors: Vec<StreamActor>,
    /// Ids of the state tables of this fragment.
    pub state_table_ids: Vec<u32>,
    /// Optional vnode count; interpreted through `VnodeCountCompat`.
    pub maybe_vnode_count: Option<u32>,
    /// Root stream node of this fragment.
    pub nodes: StreamNode,
}
196
197impl Fragment {
198    pub fn to_protobuf(
199        &self,
200        upstream_fragments: impl Iterator<Item = FragmentId>,
201        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
202    ) -> PbFragment {
203        PbFragment {
204            fragment_id: self.fragment_id,
205            fragment_type_mask: self.fragment_type_mask.into(),
206            distribution_type: self.distribution_type as _,
207            actors: self
208                .actors
209                .iter()
210                .map(|actor| {
211                    actor.to_protobuf(
212                        dispatchers
213                            .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
214                            .into_iter()
215                            .flatten()
216                            .cloned(),
217                    )
218                })
219                .collect(),
220            state_table_ids: self.state_table_ids.clone(),
221            upstream_fragment_ids: upstream_fragments.collect(),
222            maybe_vnode_count: self.maybe_vnode_count,
223            nodes: Some(self.nodes.clone()),
224        }
225    }
226}
227
228impl VnodeCountCompat for Fragment {
229    fn vnode_count_inner(&self) -> VnodeCount {
230        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
231    }
232}
233
234impl IsSingleton for Fragment {
235    fn is_singleton(&self) -> bool {
236        matches!(self.distribution_type, FragmentDistributionType::Single)
237    }
238}
239
/// Fragments of a streaming job. Corresponds to [`PbTableFragments`].
/// (It was previously called `TableFragments` due to historical reasons.)
///
/// We store whole fragments in a single column family as follows:
/// `stream_job_id` => `StreamJobFragments`.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// The id of the streaming job (a table id).
    pub stream_job_id: TableId,

    /// The state of the table fragments.
    pub state: State,

    /// The table fragments, keyed by fragment id.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// The status of actors, keyed by actor id.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// The splits of actors,
    /// incl. both `Source` and `SourceBackfill` actors.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// The streaming context associated with this stream plan and its fragments
    pub ctx: StreamContext,

    /// The parallelism assigned to this table fragments
    pub assigned_parallelism: TableParallelism,

    /// The max parallelism specified when the streaming job was created, i.e., expected vnode count.
    ///
    /// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
    /// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
    /// the streaming job.
    ///
    /// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
    /// from this value (see `StreamFragmentGraph.max_parallelism` for more details). As a result,
    /// checking the parallelism change with this value can be inaccurate in some cases. However,
    /// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
    pub max_parallelism: usize,
}
281
/// Streaming context shared by a stream plan and its fragments.
#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// The timezone used to interpret timestamps and dates for conversion.
    /// `None` is encoded as an empty string by `to_protobuf`.
    pub timezone: Option<String>,
}
287
288impl StreamContext {
289    pub fn to_protobuf(&self) -> PbStreamContext {
290        PbStreamContext {
291            timezone: self.timezone.clone().unwrap_or("".into()),
292        }
293    }
294
295    pub fn to_expr_context(&self) -> PbExprContext {
296        PbExprContext {
297            // `self.timezone` must always be set; an invalid value is used here for debugging if it's not.
298            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
299            strict_mode: false,
300        }
301    }
302
303    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
304        Self {
305            timezone: if prost.get_timezone().is_empty() {
306                None
307            } else {
308                Some(prost.get_timezone().clone())
309            },
310        }
311    }
312}
313
314impl StreamJobFragments {
315    pub fn to_protobuf(
316        &self,
317        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
318        fragment_dispatchers: &FragmentActorDispatchers,
319    ) -> PbTableFragments {
320        PbTableFragments {
321            table_id: self.stream_job_id.table_id(),
322            state: self.state as _,
323            fragments: self
324                .fragments
325                .iter()
326                .map(|(id, fragment)| {
327                    (
328                        *id,
329                        fragment.to_protobuf(
330                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
331                            fragment_dispatchers.get(&(*id as _)),
332                        ),
333                    )
334                })
335                .collect(),
336            actor_status: self.actor_status.clone().into_iter().collect(),
337            actor_splits: build_actor_connector_splits(&self.actor_splits),
338            ctx: Some(self.ctx.to_protobuf()),
339            parallelism: Some(self.assigned_parallelism.into()),
340            node_label: "".to_owned(),
341            backfill_done: true,
342            max_parallelism: Some(self.max_parallelism as _),
343        }
344    }
345}
346
/// `worker_id` -> `fragment_id` -> (stream node, actors with their upstreams and dispatchers)
pub type StreamJobActorsToCreate =
    HashMap<WorkerId, HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>)>>;
349
350impl StreamJobFragments {
351    /// Create a new `TableFragments` with state of `Initial`, with other fields empty.
352    pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
353        Self::new(
354            table_id,
355            fragments,
356            &BTreeMap::new(),
357            StreamContext::default(),
358            TableParallelism::Adaptive,
359            VirtualNode::COUNT_FOR_TEST,
360        )
361    }
362
363    /// Create a new `TableFragments` with state of `Initial`, with the status of actors set to
364    /// `Inactive` on the given workers.
365    pub fn new(
366        stream_job_id: TableId,
367        fragments: BTreeMap<FragmentId, Fragment>,
368        actor_locations: &BTreeMap<ActorId, ActorAlignmentId>,
369        ctx: StreamContext,
370        table_parallelism: TableParallelism,
371        max_parallelism: usize,
372    ) -> Self {
373        let actor_status = actor_locations
374            .iter()
375            .map(|(&actor_id, alignment_id)| {
376                (
377                    actor_id,
378                    ActorStatus {
379                        location: PbActorLocation::from_worker(alignment_id.worker_id()),
380                        state: ActorState::Inactive as i32,
381                    },
382                )
383            })
384            .collect();
385
386        Self {
387            stream_job_id,
388            state: State::Initial,
389            fragments,
390            actor_status,
391            actor_splits: HashMap::default(),
392            ctx,
393            assigned_parallelism: table_parallelism,
394            max_parallelism,
395        }
396    }
397
398    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
399        self.fragments.keys().cloned()
400    }
401
    /// Iterates over all fragments of this job, in key order of the underlying map.
    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
        self.fragments.values()
    }
405
406    pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
407        self.fragments
408            .get(&fragment_id)
409            .map(|f| f.actors.as_slice())
410            .unwrap_or_default()
411    }
412
    /// Returns the id of the streaming job (a table id).
    pub fn stream_job_id(&self) -> TableId {
        self.stream_job_id
    }
417
    /// Returns the current state of the table fragments.
    pub fn state(&self) -> State {
        self.state
    }
422
    /// Returns a copy of the timezone stored in the stream context, if any.
    pub fn timezone(&self) -> Option<String> {
        self.ctx.timezone.clone()
    }
427
428    /// Returns whether the table fragments is in `Created` state.
429    pub fn is_created(&self) -> bool {
430        self.state == State::Created
431    }
432
433    /// Returns whether the table fragments is in `Initial` state.
434    pub fn is_initial(&self) -> bool {
435        self.state == State::Initial
436    }
437
    /// Overwrites the state of the table fragments. Actor statuses are not touched.
    pub fn set_state(&mut self, state: State) {
        self.state = state;
    }
442
443    /// Update state of all actors
444    pub fn update_actors_state(&mut self, state: ActorState) {
445        for actor_status in self.actor_status.values_mut() {
446            actor_status.set_state(state);
447        }
448    }
449
450    pub fn set_actor_splits_by_split_assignment(&mut self, split_assignment: SplitAssignment) {
451        self.actor_splits = split_assignment.into_values().flatten().collect();
452    }
453
454    /// Returns actor ids associated with this table.
455    pub fn actor_ids(&self) -> Vec<ActorId> {
456        self.fragments
457            .values()
458            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
459            .collect()
460    }
461
462    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
463        self.fragments
464            .values()
465            .flat_map(|fragment| {
466                fragment
467                    .actors
468                    .iter()
469                    .map(|actor| (actor.actor_id, fragment.fragment_id))
470            })
471            .collect()
472    }
473
474    /// Returns actors associated with this table.
475    #[cfg(test)]
476    pub fn actors(&self) -> Vec<StreamActor> {
477        self.fragments
478            .values()
479            .flat_map(|fragment| fragment.actors.clone())
480            .collect()
481    }
482
483    /// Returns the actor ids with the given fragment type.
484    pub fn filter_actor_ids(
485        &self,
486        check_type: impl Fn(FragmentTypeMask) -> bool + 'static,
487    ) -> impl Iterator<Item = ActorId> + '_ {
488        self.fragments
489            .values()
490            .filter(move |fragment| check_type(fragment.fragment_type_mask))
491            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
492    }
493
494    /// Returns mview actor ids.
495    pub fn mview_actor_ids(&self) -> Vec<ActorId> {
496        Self::filter_actor_ids(self, |fragment_type_mask| {
497            fragment_type_mask.contains(FragmentTypeFlag::Mview)
498        })
499        .collect()
500    }
501
502    /// Returns actor ids that need to be tracked when creating MV.
503    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
504        let mut actor_ids = vec![];
505        for fragment in self.fragments.values() {
506            if fragment
507                .fragment_type_mask
508                .contains(FragmentTypeFlag::CdcFilter)
509            {
510                // Note: CDC table job contains a StreamScan fragment (StreamCdcScan node) and a CdcFilter fragment.
511                // We don't track any fragments' progress.
512                return vec![];
513            }
514            if fragment.fragment_type_mask.contains_any([
515                FragmentTypeFlag::Values,
516                FragmentTypeFlag::StreamScan,
517                FragmentTypeFlag::SourceScan,
518            ]) {
519                actor_ids.extend(fragment.actors.iter().map(|actor| {
520                    (
521                        actor.actor_id,
522                        BackfillUpstreamType::from_fragment_type_mask(fragment.fragment_type_mask),
523                    )
524                }));
525            }
526        }
527        actor_ids
528    }
529
530    pub fn root_fragment(&self) -> Option<Fragment> {
531        self.mview_fragment()
532            .or_else(|| self.sink_fragment())
533            .or_else(|| self.source_fragment())
534    }
535
536    /// Returns the fragment with the `Mview` type flag.
537    pub fn mview_fragment(&self) -> Option<Fragment> {
538        self.fragments
539            .values()
540            .find(|fragment| {
541                fragment
542                    .fragment_type_mask
543                    .contains(FragmentTypeFlag::Mview)
544            })
545            .cloned()
546    }
547
548    pub fn source_fragment(&self) -> Option<Fragment> {
549        self.fragments
550            .values()
551            .find(|fragment| {
552                fragment
553                    .fragment_type_mask
554                    .contains(FragmentTypeFlag::Source)
555            })
556            .cloned()
557    }
558
559    pub fn sink_fragment(&self) -> Option<Fragment> {
560        self.fragments
561            .values()
562            .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
563            .cloned()
564    }
565
566    pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId> {
567        Self::filter_actor_ids(self, |mask| {
568            mask.contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
569        })
570        .collect()
571    }
572
573    /// Extract the fragments that include source executors that contains an external stream source,
574    /// grouping by source id.
575    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
576        let mut source_fragments = HashMap::new();
577
578        for fragment in self.fragments() {
579            {
580                if let Some(source_id) = fragment.nodes.find_stream_source() {
581                    source_fragments
582                        .entry(source_id as SourceId)
583                        .or_insert(BTreeSet::new())
584                        .insert(fragment.fragment_id as FragmentId);
585                }
586            }
587        }
588        source_fragments
589    }
590
591    /// Returns (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`)).
592    ///
593    /// Note: the fragment `source_backfill_fragment_id` may actually have multiple upstream fragments,
594    /// but only one of them is the upstream source fragment, which is what we return.
595    pub fn source_backfill_fragments(
596        &self,
597    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
598        let mut source_backfill_fragments = HashMap::new();
599
600        for fragment in self.fragments() {
601            {
602                if let Some((source_id, upstream_source_fragment_id)) =
603                    fragment.nodes.find_source_backfill()
604                {
605                    source_backfill_fragments
606                        .entry(source_id as SourceId)
607                        .or_insert(BTreeSet::new())
608                        .insert((fragment.fragment_id, upstream_source_fragment_id));
609                }
610            }
611        }
612        source_backfill_fragments
613    }
614
615    /// Find the table job's `Union` fragment.
616    /// Panics if not found.
617    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
618        let mut union_fragment_id = None;
619        for (fragment_id, fragment) in &self.fragments {
620            {
621                {
622                    visit_stream_node_body(&fragment.nodes, |body| {
623                        if let NodeBody::Union(_) = body {
624                            if let Some(union_fragment_id) = union_fragment_id.as_mut() {
625                                // The union fragment should be unique.
626                                assert_eq!(*union_fragment_id, *fragment_id);
627                            } else {
628                                union_fragment_id = Some(*fragment_id);
629                            }
630                        }
631                    })
632                }
633            }
634        }
635
636        let union_fragment_id =
637            union_fragment_id.expect("fragment of placeholder merger not found");
638
639        (self
640            .fragments
641            .get_mut(&union_fragment_id)
642            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
643    }
644
645    /// Resolve dependent table
646    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
647        let table_id = match stream_node.node_body.as_ref() {
648            Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
649            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
650            _ => None,
651        };
652        if let Some(table_id) = table_id {
653            table_ids.entry(table_id).or_default().add_assign(1);
654        }
655
656        for child in &stream_node.input {
657            Self::resolve_dependent_table(child, table_ids);
658        }
659    }
660
661    /// Returns upstream table counts.
662    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
663        let mut table_ids = HashMap::new();
664        self.fragments.values().for_each(|fragment| {
665            Self::resolve_dependent_table(&fragment.nodes, &mut table_ids);
666        });
667
668        table_ids
669    }
670
671    /// Returns states of actors group by worker id.
672    pub fn worker_actor_states(&self) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
673        let mut map = BTreeMap::default();
674        for (&actor_id, actor_status) in &self.actor_status {
675            let node_id = actor_status.worker_id() as WorkerId;
676            map.entry(node_id)
677                .or_insert_with(Vec::new)
678                .push((actor_id, actor_status.state()));
679        }
680        map
681    }
682
683    /// Returns actor locations group by worker id.
684    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
685        let mut map = BTreeMap::default();
686        for (&actor_id, actor_status) in &self.actor_status {
687            let node_id = actor_status.worker_id() as WorkerId;
688            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
689        }
690        map
691    }
692
693    /// Returns the status of actors group by worker id.
694    pub fn active_actors(&self) -> Vec<StreamActor> {
695        let mut actors = vec![];
696        for fragment in self.fragments.values() {
697            for actor in &fragment.actors {
698                if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
699                    continue;
700                }
701                actors.push(actor.clone());
702            }
703        }
704        actors
705    }
706
707    pub fn actors_to_create(
708        &self,
709    ) -> impl Iterator<
710        Item = (
711            FragmentId,
712            &StreamNode,
713            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
714        ),
715    > + '_ {
716        self.fragments.values().map(move |fragment| {
717            (
718                fragment.fragment_id,
719                &fragment.nodes,
720                fragment.actors.iter().map(move |actor| {
721                    let worker_id = self
722                        .actor_status
723                        .get(&actor.actor_id)
724                        .expect("should exist")
725                        .worker_id() as WorkerId;
726                    (actor, worker_id)
727                }),
728            )
729        })
730    }
731
732    pub fn mv_table_id(&self) -> Option<u32> {
733        if self
734            .fragments
735            .values()
736            .flat_map(|f| f.state_table_ids.iter())
737            .any(|table_id| *table_id == self.stream_job_id.table_id)
738        {
739            Some(self.stream_job_id.table_id)
740        } else {
741            None
742        }
743    }
744
745    pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<u32, Table> {
746        let mut tables = BTreeMap::new();
747        for fragment in fragments {
748            stream_graph_visitor::visit_stream_node_tables_inner(
749                &mut fragment.nodes.clone(),
750                false,
751                true,
752                |table, _| {
753                    let table_id = table.id;
754                    tables
755                        .try_insert(table_id, table.clone())
756                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
757                },
758            );
759        }
760        tables
761    }
762
763    /// Returns the internal table ids without the mview table.
764    pub fn internal_table_ids(&self) -> Vec<u32> {
765        self.fragments
766            .values()
767            .flat_map(|f| f.state_table_ids.clone())
768            .filter(|&t| t != self.stream_job_id.table_id)
769            .collect_vec()
770    }
771
772    /// Returns all internal table ids including the mview table.
773    pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
774        self.fragments
775            .values()
776            .flat_map(|f| f.state_table_ids.clone())
777    }
778
779    /// Fill the `expr_context` in `StreamActor`. Used for compatibility.
780    pub fn fill_expr_context(mut self) -> Self {
781        self.fragments.values_mut().for_each(|fragment| {
782            fragment.actors.iter_mut().for_each(|actor| {
783                if actor.expr_context.is_none() {
784                    actor.expr_context = Some(self.ctx.to_expr_context());
785                }
786            });
787        });
788        self
789    }
790}
791
/// The kind of upstream a backfill fragment reads from, derived from the fragment's
/// type mask by `from_fragment_type_mask`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    /// The fragment carries the `StreamScan` flag.
    MView,
    /// The fragment carries the `Values` flag.
    Values,
    /// The fragment carries the `SourceScan` flag.
    Source,
}
798
799impl BackfillUpstreamType {
800    pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
801        let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
802        let is_values = mask.contains(FragmentTypeFlag::Values);
803        let is_source = mask.contains(FragmentTypeFlag::SourceScan);
804
805        // Note: in theory we can have multiple backfill executors in one fragment, but currently it's not possible.
806        // See <https://github.com/risingwavelabs/risingwave/issues/6236>.
807        debug_assert!(
808            is_mview as u8 + is_values as u8 + is_source as u8 == 1,
809            "a backfill fragment should either be mview, value or source, found {:?}",
810            mask
811        );
812
813        if is_mview {
814            BackfillUpstreamType::MView
815        } else if is_values {
816            BackfillUpstreamType::Values
817        } else if is_source {
818            BackfillUpstreamType::Source
819        } else {
820            unreachable!("invalid fragment type mask: {:?}", mask);
821        }
822    }
823}