risingwave_meta/model/
stream.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{AddAssign, Deref};
17
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::TableId;
21use risingwave_common::hash::{
22    IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat, WorkerSlotId,
23};
24use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node};
25use risingwave_connector::source::SplitImpl;
26use risingwave_meta_model::actor_dispatcher::DispatcherType;
27use risingwave_meta_model::{SourceId, StreamingParallelism, WorkerId};
28use risingwave_pb::catalog::Table;
29use risingwave_pb::common::PbActorLocation;
30use risingwave_pb::meta::table_fragments::actor_status::ActorState;
31use risingwave_pb::meta::table_fragments::fragment::{
32    FragmentDistributionType, PbFragmentDistributionType,
33};
34use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
35use risingwave_pb::meta::table_parallelism::{
36    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
37    PbParallelism,
38};
39use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
40use risingwave_pb::plan_common::PbExprContext;
41use risingwave_pb::stream_plan::stream_node::NodeBody;
42use risingwave_pb::stream_plan::{
43    DispatchStrategy, Dispatcher, FragmentTypeFlag, PbDispatcher, PbStreamActor, PbStreamContext,
44    StreamNode,
45};
46
47use super::{ActorId, FragmentId};
48use crate::model::MetadataModelResult;
49use crate::stream::{SplitAssignment, build_actor_connector_splits};
50
/// The parallelism for a `TableFragments`.
///
/// Convertible to and from [`PbTableParallelism`] and [`StreamingParallelism`] through the
/// `From` implementations in this file.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// This is when the system decides the parallelism, based on the available worker parallelisms.
    Adaptive,
    /// We set this when the `TableFragments` parallelism is changed.
    /// All fragments which are part of the `TableFragment` will have the same parallelism as this.
    Fixed(usize),
    /// We set this when the individual parallelisms of the `Fragments`
    /// can differ within a `TableFragments`.
    /// This is set for `risectl`, since it has a low-level interface
    /// that can scale individual `Fragments` within `TableFragments`.
    /// When that happens, the `TableFragments` no longer has a consistent
    /// parallelism, so we set this to indicate that.
    Custom,
}
67
68impl From<PbTableParallelism> for TableParallelism {
69    fn from(value: PbTableParallelism) -> Self {
70        use Parallelism::*;
71        match &value.parallelism {
72            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
73            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
74            Some(Custom(_)) => Self::Custom,
75            _ => unreachable!(),
76        }
77    }
78}
79
80impl From<TableParallelism> for PbTableParallelism {
81    fn from(value: TableParallelism) -> Self {
82        use TableParallelism::*;
83
84        let parallelism = match value {
85            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
86            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
87                parallelism: n as u32,
88            }),
89            Custom => PbParallelism::Custom(PbCustomParallelism {}),
90        };
91
92        Self {
93            parallelism: Some(parallelism),
94        }
95    }
96}
97
98impl From<StreamingParallelism> for TableParallelism {
99    fn from(value: StreamingParallelism) -> Self {
100        match value {
101            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
102            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
103            StreamingParallelism::Custom => TableParallelism::Custom,
104        }
105    }
106}
107
108impl From<TableParallelism> for StreamingParallelism {
109    fn from(value: TableParallelism) -> Self {
110        match value {
111            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
112            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
113            TableParallelism::Custom => StreamingParallelism::Custom,
114        }
115    }
116}
117
/// `fragment_id` -> set of `actor_id`s.
/// NOTE(review): presumably the upstream fragments/actors of a single actor — confirm at call sites.
pub type ActorUpstreams = BTreeMap<FragmentId, HashSet<ActorId>>;
/// `actor_id` -> its [`ActorUpstreams`].
pub type FragmentActorUpstreams = HashMap<ActorId, ActorUpstreams>;
/// An actor paired with a list of dispatchers.
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
/// An actor paired with its [`ActorUpstreams`] and a list of dispatchers.
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
/// `fragment_id` -> `actor_id` -> dispatchers of that actor.
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

