// risingwave_meta/model/stream.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{AddAssign, Deref};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
22use risingwave_common::hash::{IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat};
23use risingwave_common::id::JobId;
24use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
25use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId, fragment};
26use risingwave_pb::catalog::Table;
27use risingwave_pb::common::ActorInfo;
28use risingwave_pb::id::SubscriberId;
29use risingwave_pb::meta::table_fragments::fragment::{
30    FragmentDistributionType, PbFragmentDistributionType,
31};
32use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment, State};
33use risingwave_pb::meta::table_parallelism::{
34    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
35    PbParallelism,
36};
37use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
38use risingwave_pb::plan_common::PbExprContext;
39use risingwave_pb::stream_plan::stream_node::NodeBody;
40use risingwave_pb::stream_plan::{
41    DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
42    PbStreamContext, StreamNode,
43};
44use strum::Display;
45
46use super::{ActorId, FragmentId};
47
/// Parallelism setting of a streaming job's fragments (`TableFragments`).
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// The system picks the parallelism from the parallelism available on the workers.
    Adaptive,
    /// Set when the parallelism of the `TableFragments` is changed explicitly; every fragment
    /// that belongs to the `TableFragments` runs with exactly this parallelism.
    Fixed(usize),
    /// Set when fragments within one `TableFragments` may run with different parallelisms.
    ///
    /// `risectl` sets this because its low-level interface can scale individual `Fragments`
    /// inside a `TableFragments`. After that, the `TableFragments` no longer has one consistent
    /// parallelism, and this variant records that fact.
    Custom,
}
64
65impl From<PbTableParallelism> for TableParallelism {
66    fn from(value: PbTableParallelism) -> Self {
67        use Parallelism::*;
68        match &value.parallelism {
69            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
70            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
71            Some(Custom(_)) => Self::Custom,
72            _ => unreachable!(),
73        }
74    }
75}
76
77impl From<TableParallelism> for PbTableParallelism {
78    fn from(value: TableParallelism) -> Self {
79        use TableParallelism::*;
80
81        let parallelism = match value {
82            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
83            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
84                parallelism: n as u32,
85            }),
86            Custom => PbParallelism::Custom(PbCustomParallelism {}),
87        };
88
89        Self {
90            parallelism: Some(parallelism),
91        }
92    }
93}
94
95impl From<StreamingParallelism> for TableParallelism {
96    fn from(value: StreamingParallelism) -> Self {
97        match value {
98            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
99            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
100            StreamingParallelism::Custom => TableParallelism::Custom,
101        }
102    }
103}
104
105impl From<TableParallelism> for StreamingParallelism {
106    fn from(value: TableParallelism) -> Self {
107        match value {
108            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
109            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
110            TableParallelism::Custom => StreamingParallelism::Custom,
111        }
112    }
113}
114
115pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
116pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
117pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
118pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;
119
120pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
121/// downstream `fragment_id` -> original upstream `fragment_id` -> new upstream `fragment_id`
122pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
123/// The newly added no-shuffle actor dispatcher from upstream fragment to downstream fragment
124/// upstream `fragment_id` -> downstream `fragment_id` -> upstream `actor_id` -> downstream `actor_id`
125pub type ActorNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;
126
127#[derive(Debug, Clone)]
128pub struct DownstreamFragmentRelation {
129    pub downstream_fragment_id: FragmentId,
130    pub dispatcher_type: DispatcherType,
131    pub dist_key_indices: Vec<u32>,
132    pub output_mapping: PbDispatchOutputMapping,
133}
134
135impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
136    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
137        Self {
138            downstream_fragment_id: fragment_id,
139            dispatcher_type: dispatch.get_type().unwrap().into(),
140            dist_key_indices: dispatch.dist_key_indices,
141            output_mapping: dispatch.output_mapping.unwrap(),
142        }
143    }
144}
145
146#[derive(Debug, Clone)]
147pub struct StreamJobFragmentsToCreate {
148    pub inner: StreamJobFragments,
149    pub downstreams: FragmentDownstreamRelation,
150}
151
152impl Deref for StreamJobFragmentsToCreate {
153    type Target = StreamJobFragments;
154
155    fn deref(&self) -> &Self::Target {
156        &self.inner
157    }
158}
159
160#[derive(Clone, Debug)]
161pub struct StreamActor {
162    pub actor_id: ActorId,
163    pub fragment_id: FragmentId,
164    pub vnode_bitmap: Option<Bitmap>,
165    pub mview_definition: String,
166    pub expr_context: Option<PbExprContext>,
167    // TODO: shall we merge `config_override` with `expr_context` to be a `StreamContext`?
168    pub config_override: Arc<str>,
169}
170
171impl StreamActor {
172    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
173        PbStreamActor {
174            actor_id: self.actor_id,
175            fragment_id: self.fragment_id,
176            dispatcher: dispatchers.collect(),
177            vnode_bitmap: self
178                .vnode_bitmap
179                .as_ref()
180                .map(|bitmap| bitmap.to_protobuf()),
181            mview_definition: self.mview_definition.clone(),
182            expr_context: self.expr_context.clone(),
183            config_override: self.config_override.to_string(),
184        }
185    }
186}
187
188#[derive(Clone, Debug, Default)]
189pub struct Fragment {
190    pub fragment_id: FragmentId,
191    pub fragment_type_mask: FragmentTypeMask,
192    pub distribution_type: PbFragmentDistributionType,
193    pub state_table_ids: Vec<TableId>,
194    pub maybe_vnode_count: Option<u32>,
195    pub nodes: StreamNode,
196}
197
198impl Fragment {
199    pub fn to_protobuf(
200        &self,
201        actors: &[StreamActor],
202        upstream_fragments: impl Iterator<Item = FragmentId>,
203        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
204    ) -> PbFragment {
205        PbFragment {
206            fragment_id: self.fragment_id,
207            fragment_type_mask: self.fragment_type_mask.into(),
208            distribution_type: self.distribution_type as _,
209            actors: actors
210                .iter()
211                .map(|actor| {
212                    actor.to_protobuf(
213                        dispatchers
214                            .and_then(|dispatchers| dispatchers.get(&actor.actor_id))
215                            .into_iter()
216                            .flatten()
217                            .cloned(),
218                    )
219                })
220                .collect(),
221            state_table_ids: self.state_table_ids.clone(),
222            upstream_fragment_ids: upstream_fragments.collect(),
223            maybe_vnode_count: self.maybe_vnode_count,
224            nodes: Some(self.nodes.clone()),
225        }
226    }
227}
228
229impl VnodeCountCompat for Fragment {
230    fn vnode_count_inner(&self) -> VnodeCount {
231        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
232    }
233}
234
235impl IsSingleton for Fragment {
236    fn is_singleton(&self) -> bool {
237        matches!(self.distribution_type, FragmentDistributionType::Single)
238    }
239}
240
241impl From<fragment::Model> for Fragment {
242    fn from(model: fragment::Model) -> Self {
243        Self {
244            fragment_id: model.fragment_id,
245            fragment_type_mask: FragmentTypeMask::from(model.fragment_type_mask),
246            distribution_type: model.distribution_type.into(),
247            state_table_ids: model.state_table_ids.into_inner(),
248            maybe_vnode_count: VnodeCount::set(model.vnode_count).to_protobuf(),
249            nodes: model.stream_node.to_protobuf(),
250        }
251    }
252}
253
254/// Fragments of a streaming job. Corresponds to [`PbTableFragments`].
255/// (It was previously called `TableFragments` due to historical reasons.)
256///
257/// We store whole fragments in a single column family as follow:
258/// `stream_job_id` => `StreamJobFragments`.
259#[derive(Debug, Clone)]
260pub struct StreamJobFragments {
261    /// The table id.
262    pub stream_job_id: JobId,
263
264    /// The state of the table fragments.
265    pub state: State,
266
267    /// The table fragments.
268    pub fragments: BTreeMap<FragmentId, Fragment>,
269
270    /// The streaming context associated with this stream plan and its fragments
271    pub ctx: StreamContext,
272
273    /// The max parallelism specified when the streaming job was created, i.e., expected vnode count.
274    ///
275    /// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
276    /// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
277    /// the streaming job.
278    ///
279    /// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
280    /// from this value (see `StreamFragmentGraph.max_parallelism` for more details.). As a result,
281    /// checking the parallelism change with this value can be inaccurate in some cases. However,
282    /// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
283    pub max_parallelism: usize,
284}
285
/// Per-job streaming context shared by all fragments of the job.
#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// Timezone used when interpreting timestamps and dates during conversion.
    pub timezone: Option<String>,

    /// Partial config of this job that overrides the global config.
    pub config_override: Arc<str>,
}
294
295impl StreamContext {
296    pub fn to_protobuf(&self) -> PbStreamContext {
297        PbStreamContext {
298            timezone: self.timezone.clone().unwrap_or("".into()),
299            config_override: self.config_override.to_string(),
300        }
301    }
302
303    pub fn to_expr_context(&self) -> PbExprContext {
304        PbExprContext {
305            // `self.timezone` must always be set; an invalid value is used here for debugging if it's not.
306            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
307            strict_mode: false,
308        }
309    }
310
311    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
312        Self {
313            timezone: if prost.get_timezone().is_empty() {
314                None
315            } else {
316                Some(prost.get_timezone().clone())
317            },
318            config_override: prost.get_config_override().as_str().into(),
319        }
320    }
321}
322
323#[easy_ext::ext(StreamingJobModelContextExt)]
324impl risingwave_meta_model::streaming_job::Model {
325    pub fn stream_context(&self) -> StreamContext {
326        StreamContext {
327            timezone: self.timezone.clone(),
328            config_override: self.config_override.clone().unwrap_or_default().into(),
329        }
330    }
331}
332
333impl StreamJobFragments {
334    pub fn to_protobuf(
335        &self,
336        fragment_actors: &HashMap<FragmentId, Vec<StreamActor>>,
337        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
338        fragment_dispatchers: &FragmentActorDispatchers,
339        actor_status: HashMap<ActorId, PbActorStatus>,
340    ) -> PbTableFragments {
341        PbTableFragments {
342            table_id: self.stream_job_id,
343            state: self.state as _,
344            fragments: self
345                .fragments
346                .iter()
347                .map(|(id, fragment)| {
348                    let actors = fragment_actors.get(id).map(|a| a.as_slice()).unwrap_or(&[]);
349                    (
350                        *id,
351                        fragment.to_protobuf(
352                            actors,
353                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
354                            fragment_dispatchers.get(id),
355                        ),
356                    )
357                })
358                .collect(),
359            actor_status,
360            ctx: Some(self.ctx.to_protobuf()),
361            node_label: "".to_owned(),
362            backfill_done: true,
363            max_parallelism: Some(self.max_parallelism as _),
364        }
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stream_context_protobuf_round_trip() {
        let context = StreamContext {
            timezone: Some("UTC".to_owned()),
            config_override: "{\"a\":1}".into(),
        };

        let prost = context.to_protobuf();

        let round_trip = StreamContext::from_protobuf(&prost);
        assert_eq!(round_trip.timezone, Some("UTC".to_owned()));
        assert_eq!(&*round_trip.config_override, "{\"a\":1}");
    }

    /// A missing timezone is encoded as `""`, decodes back to `None`, and makes
    /// `to_expr_context` fall back to its debugging placeholder.
    #[test]
    fn test_stream_context_empty_timezone() {
        let context = StreamContext::default();

        let prost = context.to_protobuf();
        assert!(prost.timezone.is_empty());

        let round_trip = StreamContext::from_protobuf(&prost);
        assert_eq!(round_trip.timezone, None);
        assert!(round_trip.config_override.is_empty());

        assert_eq!(context.to_expr_context().time_zone, "Empty Time Zone");
    }

    #[test]
    fn test_stream_context_from_model() {
        let model = risingwave_meta_model::streaming_job::Model {
            job_id: 1.into(),
            job_status: risingwave_meta_model::JobStatus::Created,
            create_type: risingwave_meta_model::CreateType::Foreground,
            timezone: Some("Asia/Shanghai".to_owned()),
            config_override: Some("{\"parallelism\":2}".to_owned()),
            adaptive_parallelism_strategy: Some("AUTO".to_owned()),
            parallelism: StreamingParallelism::Adaptive,
            backfill_parallelism: Some(StreamingParallelism::Adaptive),
            backfill_adaptive_parallelism_strategy: Some("RATIO(0.25)".to_owned()),
            backfill_orders: None,
            max_parallelism: 32,
            specific_resource_group: None,
            is_serverless_backfill: false,
            refresh_interval_sec: None,
        };

        let context = model.stream_context();
        assert_eq!(context.timezone, Some("Asia/Shanghai".to_owned()));
        assert_eq!(&*context.config_override, "{\"parallelism\":2}");
    }
}
410
411pub type StreamJobActorsToCreate = HashMap<
412    WorkerId,
413    HashMap<
414        FragmentId,
415        (
416            StreamNode,
417            Vec<StreamActorWithUpDownstreams>,
418            HashSet<SubscriberId>,
419        ),
420    >,
421>;
422
423impl StreamJobFragments {
424    /// Create a new `TableFragments` with state of `Initial`, with other fields empty.
425    pub fn for_test(job_id: JobId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
426        Self::new(
427            job_id,
428            fragments,
429            StreamContext::default(),
430            VirtualNode::COUNT_FOR_TEST,
431        )
432    }
433
434    /// Create a new `TableFragments` with state of `Initial`.
435    pub fn new(
436        stream_job_id: JobId,
437        fragments: BTreeMap<FragmentId, Fragment>,
438        ctx: StreamContext,
439        max_parallelism: usize,
440    ) -> Self {
441        Self {
442            stream_job_id,
443            state: State::Initial,
444            fragments,
445            ctx,
446            max_parallelism,
447        }
448    }
449
450    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
451        self.fragments.keys().cloned()
452    }
453
454    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
455        self.fragments.values()
456    }
457
458    /// Returns the table id.
459    pub fn stream_job_id(&self) -> JobId {
460        self.stream_job_id
461    }
462
463    /// Returns the timezone of the table
464    pub fn timezone(&self) -> Option<String> {
465        self.ctx.timezone.clone()
466    }
467
468    /// Returns whether the table fragments is in `Created` state.
469    pub fn is_created(&self) -> bool {
470        self.state == State::Created
471    }
472
473    /// Returns mview fragment ids.
474    #[cfg(test)]
475    pub fn mview_fragment_ids(&self) -> Vec<FragmentId> {
476        self.fragments
477            .values()
478            .filter(move |fragment| {
479                fragment
480                    .fragment_type_mask
481                    .contains(FragmentTypeFlag::Mview)
482            })
483            .map(|fragment| fragment.fragment_id)
484            .collect()
485    }
486
487    /// Returns actor ids that need to be tracked when creating MV.
488    pub fn tracking_progress_actor_ids_impl(
489        fragments: impl IntoIterator<Item = (FragmentTypeMask, impl Iterator<Item = ActorId>)>,
490    ) -> Vec<(ActorId, BackfillUpstreamType)> {
491        let mut actor_ids = vec![];
492        for (fragment_type_mask, actors) in fragments {
493            if fragment_type_mask.contains(FragmentTypeFlag::CdcFilter) {
494                // Note: CDC table job contains a StreamScan fragment (StreamCdcScan node) and a CdcFilter fragment.
495                // We don't track any fragments' progress.
496                return vec![];
497            }
498            if fragment_type_mask.contains_any([
499                FragmentTypeFlag::Values,
500                FragmentTypeFlag::StreamScan,
501                FragmentTypeFlag::SourceScan,
502                FragmentTypeFlag::LocalityProvider,
503            ]) {
504                actor_ids.extend(actors.map(|actor_id| {
505                    (
506                        actor_id,
507                        BackfillUpstreamType::from_fragment_type_mask(fragment_type_mask),
508                    )
509                }));
510            }
511        }
512        actor_ids
513    }
514
515    pub fn root_fragment(&self) -> Option<Fragment> {
516        self.mview_fragment()
517            .or_else(|| self.sink_fragment())
518            .or_else(|| self.source_fragment())
519    }
520
521    /// Returns the fragment with the `Mview` type flag.
522    pub fn mview_fragment(&self) -> Option<Fragment> {
523        self.fragments
524            .values()
525            .find(|fragment| {
526                fragment
527                    .fragment_type_mask
528                    .contains(FragmentTypeFlag::Mview)
529            })
530            .cloned()
531    }
532
533    pub fn source_fragment(&self) -> Option<Fragment> {
534        self.fragments
535            .values()
536            .find(|fragment| {
537                fragment
538                    .fragment_type_mask
539                    .contains(FragmentTypeFlag::Source)
540            })
541            .cloned()
542    }
543
544    pub fn sink_fragment(&self) -> Option<Fragment> {
545        self.fragments
546            .values()
547            .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
548            .cloned()
549    }
550
551    /// Extract the fragments that include source executors that contains an external stream source,
552    /// grouping by source id.
553    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
554        let mut source_fragments = HashMap::new();
555
556        for fragment in self.fragments() {
557            {
558                if let Some(source_id) = fragment.nodes.find_stream_source() {
559                    source_fragments
560                        .entry(source_id)
561                        .or_insert(BTreeSet::new())
562                        .insert(fragment.fragment_id as FragmentId);
563                }
564            }
565        }
566        source_fragments
567    }
568
569    pub fn source_backfill_fragments(
570        &self,
571    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
572        Self::source_backfill_fragments_impl(
573            self.fragments
574                .iter()
575                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
576        )
577    }
578
579    /// Returns (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`)).
580    ///
581    /// Note: the fragment `source_backfill_fragment_id` may actually have multiple upstream fragments,
582    /// but only one of them is the upstream source fragment, which is what we return.
583    pub fn source_backfill_fragments_impl(
584        fragments: impl Iterator<Item = (FragmentId, &StreamNode)>,
585    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
586        let mut source_backfill_fragments = HashMap::new();
587
588        for (fragment_id, fragment_node) in fragments {
589            {
590                if let Some((source_id, upstream_source_fragment_id)) =
591                    fragment_node.find_source_backfill()
592                {
593                    source_backfill_fragments
594                        .entry(source_id)
595                        .or_insert(BTreeSet::new())
596                        .insert((fragment_id, upstream_source_fragment_id));
597                }
598            }
599        }
600        source_backfill_fragments
601    }
602
603    /// Find the table job's `Union` fragment.
604    /// Panics if not found.
605    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
606        let mut union_fragment_id = None;
607        for (fragment_id, fragment) in &self.fragments {
608            {
609                {
610                    visit_stream_node_body(&fragment.nodes, |body| {
611                        if let NodeBody::Union(_) = body {
612                            if let Some(union_fragment_id) = union_fragment_id.as_mut() {
613                                // The union fragment should be unique.
614                                assert_eq!(*union_fragment_id, *fragment_id);
615                            } else {
616                                union_fragment_id = Some(*fragment_id);
617                            }
618                        }
619                    })
620                }
621            }
622        }
623
624        let union_fragment_id =
625            union_fragment_id.expect("fragment of placeholder merger not found");
626
627        (self
628            .fragments
629            .get_mut(&union_fragment_id)
630            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
631    }
632
633    /// Resolve dependent table
634    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
635        let table_id = match stream_node.node_body.as_ref() {
636            Some(NodeBody::StreamScan(stream_scan)) => Some(stream_scan.table_id),
637            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(stream_scan.table_id),
638            Some(NodeBody::LocalityProvider(state)) => {
639                Some(state.state_table.as_ref().expect("must have state").id)
640            }
641            _ => None,
642        };
643        if let Some(table_id) = table_id {
644            table_ids.entry(table_id).or_default().add_assign(1);
645        }
646
647        for child in &stream_node.input {
648            Self::resolve_dependent_table(child, table_ids);
649        }
650    }
651
652    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
653        Self::upstream_table_counts_impl(self.fragments.values().map(|fragment| &fragment.nodes))
654    }
655
656    /// Returns upstream table counts.
657    pub fn upstream_table_counts_impl(
658        fragment_nodes: impl Iterator<Item = &StreamNode>,
659    ) -> HashMap<TableId, usize> {
660        let mut table_ids = HashMap::new();
661        fragment_nodes.for_each(|node| {
662            Self::resolve_dependent_table(node, &mut table_ids);
663        });
664
665        table_ids
666    }
667
668    pub fn mv_table_id(&self) -> Option<TableId> {
669        self.fragments
670            .values()
671            .flat_map(|f| f.state_table_ids.iter().copied())
672            .find(|table_id| self.stream_job_id.is_mv_table_id(*table_id))
673    }
674
675    pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<TableId, Table> {
676        let mut tables = BTreeMap::new();
677        for fragment in fragments {
678            stream_graph_visitor::visit_stream_node_tables_inner(
679                &mut fragment.nodes.clone(),
680                false,
681                true,
682                |table, _| {
683                    let table_id = table.id;
684                    tables
685                        .try_insert(table_id, table.clone())
686                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
687                },
688            );
689        }
690        tables
691    }
692
693    /// Returns the internal table ids without the mview table.
694    pub fn internal_table_ids(&self) -> Vec<TableId> {
695        self.fragments
696            .values()
697            .flat_map(|f| f.state_table_ids.iter().copied())
698            .filter(|&t| !self.stream_job_id.is_mv_table_id(t))
699            .collect_vec()
700    }
701
702    /// Returns all internal table ids including the mview table.
703    pub fn all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
704        self.fragments
705            .values()
706            .flat_map(|f| f.state_table_ids.clone())
707    }
708}
709
710#[derive(Debug, Display, Clone, Copy, PartialEq, Eq)]
711pub enum BackfillUpstreamType {
712    MView,
713    Values,
714    Source,
715    LocalityProvider,
716}
717
718impl BackfillUpstreamType {
719    pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
720        let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
721        let is_values = mask.contains(FragmentTypeFlag::Values);
722        let is_source = mask.contains(FragmentTypeFlag::SourceScan);
723        let is_locality_provider = mask.contains(FragmentTypeFlag::LocalityProvider);
724
725        // Note: in theory we can have multiple backfill executors in one fragment, but currently it's not possible.
726        // See <https://github.com/risingwavelabs/risingwave/issues/6236>.
727        debug_assert!(
728            is_mview as u8 + is_values as u8 + is_source as u8 + is_locality_provider as u8 == 1,
729            "a backfill fragment should either be mview, value, source, or locality provider, found {:?}",
730            mask
731        );
732
733        if is_mview {
734            BackfillUpstreamType::MView
735        } else if is_values {
736            BackfillUpstreamType::Values
737        } else if is_source {
738            BackfillUpstreamType::Source
739        } else if is_locality_provider {
740            BackfillUpstreamType::LocalityProvider
741        } else {
742            unreachable!("invalid fragment type mask: {:?}", mask);
743        }
744    }
745}