risingwave_meta/model/stream.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{AddAssign, Deref};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::{
    ActorAlignmentId, IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat,
};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::{ActorInfo, PbActorLocation};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
use risingwave_pb::meta::table_parallelism::{
    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
    PbParallelism,
};
use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
    PbStreamContext, StreamNode,
};

use super::{ActorId, FragmentId};

/// The parallelism for a `TableFragments`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// The system decides the parallelism, based on the available worker parallelisms.
    Adaptive,
    /// Set when the `TableFragments` parallelism is changed explicitly.
    /// All fragments which are part of the `TableFragments` will have the same parallelism as this.
    Fixed(usize),
    /// Set when the individual parallelisms of the `Fragments`
    /// can differ within a `TableFragments`.
    /// This is used by `risectl`, whose low-level interface can
    /// scale individual `Fragments` within a `TableFragments`.
    /// When that happens, the `TableFragments` no longer has a consistent
    /// parallelism, so we set this variant to indicate that.
    Custom,
}

impl From<PbTableParallelism> for TableParallelism {
    fn from(value: PbTableParallelism) -> Self {
        use Parallelism::*;
        match &value.parallelism {
            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
            Some(Custom(_)) => Self::Custom,
            _ => unreachable!(),
        }
    }
}

impl From<TableParallelism> for PbTableParallelism {
    fn from(value: TableParallelism) -> Self {
        use TableParallelism::*;

        let parallelism = match value {
            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
                parallelism: n as u32,
            }),
            Custom => PbParallelism::Custom(PbCustomParallelism {}),
        };

        Self {
            parallelism: Some(parallelism),
        }
    }
}

impl From<StreamingParallelism> for TableParallelism {
    fn from(value: StreamingParallelism) -> Self {
        match value {
            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
            StreamingParallelism::Custom => TableParallelism::Custom,
        }
    }
}

impl From<TableParallelism> for StreamingParallelism {
    fn from(value: TableParallelism) -> Self {
        match value {
            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            TableParallelism::Custom => StreamingParallelism::Custom,
        }
    }
}
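
// Illustrative sketch (not part of the original module): the `From` impls above
// are lossless for `Adaptive`, `Fixed`, and `Custom`, so a protobuf round trip
// preserves the value. This assumes nothing beyond the impls defined here.
#[cfg(test)]
mod table_parallelism_conversion_sketch {
    use super::*;

    #[test]
    fn pb_round_trip_preserves_parallelism() {
        for p in [
            TableParallelism::Adaptive,
            TableParallelism::Fixed(4),
            TableParallelism::Custom,
        ] {
            // Convert to protobuf and back; the value should be unchanged.
            let pb: PbTableParallelism = p.into();
            assert_eq!(TableParallelism::from(pb), p);
        }
    }
}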

pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// downstream `fragment_id` -> original upstream `fragment_id` -> new upstream `fragment_id`
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// The newly added no-shuffle actor dispatchers from upstream fragment to downstream fragment:
/// upstream `fragment_id` -> downstream `fragment_id` -> upstream `actor_id` -> downstream `actor_id`
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;
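
// Illustrative sketch (ids are made up; assumes `FragmentId` and `ActorId` are
// plain integer aliases, consistent with the `as _` casts used elsewhere in this
// file): how the nested `FragmentNewNoShuffle` map is populated and then queried
// to find the downstream actor paired with a given upstream actor.
#[cfg(test)]
mod fragment_new_no_shuffle_sketch {
    use super::*;

    #[test]
    fn populate_and_query() {
        let mut no_shuffle: FragmentNewNoShuffle = HashMap::new();
        // Upstream fragment 1 dispatches no-shuffle to downstream fragment 2,
        // pairing upstream actor 10 with downstream actor 20.
        no_shuffle
            .entry(1)
            .or_default()
            .entry(2)
            .or_default()
            .insert(10, 20);
        assert_eq!(no_shuffle[&1][&2].get(&10), Some(&20));
    }
}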

#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    pub downstream_fragment_id: FragmentId,
    pub dispatcher_type: DispatcherType,
    pub dist_key_indices: Vec<u32>,
    pub output_mapping: PbDispatchOutputMapping,
}

impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
        Self {
            downstream_fragment_id: fragment_id,
            dispatcher_type: dispatch.get_type().unwrap().into(),
            dist_key_indices: dispatch.dist_key_indices,
            output_mapping: dispatch.output_mapping.unwrap(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    pub inner: StreamJobFragments,
    pub downstreams: FragmentDownstreamRelation,
}

impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

#[derive(Clone, Debug)]
pub struct StreamActor {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub vnode_bitmap: Option<Bitmap>,
    pub mview_definition: String,
    pub expr_context: Option<PbExprContext>,
    // TODO: shall we merge `config_override` with `expr_context` to be a `StreamContext`?
    pub config_override: Arc<str>,
}

impl StreamActor {
    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
        PbStreamActor {
            actor_id: self.actor_id,
            fragment_id: self.fragment_id,
            dispatcher: dispatchers.collect(),
            vnode_bitmap: self
                .vnode_bitmap
                .as_ref()
                .map(|bitmap| bitmap.to_protobuf()),
            mview_definition: self.mview_definition.clone(),
            expr_context: self.expr_context.clone(),
            config_override: self.config_override.to_string(),
        }
    }
}
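
// Illustrative sketch (ids are made up; assumes integer id aliases): the
// dispatchers are passed to `StreamActor::to_protobuf` as an iterator, so an
// actor with no dispatchers converts to a `PbStreamActor` with an empty
// `dispatcher` list and no vnode bitmap.
#[cfg(test)]
mod stream_actor_to_protobuf_sketch {
    use super::*;

    #[test]
    fn to_protobuf_without_dispatchers() {
        let actor = StreamActor {
            actor_id: 1,
            fragment_id: 1,
            vnode_bitmap: None,
            mview_definition: String::new(),
            expr_context: None,
            config_override: "".into(),
        };
        let pb = actor.to_protobuf(std::iter::empty());
        assert!(pb.dispatcher.is_empty());
        assert!(pb.vnode_bitmap.is_none());
    }
}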

#[derive(Clone, Debug, Default)]
pub struct Fragment {
    pub fragment_id: FragmentId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub actors: Vec<StreamActor>,
    pub state_table_ids: Vec<TableId>,
    pub maybe_vnode_count: Option<u32>,
    pub nodes: StreamNode,
}

impl Fragment {
    pub fn to_protobuf(
        &self,
        upstream_fragments: impl Iterator<Item = FragmentId>,
        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
    ) -> PbFragment {
        PbFragment {
            fragment_id: self.fragment_id,
            fragment_type_mask: self.fragment_type_mask.into(),
            distribution_type: self.distribution_type as _,
            actors: self
                .actors
                .iter()
                .map(|actor| {
                    actor.to_protobuf(
                        dispatchers
                            .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
                            .into_iter()
                            .flatten()
                            .cloned(),
                    )
                })
                .collect(),
            state_table_ids: self.state_table_ids.clone(),
            upstream_fragment_ids: upstream_fragments.collect(),
            maybe_vnode_count: self.maybe_vnode_count,
            nodes: Some(self.nodes.clone()),
        }
    }
}

impl VnodeCountCompat for Fragment {
    fn vnode_count_inner(&self) -> VnodeCount {
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}

impl IsSingleton for Fragment {
    fn is_singleton(&self) -> bool {
        matches!(self.distribution_type, FragmentDistributionType::Single)
    }
}
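
// Illustrative sketch: a fragment with `Single` distribution is a singleton;
// all other fields can stay at their defaults for this check, since
// `is_singleton` only inspects `distribution_type`.
#[cfg(test)]
mod fragment_singleton_sketch {
    use super::*;

    #[test]
    fn single_distribution_is_singleton() {
        let fragment = Fragment {
            distribution_type: FragmentDistributionType::Single,
            ..Default::default()
        };
        assert!(fragment.is_singleton());
    }
}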

/// Fragments of a streaming job. Corresponds to [`PbTableFragments`].
/// (It was previously called `TableFragments` for historical reasons.)
///
/// We store whole fragments in a single column family as follows:
/// `stream_job_id` => `StreamJobFragments`.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// The id of the streaming job.
    pub stream_job_id: JobId,

    /// The state of the table fragments.
    pub state: State,

    /// The table fragments.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// The status of actors.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// The streaming context associated with this stream plan and its fragments.
    pub ctx: StreamContext,

    /// The parallelism assigned to these table fragments.
    pub assigned_parallelism: TableParallelism,

    /// The max parallelism specified when the streaming job was created, i.e., the expected vnode count.
    ///
    /// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
    /// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
    /// the streaming job.
    ///
    /// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
    /// from this value (see `StreamFragmentGraph.max_parallelism` for more details). As a result,
    /// checking the parallelism change against this value can be inaccurate in some cases. However,
    /// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
    pub max_parallelism: usize,
}

#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// The timezone used to interpret timestamps and dates for conversion.
    pub timezone: Option<String>,

    /// The partial config of this job that overrides the global config.
    pub config_override: Arc<str>,
}

impl StreamContext {
    pub fn to_protobuf(&self) -> PbStreamContext {
        PbStreamContext {
            timezone: self.timezone.clone().unwrap_or("".into()),
            config_override: self.config_override.to_string(),
        }
    }

    pub fn to_expr_context(&self) -> PbExprContext {
        PbExprContext {
            // `self.timezone` must always be set; a placeholder value is used here for debugging if it's not.
            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
            strict_mode: false,
        }
    }

    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
        Self {
            timezone: if prost.get_timezone().is_empty() {
                None
            } else {
                Some(prost.get_timezone().clone())
            },
            config_override: prost.get_config_override().as_str().into(),
        }
    }
}
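
// Illustrative sketch: `to_protobuf` encodes a missing timezone as an empty
// string, and `from_protobuf` maps the empty string back to `None`, so the
// round trip is lossless for the timezone field.
#[cfg(test)]
mod stream_context_round_trip_sketch {
    use super::*;

    #[test]
    fn empty_timezone_round_trips_to_none() {
        let ctx = StreamContext {
            timezone: None,
            config_override: "".into(),
        };
        let pb = ctx.to_protobuf();
        assert_eq!(pb.timezone, "");
        assert!(StreamContext::from_protobuf(&pb).timezone.is_none());
    }
}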

#[easy_ext::ext(StreamingJobModelContextExt)]
impl risingwave_meta_model::streaming_job::Model {
    pub fn stream_context(&self) -> StreamContext {
        StreamContext {
            timezone: self.timezone.clone(),
            config_override: self.config_override.clone().unwrap_or_default().into(),
        }
    }
}

impl StreamJobFragments {
    pub fn to_protobuf(
        &self,
        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
        fragment_dispatchers: &FragmentActorDispatchers,
    ) -> PbTableFragments {
        PbTableFragments {
            table_id: self.stream_job_id,
            state: self.state as _,
            fragments: self
                .fragments
                .iter()
                .map(|(id, fragment)| {
                    (
                        *id,
                        fragment.to_protobuf(
                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
                            fragment_dispatchers.get(&(*id as _)),
                        ),
                    )
                })
                .collect(),
            actor_status: self
                .actor_status
                .iter()
                .map(|(actor_id, status)| (*actor_id, *status))
                .collect(),
            ctx: Some(self.ctx.to_protobuf()),
            parallelism: Some(self.assigned_parallelism.into()),
            node_label: "".to_owned(),
            backfill_done: true,
            max_parallelism: Some(self.max_parallelism as _),
        }
    }
}

pub type StreamJobActorsToCreate = HashMap<
    WorkerId,
    HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>, HashSet<u32>)>,
>;

impl StreamJobFragments {
    /// Creates a new `TableFragments` with a state of `Initial`, with other fields empty.
    pub fn for_test(job_id: JobId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
        Self::new(
            job_id,
            fragments,
            &BTreeMap::new(),
            StreamContext::default(),
            TableParallelism::Adaptive,
            VirtualNode::COUNT_FOR_TEST,
        )
    }

    /// Creates a new `TableFragments` with a state of `Initial`, recording actor locations on the
    /// given workers.
    pub fn new(
        stream_job_id: JobId,
        fragments: BTreeMap<FragmentId, Fragment>,
        actor_locations: &BTreeMap<ActorId, ActorAlignmentId>,
        ctx: StreamContext,
        table_parallelism: TableParallelism,
        max_parallelism: usize,
    ) -> Self {
        let actor_status = actor_locations
            .iter()
            .map(|(&actor_id, alignment_id)| {
                (
                    actor_id,
                    ActorStatus {
                        location: PbActorLocation::from_worker(alignment_id.worker_id()),
                    },
                )
            })
            .collect();

        Self {
            stream_job_id,
            state: State::Initial,
            fragments,
            actor_status,
            ctx,
            assigned_parallelism: table_parallelism,
            max_parallelism,
        }
    }

    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.fragments.keys().cloned()
    }

    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
        self.fragments.values()
    }

    pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
        self.fragments
            .get(&fragment_id)
            .map(|f| f.actors.as_slice())
            .unwrap_or_default()
    }

    /// Returns the id of the streaming job.
    pub fn stream_job_id(&self) -> JobId {
        self.stream_job_id
    }

    /// Returns the timezone of the job.
    pub fn timezone(&self) -> Option<String> {
        self.ctx.timezone.clone()
    }

    /// Returns whether the table fragments are in the `Created` state.
    pub fn is_created(&self) -> bool {
        self.state == State::Created
    }

    /// Returns the actor ids associated with this job.
    pub fn actor_ids(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
    }

    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
        self.fragments
            .values()
            .flat_map(|fragment| {
                fragment
                    .actors
                    .iter()
                    .map(|actor| (actor.actor_id, fragment.fragment_id))
            })
            .collect()
    }

    /// Returns the actors associated with this job.
    #[cfg(test)]
    pub fn actors(&self) -> Vec<StreamActor> {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.clone())
            .collect()
    }

    /// Returns the mview fragment ids.
    #[cfg(test)]
    pub fn mview_fragment_ids(&self) -> Vec<FragmentId> {
        self.fragments
            .values()
            .filter(move |fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .map(|fragment| fragment.fragment_id)
            .collect()
    }

    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
        Self::tracking_progress_actor_ids_impl(self.fragments.values().map(|fragment| {
            (
                fragment.fragment_type_mask,
                fragment.actors.iter().map(|actor| actor.actor_id),
            )
        }))
    }

    /// Returns the actor ids that need to be tracked when creating an MV.
    pub fn tracking_progress_actor_ids_impl(
        fragments: impl IntoIterator<Item = (FragmentTypeMask, impl Iterator<Item = ActorId>)>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        let mut actor_ids = vec![];
        for (fragment_type_mask, actors) in fragments {
            if fragment_type_mask.contains(FragmentTypeFlag::CdcFilter) {
                // Note: a CDC table job contains a StreamScan fragment (StreamCdcScan node) and a
                // CdcFilter fragment. We don't track any fragment's progress in that case.
                return vec![];
            }
            if fragment_type_mask.contains_any([
                FragmentTypeFlag::Values,
                FragmentTypeFlag::StreamScan,
                FragmentTypeFlag::SourceScan,
                FragmentTypeFlag::LocalityProvider,
            ]) {
                actor_ids.extend(actors.map(|actor_id| {
                    (
                        actor_id,
                        BackfillUpstreamType::from_fragment_type_mask(fragment_type_mask),
                    )
                }));
            }
        }
        actor_ids
    }

    pub fn root_fragment(&self) -> Option<Fragment> {
        self.mview_fragment()
            .or_else(|| self.sink_fragment())
            .or_else(|| self.source_fragment())
    }

    /// Returns the fragment with the `Mview` type flag.
    pub fn mview_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .cloned()
    }

    pub fn source_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Source)
            })
            .cloned()
    }

    pub fn sink_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
            .cloned()
    }

    /// Extracts the fragments whose source executors contain an external stream source,
    /// grouped by source id.
    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
        let mut source_fragments = HashMap::new();

        for fragment in self.fragments() {
            if let Some(source_id) = fragment.nodes.find_stream_source() {
                source_fragments
                    .entry(source_id)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment.fragment_id as FragmentId);
            }
        }
        source_fragments
    }

    pub fn source_backfill_fragments(
        &self,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        Self::source_backfill_fragments_impl(
            self.fragments
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        )
    }

    /// Returns `source_id` -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`).
    ///
    /// Note: the fragment `source_backfill_fragment_id` may actually have multiple upstream fragments,
    /// but only one of them is the upstream source fragment, which is what we return.
    pub fn source_backfill_fragments_impl(
        fragments: impl Iterator<Item = (FragmentId, &StreamNode)>,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        let mut source_backfill_fragments = HashMap::new();

        for (fragment_id, fragment_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                fragment_node.find_source_backfill()
            {
                source_backfill_fragments
                    .entry(source_id)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        source_backfill_fragments
    }

    /// Finds the table job's `Union` fragment.
    /// Panics if not found.
    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
        let mut union_fragment_id = None;
        for (fragment_id, fragment) in &self.fragments {
            visit_stream_node_body(&fragment.nodes, |body| {
                if let NodeBody::Union(_) = body {
                    if let Some(union_fragment_id) = union_fragment_id.as_mut() {
                        // The union fragment should be unique.
                        assert_eq!(*union_fragment_id, *fragment_id);
                    } else {
                        union_fragment_id = Some(*fragment_id);
                    }
                }
            });
        }

        let union_fragment_id =
            union_fragment_id.expect("fragment of placeholder merger not found");

        (self
            .fragments
            .get_mut(&union_fragment_id)
            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
    }

    /// Resolves dependent tables by walking the stream node tree.
    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
        let table_id = match stream_node.node_body.as_ref() {
            Some(NodeBody::StreamScan(stream_scan)) => Some(stream_scan.table_id),
            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(stream_scan.table_id),
            _ => None,
        };
        if let Some(table_id) = table_id {
            table_ids.entry(table_id).or_default().add_assign(1);
        }

        for child in &stream_node.input {
            Self::resolve_dependent_table(child, table_ids);
        }
    }

    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
        Self::upstream_table_counts_impl(self.fragments.values().map(|fragment| &fragment.nodes))
    }

    /// Returns the number of occurrences of each upstream table, counted over all scan nodes.
    pub fn upstream_table_counts_impl(
        fragment_nodes: impl Iterator<Item = &StreamNode>,
    ) -> HashMap<TableId, usize> {
        let mut table_ids = HashMap::new();
        fragment_nodes.for_each(|node| {
            Self::resolve_dependent_table(node, &mut table_ids);
        });

        table_ids
    }

    /// Returns actor locations grouped by worker id.
    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
        let mut map = BTreeMap::default();
        for (&actor_id, actor_status) in &self.actor_status {
            let node_id = actor_status.worker_id();
            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
        }
        map
    }

    pub fn actors_to_create(
        &self,
    ) -> impl Iterator<
        Item = (
            FragmentId,
            &StreamNode,
            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
        ),
    > + '_ {
        self.fragments.values().map(move |fragment| {
            (
                fragment.fragment_id,
                &fragment.nodes,
                fragment.actors.iter().map(move |actor| {
                    let worker_id: WorkerId = self
                        .actor_status
                        .get(&actor.actor_id)
                        .expect("should exist")
                        .worker_id();
                    (actor, worker_id)
                }),
            )
        })
    }

    pub fn mv_table_id(&self) -> Option<TableId> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter().copied())
            .find(|table_id| self.stream_job_id.is_mv_table_id(*table_id))
    }

    pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<TableId, Table> {
        let mut tables = BTreeMap::new();
        for fragment in fragments {
            stream_graph_visitor::visit_stream_node_tables_inner(
                &mut fragment.nodes.clone(),
                false,
                true,
                |table, _| {
                    let table_id = table.id;
                    tables
                        .try_insert(table_id, table.clone())
                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
                },
            );
        }
        tables
    }

    /// Returns the internal table ids, excluding the mview table.
    pub fn internal_table_ids(&self) -> Vec<TableId> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter().copied())
            .filter(|&t| !self.stream_job_id.is_mv_table_id(t))
            .collect_vec()
    }

    /// Returns all internal table ids, including the mview table.
    pub fn all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.clone())
    }

    /// Fills the `expr_context` in `StreamActor`. Used for compatibility.
    pub fn fill_expr_context(mut self) -> Self {
        self.fragments.values_mut().for_each(|fragment| {
            fragment.actors.iter_mut().for_each(|actor| {
                if actor.expr_context.is_none() {
                    actor.expr_context = Some(self.ctx.to_expr_context());
                }
            });
        });
        self
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    MView,
    Values,
    Source,
    LocalityProvider,
}

impl BackfillUpstreamType {
    pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
        let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
        let is_values = mask.contains(FragmentTypeFlag::Values);
        let is_source = mask.contains(FragmentTypeFlag::SourceScan);
        let is_locality_provider = mask.contains(FragmentTypeFlag::LocalityProvider);

        // Note: in theory we can have multiple backfill executors in one fragment, but currently it's not possible.
        // See <https://github.com/risingwavelabs/risingwave/issues/6236>.
        debug_assert!(
            is_mview as u8 + is_values as u8 + is_source as u8 + is_locality_provider as u8 == 1,
            "a backfill fragment should be exactly one of mview, values, source, or locality provider, found {:?}",
            mask
        );

        if is_mview {
            BackfillUpstreamType::MView
        } else if is_values {
            BackfillUpstreamType::Values
        } else if is_source {
            BackfillUpstreamType::Source
        } else if is_locality_provider {
            BackfillUpstreamType::LocalityProvider
        } else {
            unreachable!("invalid fragment type mask: {:?}", mask);
        }
    }
}