/// `fragment_id` -> list of [`DownstreamFragmentRelation`]s.
/// NOTE(review): presumably keyed by the upstream fragment — confirm at call sites.
pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// downstream `fragment_id` -> original upstream `fragment_id` -> new upstream `fragment_id`
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// The newly added no-shuffle actor dispatcher from upstream fragment to downstream fragment
/// upstream `fragment_id` -> downstream `fragment_id` -> upstream `actor_id` -> downstream `actor_id`
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;
130
/// Describes how data is dispatched to one downstream fragment.
///
/// Can be built from a `(FragmentId, DispatchStrategy)` pair via `From`.
#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    /// Id of the downstream fragment.
    pub downstream_fragment_id: FragmentId,
    /// The kind of dispatcher used towards the downstream fragment.
    pub dispatcher_type: DispatcherType,
    /// Copied from `DispatchStrategy::dist_key_indices`.
    pub dist_key_indices: Vec<u32>,
    /// Copied from `DispatchStrategy::output_indices`.
    pub output_indices: Vec<u32>,
}
138
139impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
140    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
141        Self {
142            downstream_fragment_id: fragment_id,
143            dispatcher_type: dispatch.get_type().unwrap().into(),
144            dist_key_indices: dispatch.dist_key_indices,
145            output_indices: dispatch.output_indices,
146        }
147    }
148}
149
/// A [`StreamJobFragments`] bundled with a [`FragmentDownstreamRelation`] map.
///
/// Dereferences to the inner [`StreamJobFragments`].
#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    /// The fragments of the streaming job.
    pub inner: StreamJobFragments,
    /// Downstream relations, keyed by fragment id.
    pub downstreams: FragmentDownstreamRelation,
}
155
/// Lets a `StreamJobFragmentsToCreate` be used wherever a `&StreamJobFragments` is expected.
impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    /// Returns a reference to the wrapped `inner` fragments.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
163
/// In-memory representation of a stream actor.
///
/// Unlike [`PbStreamActor`], it does not carry dispatchers; those are supplied separately when
/// converting to protobuf.
#[derive(Clone, Debug)]
pub struct StreamActor {
    /// Id of this actor.
    pub actor_id: u32,
    /// Id of the fragment this actor belongs to.
    pub fragment_id: u32,
    /// Optional vnode bitmap of this actor.
    /// NOTE(review): presumably `None` for singleton actors — confirm.
    pub vnode_bitmap: Option<Bitmap>,
    /// Copied verbatim into `PbStreamActor::mview_definition`.
    pub mview_definition: String,
    /// Expression context of this actor, if already filled.
    pub expr_context: Option<PbExprContext>,
}
172
173impl StreamActor {
174    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
175        PbStreamActor {
176            actor_id: self.actor_id,
177            fragment_id: self.fragment_id,
178            dispatcher: dispatchers.collect(),
179            vnode_bitmap: self
180                .vnode_bitmap
181                .as_ref()
182                .map(|bitmap| bitmap.to_protobuf()),
183            mview_definition: self.mview_definition.clone(),
184            expr_context: self.expr_context.clone(),
185        }
186    }
187}
188
/// In-memory representation of a fragment of a streaming job.
#[derive(Clone, Debug, Default)]
pub struct Fragment {
    /// Id of this fragment.
    pub fragment_id: FragmentId,
    /// Bitmask that is tested against `FragmentTypeFlag` values.
    pub fragment_type_mask: u32,
    /// Distribution type; `Single` marks a singleton fragment.
    pub distribution_type: PbFragmentDistributionType,
    /// Actors belonging to this fragment.
    pub actors: Vec<StreamActor>,
    /// Ids of the state tables of this fragment.
    pub state_table_ids: Vec<u32>,
    /// Vnode count as stored in protobuf; interpreted through [`VnodeCountCompat`].
    pub maybe_vnode_count: Option<u32>,
    /// The stream plan node tree of this fragment.
    pub nodes: StreamNode,
}
199
200impl Fragment {
201    pub fn to_protobuf(
202        &self,
203        upstream_fragments: impl Iterator<Item = FragmentId>,
204        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
205    ) -> PbFragment {
206        PbFragment {
207            fragment_id: self.fragment_id,
208            fragment_type_mask: self.fragment_type_mask,
209            distribution_type: self.distribution_type as _,
210            actors: self
211                .actors
212                .iter()
213                .map(|actor| {
214                    actor.to_protobuf(
215                        dispatchers
216                            .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
217                            .into_iter()
218                            .flatten()
219                            .cloned(),
220                    )
221                })
222                .collect(),
223            state_table_ids: self.state_table_ids.clone(),
224            upstream_fragment_ids: upstream_fragments.collect(),
225            maybe_vnode_count: self.maybe_vnode_count,
226            nodes: Some(self.nodes.clone()),
227        }
228    }
229}
230
impl VnodeCountCompat for Fragment {
    /// Derives the vnode count from `maybe_vnode_count`, passing whether the fragment is a
    /// singleton as the lazily evaluated second argument of `VnodeCount::from_protobuf`.
    fn vnode_count_inner(&self) -> VnodeCount {
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}
236
237impl IsSingleton for Fragment {
238    fn is_singleton(&self) -> bool {
239        matches!(self.distribution_type, FragmentDistributionType::Single)
240    }
241}
242
/// Fragments of a streaming job. Corresponds to [`PbTableFragments`].
/// (It was previously called `TableFragments` due to historical reasons.)
///
/// We store whole fragments in a single column family as follows:
/// `stream_job_id` => `StreamJobFragments`.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// The id of the streaming job (a table id).
    pub stream_job_id: TableId,

