risingwave_meta/model/stream.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{AddAssign, Deref};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::{
    ActorAlignmentId, IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat,
};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::{ActorInfo, PbActorLocation};
use risingwave_pb::id::SubscriberId;
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
use risingwave_pb::meta::table_parallelism::{
    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
    PbParallelism,
};
use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
    PbStreamContext, StreamNode,
};
use strum::Display;

use super::{ActorId, FragmentId};
/// The parallelism for a `TableFragments`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// The system decides the parallelism, based on the available worker parallelisms.
    Adaptive,
    /// Set when the `TableFragments` parallelism is changed explicitly.
    /// All fragments that are part of the `TableFragments` will have this same parallelism.
    Fixed(usize),
    /// Set when the individual parallelisms of the `Fragment`s can differ within a
    /// `TableFragments`. This happens via `risectl`, whose low-level interface can scale
    /// individual `Fragment`s within a `TableFragments`. When that happens, the
    /// `TableFragments` no longer has a consistent parallelism, so we set this to
    /// indicate that.
    Custom,
}

impl From<PbTableParallelism> for TableParallelism {
    fn from(value: PbTableParallelism) -> Self {
        use Parallelism::*;
        match &value.parallelism {
            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
            Some(Custom(_)) => Self::Custom,
            _ => unreachable!(),
        }
    }
}

impl From<TableParallelism> for PbTableParallelism {
    fn from(value: TableParallelism) -> Self {
        use TableParallelism::*;

        let parallelism = match value {
            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
                parallelism: n as u32,
            }),
            Custom => PbParallelism::Custom(PbCustomParallelism {}),
        };

        Self {
            parallelism: Some(parallelism),
        }
    }
}

impl From<StreamingParallelism> for TableParallelism {
    fn from(value: StreamingParallelism) -> Self {
        match value {
            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
            StreamingParallelism::Custom => TableParallelism::Custom,
        }
    }
}

impl From<TableParallelism> for StreamingParallelism {
    fn from(value: TableParallelism) -> Self {
        match value {
            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            TableParallelism::Custom => StreamingParallelism::Custom,
        }
    }
}
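
// A minimal sketch (hypothetical usage, not from the production flow) of how the
// conversions above compose; a model-to-protobuf round-trip preserves the parallelism:
//
//     let pb: PbTableParallelism = TableParallelism::Fixed(4).into();
//     assert_eq!(TableParallelism::from(pb), TableParallelism::Fixed(4));
//
// Note that the legacy `Auto` protobuf variant also decodes to `Adaptive`, so the
// protobuf-to-model direction is not one-to-one.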

pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// downstream `fragment_id` -> original upstream `fragment_id` -> new upstream `fragment_id`
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// The newly added no-shuffle actor dispatchers from an upstream fragment to a downstream fragment:
/// upstream `fragment_id` -> downstream `fragment_id` -> upstream `actor_id` -> downstream `actor_id`
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;
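
// A minimal sketch (hypothetical ids) of navigating `FragmentNewNoShuffle`: the
// innermost map pairs each upstream actor with the single downstream actor it
// dispatches to under the no-shuffle relationship.
//
//     let no_shuffle: FragmentNewNoShuffle = Default::default();
//     if let Some(downstreams) = no_shuffle.get(&upstream_fragment_id)
//         && let Some(actor_pairs) = downstreams.get(&downstream_fragment_id)
//     {
//         for (upstream_actor, downstream_actor) in actor_pairs {
//             // exactly one downstream actor per upstream actor
//         }
//     }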

#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    pub downstream_fragment_id: FragmentId,
    pub dispatcher_type: DispatcherType,
    pub dist_key_indices: Vec<u32>,
    pub output_mapping: PbDispatchOutputMapping,
}

impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
        Self {
            downstream_fragment_id: fragment_id,
            dispatcher_type: dispatch.get_type().unwrap().into(),
            dist_key_indices: dispatch.dist_key_indices,
            output_mapping: dispatch.output_mapping.unwrap(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    pub inner: StreamJobFragments,
    pub downstreams: FragmentDownstreamRelation,
}

impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

#[derive(Clone, Debug)]
pub struct StreamActor {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub vnode_bitmap: Option<Bitmap>,
    pub mview_definition: String,
    pub expr_context: Option<PbExprContext>,
    // TODO: shall we merge `config_override` with `expr_context` to be a `StreamContext`?
    pub config_override: Arc<str>,
}

impl StreamActor {
    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
        PbStreamActor {
            actor_id: self.actor_id,
            fragment_id: self.fragment_id,
            dispatcher: dispatchers.collect(),
            vnode_bitmap: self
                .vnode_bitmap
                .as_ref()
                .map(|bitmap| bitmap.to_protobuf()),
            mview_definition: self.mview_definition.clone(),
            expr_context: self.expr_context.clone(),
            config_override: self.config_override.to_string(),
        }
    }
}

#[derive(Clone, Debug, Default)]
pub struct Fragment {
    pub fragment_id: FragmentId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub actors: Vec<StreamActor>,
    pub state_table_ids: Vec<TableId>,
    pub maybe_vnode_count: Option<u32>,
    pub nodes: StreamNode,
}

impl Fragment {
    pub fn to_protobuf(
        &self,
        upstream_fragments: impl Iterator<Item = FragmentId>,
        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
    ) -> PbFragment {
        PbFragment {
            fragment_id: self.fragment_id,
            fragment_type_mask: self.fragment_type_mask.into(),
            distribution_type: self.distribution_type as _,
            actors: self
                .actors
                .iter()
                .map(|actor| {
                    actor.to_protobuf(
                        dispatchers
                            .and_then(|dispatchers| dispatchers.get(&actor.actor_id))
                            .into_iter()
                            .flatten()
                            .cloned(),
                    )
                })
                .collect(),
            state_table_ids: self.state_table_ids.clone(),
            upstream_fragment_ids: upstream_fragments.collect(),
            maybe_vnode_count: self.maybe_vnode_count,
            nodes: Some(self.nodes.clone()),
        }
    }
}

impl VnodeCountCompat for Fragment {
    fn vnode_count_inner(&self) -> VnodeCount {
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}

impl IsSingleton for Fragment {
    fn is_singleton(&self) -> bool {
        matches!(self.distribution_type, FragmentDistributionType::Single)
    }
}

/// Fragments of a streaming job. Corresponds to [`PbTableFragments`].
/// (It was previously called `TableFragments` for historical reasons.)
///
/// We store whole fragments in a single column family as follows:
/// `stream_job_id` => `StreamJobFragments`.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// The id of the streaming job.
    pub stream_job_id: JobId,

    /// The state of the table fragments.
    pub state: State,

    /// The table fragments.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// The status of actors.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// The streaming context associated with this stream plan and its fragments.
    pub ctx: StreamContext,

    /// The parallelism assigned to these table fragments.
    pub assigned_parallelism: TableParallelism,

    /// The max parallelism specified when the streaming job was created, i.e., the expected vnode count.
    ///
    /// The reason for persisting this value is mainly to check whether a parallelism change (via `ALTER
    /// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
    /// the streaming job.
    ///
    /// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may differ
    /// from this value (see `StreamFragmentGraph.max_parallelism` for more details). As a result,
    /// checking the parallelism change against this value can be inaccurate in some cases. However,
    /// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
    pub max_parallelism: usize,
}

#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// The timezone used to interpret timestamps and dates for conversion.
    pub timezone: Option<String>,

    /// The partial config of this job that overrides the global config.
    pub config_override: Arc<str>,

    /// The adaptive parallelism strategy for this job, if it overrides the system default.
    pub adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
}

impl StreamContext {
    pub fn to_protobuf(&self) -> PbStreamContext {
        PbStreamContext {
            timezone: self.timezone.clone().unwrap_or("".into()),
            config_override: self.config_override.to_string(),
            adaptive_parallelism_strategy: self
                .adaptive_parallelism_strategy
                .as_ref()
                .map(ToString::to_string)
                .unwrap_or_default(),
        }
    }

    pub fn to_expr_context(&self) -> PbExprContext {
        PbExprContext {
            // `self.timezone` must always be set; an obviously invalid value is used here for debugging if it's not.
            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
            strict_mode: false,
        }
    }

    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
        Self {
            timezone: if prost.get_timezone().is_empty() {
                None
            } else {
                Some(prost.get_timezone().clone())
            },
            config_override: prost.get_config_override().as_str().into(),
            adaptive_parallelism_strategy: if prost.get_adaptive_parallelism_strategy().is_empty() {
                None
            } else {
                Some(
                    parse_strategy(prost.get_adaptive_parallelism_strategy())
                        .expect("adaptive parallelism strategy should be validated in frontend"),
                )
            },
        }
    }
}
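
// A minimal sketch (assumed values) of the protobuf round-trip convention above:
// `None` fields are encoded as empty strings, and empty strings decode back to `None`.
//
//     let ctx = StreamContext {
//         timezone: None,
//         config_override: "".into(),
//         adaptive_parallelism_strategy: None,
//     };
//     let pb = ctx.to_protobuf();
//     assert!(pb.get_timezone().is_empty());
//     assert!(StreamContext::from_protobuf(&pb).timezone.is_none());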

#[easy_ext::ext(StreamingJobModelContextExt)]
impl risingwave_meta_model::streaming_job::Model {
    pub fn stream_context(&self) -> StreamContext {
        StreamContext {
            timezone: self.timezone.clone(),
            config_override: self.config_override.clone().unwrap_or_default().into(),
            adaptive_parallelism_strategy: self.adaptive_parallelism_strategy.as_deref().map(|s| {
                parse_strategy(s).expect("strategy should be validated before persisting")
            }),
        }
    }
}

impl StreamJobFragments {
    pub fn to_protobuf(
        &self,
        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
        fragment_dispatchers: &FragmentActorDispatchers,
    ) -> PbTableFragments {
        PbTableFragments {
            table_id: self.stream_job_id,
            state: self.state as _,
            fragments: self
                .fragments
                .iter()
                .map(|(id, fragment)| {
                    (
                        *id,
                        fragment.to_protobuf(
                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
                            fragment_dispatchers.get(id),
                        ),
                    )
                })
                .collect(),
            actor_status: self
                .actor_status
                .iter()
                .map(|(actor_id, status)| (*actor_id, *status))
                .collect(),
            ctx: Some(self.ctx.to_protobuf()),
            parallelism: Some(self.assigned_parallelism.into()),
            node_label: "".to_owned(),
            backfill_done: true,
            max_parallelism: Some(self.max_parallelism as _),
        }
    }
}

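/// Per-worker plan of actors to create:
/// `WorkerId` -> `FragmentId` -> (the fragment's `StreamNode`, the actors to build
/// with their up/downstreams, and the set of `SubscriberId`s for the fragment).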
pub type StreamJobActorsToCreate = HashMap<
    WorkerId,
    HashMap<
        FragmentId,
        (
            StreamNode,
            Vec<StreamActorWithUpDownstreams>,
            HashSet<SubscriberId>,
        ),
    >,
>;

impl StreamJobFragments {
    /// Creates a new `TableFragments` in the `Initial` state, with other fields empty.
    pub fn for_test(job_id: JobId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
        Self::new(
            job_id,
            fragments,
            &BTreeMap::new(),
            StreamContext::default(),
            TableParallelism::Adaptive,
            VirtualNode::COUNT_FOR_TEST,
        )
    }

    /// Creates a new `TableFragments` in the `Initial` state, recording actor locations on the
    /// given workers.
    pub fn new(
        stream_job_id: JobId,
        fragments: BTreeMap<FragmentId, Fragment>,
        actor_locations: &BTreeMap<ActorId, ActorAlignmentId>,
        ctx: StreamContext,
        table_parallelism: TableParallelism,
        max_parallelism: usize,
    ) -> Self {
        let actor_status = actor_locations
            .iter()
            .map(|(&actor_id, alignment_id)| {
                (
                    actor_id,
                    ActorStatus {
                        location: PbActorLocation::from_worker(alignment_id.worker_id()),
                    },
                )
            })
            .collect();

        Self {
            stream_job_id,
            state: State::Initial,
            fragments,
            actor_status,
            ctx,
            assigned_parallelism: table_parallelism,
            max_parallelism,
        }
    }

    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.fragments.keys().cloned()
    }

    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
        self.fragments.values()
    }

    pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
        self.fragments
            .get(&fragment_id)
            .map(|f| f.actors.as_slice())
            .unwrap_or_default()
    }

    /// Returns the stream job id.
    pub fn stream_job_id(&self) -> JobId {
        self.stream_job_id
    }

    /// Returns the timezone of the job.
    pub fn timezone(&self) -> Option<String> {
        self.ctx.timezone.clone()
    }

    /// Returns whether the table fragments are in the `Created` state.
    pub fn is_created(&self) -> bool {
        self.state == State::Created
    }

    /// Returns actor ids associated with this table.
    pub fn actor_ids(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
    }

    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
        self.fragments
            .values()
            .flat_map(|fragment| {
                fragment
                    .actors
                    .iter()
                    .map(|actor| (actor.actor_id, fragment.fragment_id))
            })
            .collect()
    }

    /// Returns actors associated with this table.
    #[cfg(test)]
    pub fn actors(&self) -> Vec<StreamActor> {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.clone())
            .collect()
    }

    /// Returns mview fragment ids.
    #[cfg(test)]
    pub fn mview_fragment_ids(&self) -> Vec<FragmentId> {
        self.fragments
            .values()
            .filter(move |fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .map(|fragment| fragment.fragment_id)
            .collect()
    }

    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
        Self::tracking_progress_actor_ids_impl(self.fragments.values().map(|fragment| {
            (
                fragment.fragment_type_mask,
                fragment.actors.iter().map(|actor| actor.actor_id),
            )
        }))
    }

    /// Returns actor ids that need to be tracked when creating an MV.
    pub fn tracking_progress_actor_ids_impl(
        fragments: impl IntoIterator<Item = (FragmentTypeMask, impl Iterator<Item = ActorId>)>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        let mut actor_ids = vec![];
        for (fragment_type_mask, actors) in fragments {
            if fragment_type_mask.contains(FragmentTypeFlag::CdcFilter) {
                // Note: a CDC table job contains a StreamScan fragment (StreamCdcScan node) and a
                // CdcFilter fragment. We don't track any of its fragments' progress.
                return vec![];
            }
            if fragment_type_mask.contains_any([
                FragmentTypeFlag::Values,
                FragmentTypeFlag::StreamScan,
                FragmentTypeFlag::SourceScan,
                FragmentTypeFlag::LocalityProvider,
            ]) {
                actor_ids.extend(actors.map(|actor_id| {
                    (
                        actor_id,
                        BackfillUpstreamType::from_fragment_type_mask(fragment_type_mask),
                    )
                }));
            }
        }
        actor_ids
    }
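
    // A minimal sketch (pseudocode, assumed mask constructors) of the tracking rules
    // above: a CdcFilter fragment anywhere in the job disables progress tracking
    // entirely, while Values/StreamScan/SourceScan/LocalityProvider fragments
    // contribute all of their actors, tagged with their backfill upstream type.
    //
    //     let tracked = StreamJobFragments::tracking_progress_actor_ids_impl(
    //         [(stream_scan_mask, actor_ids.into_iter())],
    //     );
    //     // => every actor paired with BackfillUpstreamType::MView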

    pub fn root_fragment(&self) -> Option<Fragment> {
        self.mview_fragment()
            .or_else(|| self.sink_fragment())
            .or_else(|| self.source_fragment())
    }

    /// Returns the fragment with the `Mview` type flag.
    pub fn mview_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .cloned()
    }

    pub fn source_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Source)
            })
            .cloned()
    }

    pub fn sink_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
            .cloned()
    }

    /// Extracts the fragments whose source executors contain an external stream source,
    /// grouped by source id.
    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
        let mut source_fragments = HashMap::new();

        for fragment in self.fragments() {
            if let Some(source_id) = fragment.nodes.find_stream_source() {
                source_fragments
                    .entry(source_id)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment.fragment_id as FragmentId);
            }
        }
        source_fragments
    }

    pub fn source_backfill_fragments(
        &self,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        Self::source_backfill_fragments_impl(
            self.fragments
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        )
    }

    /// Returns `source_id` -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`).
    ///
    /// Note: the fragment `source_backfill_fragment_id` may actually have multiple upstream fragments,
    /// but only one of them is the upstream source fragment, which is what we return.
    pub fn source_backfill_fragments_impl(
        fragments: impl Iterator<Item = (FragmentId, &StreamNode)>,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        let mut source_backfill_fragments = HashMap::new();

        for (fragment_id, fragment_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                fragment_node.find_source_backfill()
            {
                source_backfill_fragments
                    .entry(source_id)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        source_backfill_fragments
    }
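
    // A minimal sketch (assumed ids) of the returned shape: for a job where
    // fragment 2 backfills from the source fragment 1 of source 42, the map is
    //
    //     { 42 => { (2, 1) } }
    //
    // i.e. each entry pairs a backfill fragment with its upstream source fragment.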

    /// Find the table job's `Union` fragment.
    /// Panics if not found.
    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
        let mut union_fragment_id = None;
        for (fragment_id, fragment) in &self.fragments {
            visit_stream_node_body(&fragment.nodes, |body| {
                if let NodeBody::Union(_) = body {
                    if let Some(union_fragment_id) = union_fragment_id.as_mut() {
                        // The union fragment should be unique.
                        assert_eq!(*union_fragment_id, *fragment_id);
                    } else {
                        union_fragment_id = Some(*fragment_id);
                    }
                }
            })
        }

        let union_fragment_id =
            union_fragment_id.expect("fragment of placeholder merger not found");

        (self
            .fragments
            .get_mut(&union_fragment_id)
            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
    }

    /// Recursively resolves the tables that the given stream node depends on,
    /// accumulating a reference count per table id.
    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
        let table_id = match stream_node.node_body.as_ref() {
            Some(NodeBody::StreamScan(stream_scan)) => Some(stream_scan.table_id),
            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(stream_scan.table_id),
            Some(NodeBody::LocalityProvider(state)) => {
                Some(state.state_table.as_ref().expect("must have state").id)
            }
            _ => None,
        };
        if let Some(table_id) = table_id {
            table_ids.entry(table_id).or_default().add_assign(1);
        }

        for child in &stream_node.input {
            Self::resolve_dependent_table(child, table_ids);
        }
    }
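
    // A minimal sketch (assumed plan shape) of the recursion above: for a plan with
    // two StreamScan nodes over table 5 and one over table 7, the accumulated map is
    //
    //     { 5 => 2, 7 => 1 }
    //
    // i.e. the count records how many scan nodes depend on each upstream table.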

    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
        Self::upstream_table_counts_impl(self.fragments.values().map(|fragment| &fragment.nodes))
    }

    /// Returns upstream table counts.
    pub fn upstream_table_counts_impl(
        fragment_nodes: impl Iterator<Item = &StreamNode>,
    ) -> HashMap<TableId, usize> {
        let mut table_ids = HashMap::new();
        fragment_nodes.for_each(|node| {
            Self::resolve_dependent_table(node, &mut table_ids);
        });

        table_ids
    }

    /// Returns actor locations grouped by worker id.
    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
        let mut map = BTreeMap::default();
        for (&actor_id, actor_status) in &self.actor_status {
            let node_id = actor_status.worker_id();
            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
        }
        map
    }

    pub fn actors_to_create(
        &self,
    ) -> impl Iterator<
        Item = (
            FragmentId,
            &StreamNode,
            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
        ),
    > + '_ {
        self.fragments.values().map(move |fragment| {
            (
                fragment.fragment_id,
                &fragment.nodes,
                fragment.actors.iter().map(move |actor| {
                    let worker_id: WorkerId = self
                        .actor_status
                        .get(&actor.actor_id)
                        .expect("should exist")
                        .worker_id();
                    (actor, worker_id)
                }),
            )
        })
    }

    pub fn mv_table_id(&self) -> Option<TableId> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter().copied())
            .find(|table_id| self.stream_job_id.is_mv_table_id(*table_id))
    }

    pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<TableId, Table> {
        let mut tables = BTreeMap::new();
        for fragment in fragments {
            stream_graph_visitor::visit_stream_node_tables_inner(
                &mut fragment.nodes.clone(),
                false,
                true,
                |table, _| {
                    let table_id = table.id;
                    tables
                        .try_insert(table_id, table.clone())
                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
                },
            );
        }
        tables
    }

    /// Returns the internal table ids, excluding the mview table.
    pub fn internal_table_ids(&self) -> Vec<TableId> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter().copied())
            .filter(|&t| !self.stream_job_id.is_mv_table_id(t))
            .collect_vec()
    }

    /// Returns all state table ids, including the mview table.
    pub fn all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.clone())
    }

    /// Fills the `expr_context` in `StreamActor`. Used for backward compatibility.
    pub fn fill_expr_context(mut self) -> Self {
        self.fragments.values_mut().for_each(|fragment| {
            fragment.actors.iter_mut().for_each(|actor| {
                if actor.expr_context.is_none() {
                    actor.expr_context = Some(self.ctx.to_expr_context());
                }
            });
        });
        self
    }
}

#[derive(Debug, Display, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    MView,
    Values,
    Source,
    LocalityProvider,
}

impl BackfillUpstreamType {
    pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
        let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
        let is_values = mask.contains(FragmentTypeFlag::Values);
        let is_source = mask.contains(FragmentTypeFlag::SourceScan);
        let is_locality_provider = mask.contains(FragmentTypeFlag::LocalityProvider);

        // Note: in theory we can have multiple backfill executors in one fragment, but currently it's not possible.
        // See <https://github.com/risingwavelabs/risingwave/issues/6236>.
        debug_assert!(
            is_mview as u8 + is_values as u8 + is_source as u8 + is_locality_provider as u8 == 1,
            "a backfill fragment should be exactly one of mview, values, source, or locality provider, found {:?}",
            mask
        );

        if is_mview {
            BackfillUpstreamType::MView
        } else if is_values {
            BackfillUpstreamType::Values
        } else if is_source {
            BackfillUpstreamType::Source
        } else if is_locality_provider {
            BackfillUpstreamType::LocalityProvider
        } else {
            unreachable!("invalid fragment type mask: {:?}", mask);
        }
    }
}
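
// A minimal sketch (assumed flags) of the mapping above: a fragment whose type mask
// contains `StreamScan` resolves to `BackfillUpstreamType::MView`, one containing
// `SourceScan` resolves to `Source`, and so on; in debug builds, a mask with zero
// or more than one of these flags set trips the `debug_assert!`.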