    /// The state of the table fragments.
    pub state: State,

    /// The table fragments, keyed by fragment id.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// The status of actors, keyed by actor id.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// The splits of actors,
    /// incl. both `Source` and `SourceBackfill` actors.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// The streaming context associated with this stream plan and its fragments
    pub ctx: StreamContext,

    /// The parallelism assigned to this table fragments
    pub assigned_parallelism: TableParallelism,

    /// The max parallelism specified when the streaming job was created, i.e., expected vnode count.
    ///
    /// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
    /// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
    /// the streaming job.
    ///
    /// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
    /// from this value (see `StreamFragmentGraph.max_parallelism` for more details.). As a result,
    /// checking the parallelism change with this value can be inaccurate in some cases. However,
    /// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
    pub max_parallelism: usize,
}
284
/// Context shared by a stream plan and its fragments; currently only holds the timezone.
#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// The timezone used to interpret timestamps and dates for conversion.
    /// `None` is encoded as an empty string in protobuf.
    pub timezone: Option<String>,
}
290
291impl StreamContext {
292    pub fn to_protobuf(&self) -> PbStreamContext {
293        PbStreamContext {
294            timezone: self.timezone.clone().unwrap_or("".into()),
295        }
296    }
297
298    pub fn to_expr_context(&self) -> PbExprContext {
299        PbExprContext {
300            // `self.timezone` must always be set; an invalid value is used here for debugging if it's not.
301            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
302            strict_mode: false,
303        }
304    }
305
306    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
307        Self {
308            timezone: if prost.get_timezone().is_empty() {
309                None
310            } else {
311                Some(prost.get_timezone().clone())
312            },
313        }
314    }
315}
316
317impl StreamJobFragments {
318    pub fn to_protobuf(
319        &self,
320        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
321        fragment_dispatchers: &FragmentActorDispatchers,
322    ) -> PbTableFragments {
323        PbTableFragments {
324            table_id: self.stream_job_id.table_id(),
325            state: self.state as _,
326            fragments: self
327                .fragments
328                .iter()
329                .map(|(id, fragment)| {
330                    (
331                        *id,
332                        fragment.to_protobuf(
333                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
334                            fragment_dispatchers.get(&(*id as _)),
335                        ),
336                    )
337                })
338                .collect(),
339            actor_status: self.actor_status.clone().into_iter().collect(),
340            actor_splits: build_actor_connector_splits(&self.actor_splits),
341            ctx: Some(self.ctx.to_protobuf()),
342            parallelism: Some(self.assigned_parallelism.into()),
343            node_label: "".to_owned(),
344            backfill_done: true,
345            max_parallelism: Some(self.max_parallelism as _),
346        }
347    }
348}
349
/// `worker_id` -> `fragment_id` -> (the fragment's stream node, actors with their upstreams and
/// dispatchers).
pub type StreamJobActorsToCreate =
    HashMap<WorkerId, HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>)>>;
352
353impl StreamJobFragments {
354    /// Create a new `TableFragments` with state of `Initial`, with other fields empty.
355    pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
356        Self::new(
357            table_id,
358            fragments,
359            &BTreeMap::new(),
360            StreamContext::default(),
361            TableParallelism::Adaptive,
362            VirtualNode::COUNT_FOR_TEST,
363        )
364    }
365
366    /// Create a new `TableFragments` with state of `Initial`, with the status of actors set to
367    /// `Inactive` on the given workers.
368    pub fn new(
369        stream_job_id: TableId,
370        fragments: BTreeMap<FragmentId, Fragment>,
371        actor_locations: &BTreeMap<ActorId, WorkerSlotId>,
372        ctx: StreamContext,
373        table_parallelism: TableParallelism,
374        max_parallelism: usize,
375    ) -> Self {
376        let actor_status = actor_locations
377            .iter()
378            .map(|(&actor_id, worker_slot_id)| {
379                (
380                    actor_id,
381                    ActorStatus {
382                        location: PbActorLocation::from_worker(worker_slot_id.worker_id()),
383                        state: ActorState::Inactive as i32,
384                    },
385                )
386            })
387            .collect();
388
389        Self {
390            stream_job_id,
391            state: State::Initial,
392            fragments,
393            actor_status,
394            actor_splits: HashMap::default(),
395            ctx,
396            assigned_parallelism: table_parallelism,
397            max_parallelism,
398        }
399    }
400
401    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
402        self.fragments.keys().cloned()
403    }
404
    /// Iterates over all fragments of this job, in ascending fragment id order.
    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
        self.fragments.values()
    }
408
    /// Returns the id of the streaming job (a table id).
    pub fn stream_job_id(&self) -> TableId {
        self.stream_job_id
    }
413
    /// Returns the current state of the table fragments.
    pub fn state(&self) -> State {
        self.state
    }
418
    /// Returns a copy of the timezone stored in the stream context, if any.
    pub fn timezone(&self) -> Option<String> {
        self.ctx.timezone.clone()
    }
423
424    /// Returns whether the table fragments is in `Created` state.
425    pub fn is_created(&self) -> bool {
426        self.state == State::Created
427    }
428
429    /// Returns whether the table fragments is in `Initial` state.
430    pub fn is_initial(&self) -> bool {
431        self.state == State::Initial
432    }
433
    /// Sets the state of the table fragments. Actor states are left untouched.
    pub fn set_state(&mut self, state: State) {
        self.state = state;
    }
438
439    /// Update state of all actors
440    pub fn update_actors_state(&mut self, state: ActorState) {
441        for actor_status in self.actor_status.values_mut() {
442            actor_status.set_state(state);
443        }
444    }
445
    /// Replaces `actor_splits` with the actor -> splits entries obtained by flattening all
    /// values of `split_assignment`; the outer keys of the assignment are discarded.
    pub fn set_actor_splits_by_split_assignment(&mut self, split_assignment: SplitAssignment) {
        self.actor_splits = split_assignment.into_values().flatten().collect();
    }
449
450    /// Returns actor ids associated with this table.
451    pub fn actor_ids(&self) -> Vec<ActorId> {
452        self.fragments
453            .values()
454            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
455            .collect()
456    }
457
458    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
459        self.fragments
460            .values()
461            .flat_map(|fragment| {
462                fragment
463                    .actors
464                    .iter()
465                    .map(|actor| (actor.actor_id, fragment.fragment_id))
466            })
467            .collect()
468    }
469
470    /// Returns actors associated with this table.
471    #[cfg(test)]
472    pub fn actors(&self) -> Vec<StreamActor> {
473        self.fragments
474            .values()
475            .flat_map(|fragment| fragment.actors.clone())
476            .collect()
477    }
478
479    /// Returns the actor ids with the given fragment type.
480    pub fn filter_actor_ids(
481        &self,
482        check_type: impl Fn(u32) -> bool + 'static,
483    ) -> impl Iterator<Item = ActorId> + '_ {
484        self.fragments
485            .values()
486            .filter(move |fragment| check_type(fragment.fragment_type_mask))
487            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
488    }
489
490    /// Returns mview actor ids.
491    pub fn mview_actor_ids(&self) -> Vec<ActorId> {
492        Self::filter_actor_ids(self, |fragment_type_mask| {
493            (fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0
494        })
495        .collect()
496    }
497
498    /// Returns actor ids that need to be tracked when creating MV.
499    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
500        let mut actor_ids = vec![];
501        for fragment in self.fragments.values() {
502            if fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32 != 0 {
503                // Note: CDC table job contains a StreamScan fragment (StreamCdcScan node) and a CdcFilter fragment.
504                // We don't track any fragments' progress.
505                return vec![];
506            }
507            if (fragment.fragment_type_mask
508                & (FragmentTypeFlag::Values as u32
509                    | FragmentTypeFlag::StreamScan as u32
510                    | FragmentTypeFlag::SourceScan as u32))
511                != 0
512            {
513                actor_ids.extend(fragment.actors.iter().map(|actor| {
514                    (
515                        actor.actor_id,
516                        BackfillUpstreamType::from_fragment_type_mask(fragment.fragment_type_mask),
517                    )
518                }));
519            }
520        }
521        actor_ids
522    }
523
524    pub fn root_fragment(&self) -> Option<Fragment> {
525        self.mview_fragment()
526            .or_else(|| self.sink_fragment())
527            .or_else(|| self.source_fragment())
528    }
529
530    /// Returns the fragment with the `Mview` type flag.
531    pub fn mview_fragment(&self) -> Option<Fragment> {
532        self.fragments
533            .values()
534            .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0)
535            .cloned()
536    }
537
538    pub fn source_fragment(&self) -> Option<Fragment> {
539        self.fragments
540            .values()
541            .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Source as u32) != 0)
542            .cloned()
543    }
544
545    pub fn sink_fragment(&self) -> Option<Fragment> {
546        self.fragments
547            .values()
548            .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Sink as u32) != 0)
549            .cloned()
550    }
551
552    pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId> {
553        Self::filter_actor_ids(self, |mask| {
554            (mask & FragmentTypeFlag::SnapshotBackfillStreamScan as u32) != 0
555        })
556        .collect()
557    }
558
559    /// Extract the fragments that include source executors that contains an external stream source,
560    /// grouping by source id.
561    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
562        let mut source_fragments = HashMap::new();
563
564        for fragment in self.fragments() {
565            {
566                if let Some(source_id) = fragment.nodes.find_stream_source() {
567                    source_fragments
568                        .entry(source_id as SourceId)
569                        .or_insert(BTreeSet::new())
570                        .insert(fragment.fragment_id as FragmentId);
571                }
572            }
573        }
574        source_fragments
575    }
576
577    /// Returns (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`)).
578    ///
579    /// Note: the fragment `source_backfill_fragment_id` may actually have multiple upstream fragments,
580    /// but only one of them is the upstream source fragment, which is what we return.
581    pub fn source_backfill_fragments(
582        &self,
583    ) -> MetadataModelResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
584        let mut source_backfill_fragments = HashMap::new();
585
586        for fragment in self.fragments() {
587            {
588                if let Some((source_id, upstream_source_fragment_id)) =
589                    fragment.nodes.find_source_backfill()
590                {
591                    source_backfill_fragments
592                        .entry(source_id as SourceId)
593                        .or_insert(BTreeSet::new())
594                        .insert((fragment.fragment_id, upstream_source_fragment_id));
595                }
596            }
597        }
598        Ok(source_backfill_fragments)
599    }
600
601    /// Find the table job's `Union` fragment.
602    /// Panics if not found.
603    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
604        let mut union_fragment_id = None;
605        for (fragment_id, fragment) in &self.fragments {
606            {
607                {
608                    visit_stream_node(&fragment.nodes, |body| {
609                        if let NodeBody::Union(_) = body {
610                            if let Some(union_fragment_id) = union_fragment_id.as_mut() {
611                                // The union fragment should be unique.
612                                assert_eq!(*union_fragment_id, *fragment_id);
613                            } else {
614                                union_fragment_id = Some(*fragment_id);
615                            }
616                        }
617                    })
618                }
619            }
620        }
621
622        let union_fragment_id =
623            union_fragment_id.expect("fragment of placeholder merger not found");
624
625        (self
626            .fragments
627            .get_mut(&union_fragment_id)
628            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
629    }
630
631    /// Resolve dependent table
632    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
633        let table_id = match stream_node.node_body.as_ref() {
634            Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
635            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
636            _ => None,
637        };
638        if let Some(table_id) = table_id {
639            table_ids.entry(table_id).or_default().add_assign(1);
640        }
641
642        for child in &stream_node.input {
643            Self::resolve_dependent_table(child, table_ids);
644        }
645    }
646
647    /// Returns upstream table counts.
648    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
649        let mut table_ids = HashMap::new();
650        self.fragments.values().for_each(|fragment| {
651            Self::resolve_dependent_table(&fragment.nodes, &mut table_ids);
652        });
653
654        table_ids
655    }
656
657    /// Returns states of actors group by worker id.
658    pub fn worker_actor_states(&self) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
659        let mut map = BTreeMap::default();
660        for (&actor_id, actor_status) in &self.actor_status {
661            let node_id = actor_status.worker_id() as WorkerId;
662            map.entry(node_id)
663                .or_insert_with(Vec::new)
664                .push((actor_id, actor_status.state()));
665        }
666        map
667    }
668
669    /// Returns actor locations group by worker id.
670    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
671        let mut map = BTreeMap::default();
672        for (&actor_id, actor_status) in &self.actor_status {
673            let node_id = actor_status.worker_id() as WorkerId;
674            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
675        }
676        map
677    }
678
679    /// Returns the status of actors group by worker id.
680    pub fn active_actors(&self) -> Vec<StreamActor> {
681        let mut actors = vec![];
682        for fragment in self.fragments.values() {
683            for actor in &fragment.actors {
684                if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
685                    continue;
686                }
687                actors.push(actor.clone());
688            }
689        }
690        actors
691    }
692
693    pub fn actors_to_create(
694        &self,
695    ) -> impl Iterator<
696        Item = (
697            FragmentId,
698            &StreamNode,
699            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
700        ),
701    > + '_ {
702        self.fragments.values().map(move |fragment| {
703            (
704                fragment.fragment_id,
705                &fragment.nodes,
706                fragment.actors.iter().map(move |actor| {
707                    let worker_id = self
708                        .actor_status
709                        .get(&actor.actor_id)
710                        .expect("should exist")
711                        .worker_id() as WorkerId;
712                    (actor, worker_id)
713                }),
714            )
715        })
716    }
717
718    pub fn mv_table_id(&self) -> Option<u32> {
719        if self
720            .fragments
721            .values()
722            .flat_map(|f| f.state_table_ids.iter())
723            .any(|table_id| *table_id == self.stream_job_id.table_id)
724        {
725            Some(self.stream_job_id.table_id)
726        } else {
727            None
728        }
729    }
730
    /// Retrieve the **complete** internal tables map of the whole graph, keyed by table id.
    ///
    /// Compared to [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`],
    /// the table catalogs returned here are complete, with all fields filled.
    ///
    /// Panics if two visited tables share the same id.
    pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
        self.collect_tables_inner(true)
    }
738
    /// `internal_tables()` with additional table in `Materialize` node.
    ///
    /// Panics if two visited tables share the same id.
    pub fn all_tables(&self) -> BTreeMap<u32, Table> {
        self.collect_tables_inner(false)
    }
743
744    fn collect_tables_inner(&self, internal_tables_only: bool) -> BTreeMap<u32, Table> {
745        let mut tables = BTreeMap::new();
746        for fragment in self.fragments.values() {
747            stream_graph_visitor::visit_stream_node_tables_inner(
748                &mut fragment.nodes.clone(),
749                internal_tables_only,
750                true,
751                |table, _| {
752                    let table_id = table.id;
753                    tables
754                        .try_insert(table_id, table.clone())
755                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
756                },
757            );
758        }
759        tables
760    }
761
762    /// Returns the internal table ids without the mview table.
763    pub fn internal_table_ids(&self) -> Vec<u32> {
764        self.fragments
765            .values()
766            .flat_map(|f| f.state_table_ids.clone())
767            .filter(|&t| t != self.stream_job_id.table_id)
768            .collect_vec()
769    }
770
771    /// Returns all internal table ids including the mview table.
772    pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
773        self.fragments
774            .values()
775            .flat_map(|f| f.state_table_ids.clone())
776    }
777
778    /// Fill the `expr_context` in `StreamActor`. Used for compatibility.
779    pub fn fill_expr_context(mut self) -> Self {
780        self.fragments.values_mut().for_each(|fragment| {
781            fragment.actors.iter_mut().for_each(|actor| {
782                if actor.expr_context.is_none() {
783                    actor.expr_context = Some(self.ctx.to_expr_context());
784                }
785            });
786        });
787        self
788    }
789}
790
/// The kind of upstream a backfill fragment reads from, derived from the fragment type mask.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    /// The fragment has the `StreamScan` flag set.
    MView,
    /// The fragment has the `Values` flag set.
    Values,
    /// The fragment has the `SourceScan` flag set.
    Source,
}
797
798impl BackfillUpstreamType {
799    pub fn from_fragment_type_mask(mask: u32) -> Self {
800        let is_mview = (mask & FragmentTypeFlag::StreamScan as u32) != 0;
801        let is_values = (mask & FragmentTypeFlag::Values as u32) != 0;
802        let is_source = (mask & FragmentTypeFlag::SourceScan as u32) != 0;
803
804        // Note: in theory we can have multiple backfill executors in one fragment, but currently it's not possible.
805        // See <https://github.com/risingwavelabs/risingwave/issues/6236>.
806        debug_assert!(
807            is_mview as u8 + is_values as u8 + is_source as u8 == 1,
808            "a backfill fragment should either be mview, value or source, found {:?}",
809            mask
810        );
811
812        if is_mview {
813            BackfillUpstreamType::MView
814        } else if is_values {
815            BackfillUpstreamType::Values
816        } else if is_source {
817            BackfillUpstreamType::Source
818        } else {
819            unreachable!("invalid fragment type mask: {}", mask);
820        }
821    }
822}