risingwave_meta/model/stream.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{AddAssign, Deref};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::{
    ActorAlignmentId, IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat,
};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::{ActorInfo, PbActorLocation};
use risingwave_pb::id::SubscriberId;
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
use risingwave_pb::meta::table_parallelism::{
    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
    PbParallelism,
};
use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
    PbStreamContext, StreamNode,
};

use super::{ActorId, FragmentId};
/// The parallelism for a `TableFragments`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// The system decides the parallelism, based on the available worker parallelisms.
    Adaptive,
    /// We set this when the `TableFragments` parallelism is changed.
    /// All fragments that are part of the `TableFragments` will have the same parallelism as this.
    Fixed(usize),
    /// We set this when the individual parallelisms of the `Fragments`
    /// can differ within a `TableFragments`.
    /// This is set for `risectl`, since it has a low-level interface
    /// that can scale individual `Fragments` within a `TableFragments`.
    /// When that happens, the `TableFragments` no longer has a consistent
    /// parallelism, so we set this to indicate that.
    Custom,
}
65
66impl From<PbTableParallelism> for TableParallelism {
67    fn from(value: PbTableParallelism) -> Self {
68        use Parallelism::*;
69        match &value.parallelism {
70            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
71            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
72            Some(Custom(_)) => Self::Custom,
73            _ => unreachable!(),
74        }
75    }
76}
77
78impl From<TableParallelism> for PbTableParallelism {
79    fn from(value: TableParallelism) -> Self {
80        use TableParallelism::*;
81
82        let parallelism = match value {
83            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
84            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
85                parallelism: n as u32,
86            }),
87            Custom => PbParallelism::Custom(PbCustomParallelism {}),
88        };
89
90        Self {
91            parallelism: Some(parallelism),
92        }
93    }
94}
95
96impl From<StreamingParallelism> for TableParallelism {
97    fn from(value: StreamingParallelism) -> Self {
98        match value {
99            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
100            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
101            StreamingParallelism::Custom => TableParallelism::Custom,
102        }
103    }
104}
105
106impl From<TableParallelism> for StreamingParallelism {
107    fn from(value: TableParallelism) -> Self {
108        match value {
109            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
110            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
111            TableParallelism::Custom => StreamingParallelism::Custom,
112        }
113    }
114}
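
// A minimal round-trip sketch for the conversions above: going through the
// protobuf form should be lossless for every variant.
#[cfg(test)]
mod table_parallelism_conversion_tests {
    use super::*;

    #[test]
    fn pb_round_trip() {
        for parallelism in [
            TableParallelism::Adaptive,
            TableParallelism::Fixed(4),
            TableParallelism::Custom,
        ] {
            let pb: PbTableParallelism = parallelism.into();
            assert_eq!(TableParallelism::from(pb), parallelism);
        }
    }
}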

pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// Downstream `fragment_id` -> original upstream `fragment_id` -> new upstream `fragment_id`.
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// The newly added no-shuffle actor dispatchers from upstream fragment to downstream fragment:
/// upstream `fragment_id` -> downstream `fragment_id` -> upstream `actor_id` -> downstream `actor_id`.
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;

#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    pub downstream_fragment_id: FragmentId,
    pub dispatcher_type: DispatcherType,
    pub dist_key_indices: Vec<u32>,
    pub output_mapping: PbDispatchOutputMapping,
}

impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
        Self {
            downstream_fragment_id: fragment_id,
            dispatcher_type: dispatch.get_type().unwrap().into(),
            dist_key_indices: dispatch.dist_key_indices,
            output_mapping: dispatch.output_mapping.unwrap(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    pub inner: StreamJobFragments,
    pub downstreams: FragmentDownstreamRelation,
}

impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

#[derive(Clone, Debug)]
pub struct StreamActor {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub vnode_bitmap: Option<Bitmap>,
    pub mview_definition: String,
    pub expr_context: Option<PbExprContext>,
    // TODO: shall we merge `config_override` with `expr_context` to be a `StreamContext`?
    pub config_override: Arc<str>,
}

impl StreamActor {
    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
        PbStreamActor {
            actor_id: self.actor_id,
            fragment_id: self.fragment_id,
            dispatcher: dispatchers.collect(),
            vnode_bitmap: self
                .vnode_bitmap
                .as_ref()
                .map(|bitmap| bitmap.to_protobuf()),
            mview_definition: self.mview_definition.clone(),
            expr_context: self.expr_context.clone(),
            config_override: self.config_override.to_string(),
        }
    }
}

#[derive(Clone, Debug, Default)]
pub struct Fragment {
    pub fragment_id: FragmentId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub actors: Vec<StreamActor>,
    pub state_table_ids: Vec<TableId>,
    pub maybe_vnode_count: Option<u32>,
    pub nodes: StreamNode,
}

impl Fragment {
    pub fn to_protobuf(
        &self,
        upstream_fragments: impl Iterator<Item = FragmentId>,
        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
    ) -> PbFragment {
        PbFragment {
            fragment_id: self.fragment_id,
            fragment_type_mask: self.fragment_type_mask.into(),
            distribution_type: self.distribution_type as _,
            actors: self
                .actors
                .iter()
                .map(|actor| {
                    actor.to_protobuf(
                        dispatchers
                            .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
                            .into_iter()
                            .flatten()
                            .cloned(),
                    )
                })
                .collect(),
            state_table_ids: self.state_table_ids.clone(),
            upstream_fragment_ids: upstream_fragments.collect(),
            maybe_vnode_count: self.maybe_vnode_count,
            nodes: Some(self.nodes.clone()),
        }
    }
}

impl VnodeCountCompat for Fragment {
    fn vnode_count_inner(&self) -> VnodeCount {
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}

impl IsSingleton for Fragment {
    fn is_singleton(&self) -> bool {
        matches!(self.distribution_type, FragmentDistributionType::Single)
    }
}
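
// Compatibility note (an added remark, inferred from the closure argument of
// `VnodeCount::from_protobuf`): for fragments persisted before `maybe_vnode_count`
// existed, a singleton fragment resolves to a vnode count of 1, while a
// hash-distributed one falls back to the compat default.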

/// Fragments of a streaming job. Corresponds to [`PbTableFragments`].
/// (It was previously called `TableFragments` for historical reasons.)
///
/// We store whole fragments in a single column family as follows:
/// `stream_job_id` => `StreamJobFragments`.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// The id of the streaming job.
    pub stream_job_id: JobId,

    /// The state of the table fragments.
    pub state: State,

    /// The table fragments.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// The status of actors.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// The streaming context associated with this stream plan and its fragments.
    pub ctx: StreamContext,

    /// The parallelism assigned to this streaming job.
    pub assigned_parallelism: TableParallelism,

    /// The max parallelism specified when the streaming job was created, i.e., expected vnode count.
    ///
    /// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
    /// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
    /// the streaming job.
    ///
    /// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
    /// from this value (see `StreamFragmentGraph.max_parallelism` for more details). As a result,
    /// checking the parallelism change with this value can be inaccurate in some cases. However,
    /// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
    pub max_parallelism: usize,
}

#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// The timezone used to interpret timestamps and dates for conversion.
    pub timezone: Option<String>,

    /// The partial config of this job that overrides the global config.
    pub config_override: Arc<str>,
}

impl StreamContext {
    pub fn to_protobuf(&self) -> PbStreamContext {
        PbStreamContext {
            timezone: self.timezone.clone().unwrap_or("".into()),
            config_override: self.config_override.to_string(),
        }
    }

    pub fn to_expr_context(&self) -> PbExprContext {
        PbExprContext {
            // `self.timezone` must always be set; an invalid value is used here for debugging if it's not.
            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
            strict_mode: false,
        }
    }

    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
        Self {
            timezone: if prost.get_timezone().is_empty() {
                None
            } else {
                Some(prost.get_timezone().clone())
            },
            config_override: prost.get_config_override().as_str().into(),
        }
    }
}
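
// A minimal round-trip sketch for `StreamContext` (uses only items in this file):
// an empty timezone string in the protobuf form maps back to `None`.
#[cfg(test)]
mod stream_context_tests {
    use super::*;

    #[test]
    fn timezone_round_trip() {
        let ctx = StreamContext {
            timezone: Some("UTC".to_owned()),
            config_override: "".into(),
        };
        let restored = StreamContext::from_protobuf(&ctx.to_protobuf());
        assert_eq!(restored.timezone, Some("UTC".to_owned()));

        let no_tz = StreamContext {
            timezone: None,
            config_override: "".into(),
        };
        let restored = StreamContext::from_protobuf(&no_tz.to_protobuf());
        assert_eq!(restored.timezone, None);
    }
}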

#[easy_ext::ext(StreamingJobModelContextExt)]
impl risingwave_meta_model::streaming_job::Model {
    pub fn stream_context(&self) -> StreamContext {
        StreamContext {
            timezone: self.timezone.clone(),
            config_override: self.config_override.clone().unwrap_or_default().into(),
        }
    }
}

impl StreamJobFragments {
    pub fn to_protobuf(
        &self,
        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
        fragment_dispatchers: &FragmentActorDispatchers,
    ) -> PbTableFragments {
        PbTableFragments {
            table_id: self.stream_job_id,
            state: self.state as _,
            fragments: self
                .fragments
                .iter()
                .map(|(id, fragment)| {
                    (
                        *id,
                        fragment.to_protobuf(
                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
                            fragment_dispatchers.get(&(*id as _)),
                        ),
                    )
                })
                .collect(),
            actor_status: self
                .actor_status
                .iter()
                .map(|(actor_id, status)| (*actor_id, *status))
                .collect(),
            ctx: Some(self.ctx.to_protobuf()),
            parallelism: Some(self.assigned_parallelism.into()),
            node_label: "".to_owned(),
            backfill_done: true,
            max_parallelism: Some(self.max_parallelism as _),
        }
    }
}

pub type StreamJobActorsToCreate = HashMap<
    WorkerId,
    HashMap<
        FragmentId,
        (
            StreamNode,
            Vec<StreamActorWithUpDownstreams>,
            HashSet<SubscriberId>,
        ),
    >,
>;

impl StreamJobFragments {
    /// Creates a new `TableFragments` in state `Initial`, with other fields empty.
    pub fn for_test(job_id: JobId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
        Self::new(
            job_id,
            fragments,
            &BTreeMap::new(),
            StreamContext::default(),
            TableParallelism::Adaptive,
            VirtualNode::COUNT_FOR_TEST,
        )
    }

    /// Creates a new `TableFragments` in state `Initial`, recording actor locations on the given
    /// workers.
    pub fn new(
        stream_job_id: JobId,
        fragments: BTreeMap<FragmentId, Fragment>,
        actor_locations: &BTreeMap<ActorId, ActorAlignmentId>,
        ctx: StreamContext,
        table_parallelism: TableParallelism,
        max_parallelism: usize,
    ) -> Self {
        let actor_status = actor_locations
            .iter()
            .map(|(&actor_id, alignment_id)| {
                (
                    actor_id,
                    ActorStatus {
                        location: PbActorLocation::from_worker(alignment_id.worker_id()),
                    },
                )
            })
            .collect();

        Self {
            stream_job_id,
            state: State::Initial,
            fragments,
            actor_status,
            ctx,
            assigned_parallelism: table_parallelism,
            max_parallelism,
        }
    }

    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.fragments.keys().cloned()
    }

    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
        self.fragments.values()
    }

    pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
        self.fragments
            .get(&fragment_id)
            .map(|f| f.actors.as_slice())
            .unwrap_or_default()
    }

    /// Returns the stream job id.
    pub fn stream_job_id(&self) -> JobId {
        self.stream_job_id
    }

    /// Returns the timezone of the job.
    pub fn timezone(&self) -> Option<String> {
        self.ctx.timezone.clone()
    }

    /// Returns whether the table fragments is in `Created` state.
    pub fn is_created(&self) -> bool {
        self.state == State::Created
    }

    /// Returns actor ids associated with this job.
    pub fn actor_ids(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
    }

    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
        self.fragments
            .values()
            .flat_map(|fragment| {
                fragment
                    .actors
                    .iter()
                    .map(|actor| (actor.actor_id, fragment.fragment_id))
            })
            .collect()
    }

    /// Returns actors associated with this job.
    #[cfg(test)]
    pub fn actors(&self) -> Vec<StreamActor> {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.clone())
            .collect()
    }

    /// Returns mview fragment ids.
    #[cfg(test)]
    pub fn mview_fragment_ids(&self) -> Vec<FragmentId> {
        self.fragments
            .values()
            .filter(move |fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .map(|fragment| fragment.fragment_id)
            .collect()
    }

    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
        Self::tracking_progress_actor_ids_impl(self.fragments.values().map(|fragment| {
            (
                fragment.fragment_type_mask,
                fragment.actors.iter().map(|actor| actor.actor_id),
            )
        }))
    }

    /// Returns actor ids that need to be tracked when creating an MV.
    pub fn tracking_progress_actor_ids_impl(
        fragments: impl IntoIterator<Item = (FragmentTypeMask, impl Iterator<Item = ActorId>)>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        let mut actor_ids = vec![];
        for (fragment_type_mask, actors) in fragments {
            if fragment_type_mask.contains(FragmentTypeFlag::CdcFilter) {
                // Note: a CDC table job contains a StreamScan fragment (StreamCdcScan node) and a CdcFilter fragment.
                // We don't track any fragments' progress.
                return vec![];
            }
            if fragment_type_mask.contains_any([
                FragmentTypeFlag::Values,
                FragmentTypeFlag::StreamScan,
                FragmentTypeFlag::SourceScan,
                FragmentTypeFlag::LocalityProvider,
            ]) {
                actor_ids.extend(actors.map(|actor_id| {
                    (
                        actor_id,
                        BackfillUpstreamType::from_fragment_type_mask(fragment_type_mask),
                    )
                }));
            }
        }
        actor_ids
    }
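
    // Illustrative behavior of the function above (a sketch; ids are made up): one
    // `StreamScan` fragment with actors 1 and 2 yields
    // `[(1, BackfillUpstreamType::MView), (2, BackfillUpstreamType::MView)]`,
    // while the presence of any `CdcFilter` fragment collapses the result to `[]`.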

    pub fn root_fragment(&self) -> Option<Fragment> {
        self.mview_fragment()
            .or_else(|| self.sink_fragment())
            .or_else(|| self.source_fragment())
    }

    /// Returns the fragment with the `Mview` type flag.
    pub fn mview_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .cloned()
    }

    pub fn source_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Source)
            })
            .cloned()
    }

    pub fn sink_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
            .cloned()
    }

    /// Extracts the fragments that include source executors containing an external stream source,
    /// grouped by source id.
    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
        let mut source_fragments = HashMap::new();

        for fragment in self.fragments() {
            if let Some(source_id) = fragment.nodes.find_stream_source() {
                source_fragments
                    .entry(source_id)
                    .or_insert(BTreeSet::new())
                    .insert(fragment.fragment_id as FragmentId);
            }
        }
        source_fragments
    }

    pub fn source_backfill_fragments(
        &self,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        Self::source_backfill_fragments_impl(
            self.fragments
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        )
    }

    /// Returns `source_id` -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`).
    ///
    /// Note: the fragment `source_backfill_fragment_id` may actually have multiple upstream fragments,
    /// but only one of them is the upstream source fragment, which is what we return.
    pub fn source_backfill_fragments_impl(
        fragments: impl Iterator<Item = (FragmentId, &StreamNode)>,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        let mut source_backfill_fragments = HashMap::new();

        for (fragment_id, fragment_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                fragment_node.find_source_backfill()
            {
                source_backfill_fragments
                    .entry(source_id)
                    .or_insert(BTreeSet::new())
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        source_backfill_fragments
    }

    /// Finds the table job's `Union` fragment.
    /// Panics if not found.
    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
        let mut union_fragment_id = None;
        for (fragment_id, fragment) in &self.fragments {
            visit_stream_node_body(&fragment.nodes, |body| {
                if let NodeBody::Union(_) = body {
                    if let Some(union_fragment_id) = union_fragment_id.as_mut() {
                        // The union fragment should be unique.
                        assert_eq!(*union_fragment_id, *fragment_id);
                    } else {
                        union_fragment_id = Some(*fragment_id);
                    }
                }
            });
        }

        let union_fragment_id =
            union_fragment_id.expect("fragment of placeholder merger not found");

        (self
            .fragments
            .get_mut(&union_fragment_id)
            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
    }

    /// Recursively counts the tables that the given stream node depends on,
    /// i.e., the scan targets of `StreamScan` and `StreamCdcScan` nodes.
    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
        let table_id = match stream_node.node_body.as_ref() {
            Some(NodeBody::StreamScan(stream_scan)) => Some(stream_scan.table_id),
            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(stream_scan.table_id),
            _ => None,
        };
        if let Some(table_id) = table_id {
            table_ids.entry(table_id).or_default().add_assign(1);
        }

        for child in &stream_node.input {
            Self::resolve_dependent_table(child, table_ids);
        }
    }

    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
        Self::upstream_table_counts_impl(self.fragments.values().map(|fragment| &fragment.nodes))
    }

    /// Returns the number of references to each upstream table across the given fragment nodes.
    pub fn upstream_table_counts_impl(
        fragment_nodes: impl Iterator<Item = &StreamNode>,
    ) -> HashMap<TableId, usize> {
        let mut table_ids = HashMap::new();
        fragment_nodes.for_each(|node| {
            Self::resolve_dependent_table(node, &mut table_ids);
        });

        table_ids
    }

    /// Returns actor locations grouped by worker id.
    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
        let mut map = BTreeMap::default();
        for (&actor_id, actor_status) in &self.actor_status {
            let node_id = actor_status.worker_id();
            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
        }
        map
    }
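
    // e.g. with actors 1 and 2 on worker 10 and actor 3 on worker 20,
    // `worker_actor_ids` yields `{10: [1, 2], 20: [3]}` (illustrative ids).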

    pub fn actors_to_create(
        &self,
    ) -> impl Iterator<
        Item = (
            FragmentId,
            &StreamNode,
            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
        ),
    > + '_ {
        self.fragments.values().map(move |fragment| {
            (
                fragment.fragment_id,
                &fragment.nodes,
                fragment.actors.iter().map(move |actor| {
                    let worker_id: WorkerId = self
                        .actor_status
                        .get(&actor.actor_id)
                        .expect("should exist")
                        .worker_id();
                    (actor, worker_id)
                }),
            )
        })
    }

    pub fn mv_table_id(&self) -> Option<TableId> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter().copied())
            .find(|table_id| self.stream_job_id.is_mv_table_id(*table_id))
    }

    pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<TableId, Table> {
        let mut tables = BTreeMap::new();
        for fragment in fragments {
            stream_graph_visitor::visit_stream_node_tables_inner(
                &mut fragment.nodes.clone(),
                false,
                true,
                |table, _| {
                    let table_id = table.id;
                    tables
                        .try_insert(table_id, table.clone())
                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
                },
            );
        }
        tables
    }

    /// Returns the internal table ids without the mview table.
    pub fn internal_table_ids(&self) -> Vec<TableId> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter().copied())
            .filter(|&t| !self.stream_job_id.is_mv_table_id(t))
            .collect_vec()
    }

    /// Returns all internal table ids including the mview table.
    pub fn all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.clone())
    }

    /// Fill the `expr_context` in `StreamActor`. Used for compatibility.
    pub fn fill_expr_context(mut self) -> Self {
        self.fragments.values_mut().for_each(|fragment| {
            fragment.actors.iter_mut().for_each(|actor| {
                if actor.expr_context.is_none() {
                    actor.expr_context = Some(self.ctx.to_expr_context());
                }
            });
        });
        self
    }
}
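
// A minimal sketch for the upstream-table counting above: a bare `StreamNode` with
// no `StreamScan`/`StreamCdcScan` body anywhere in its tree contributes no
// upstream tables.
#[cfg(test)]
mod upstream_table_count_tests {
    use super::*;

    #[test]
    fn bare_node_has_no_upstream_tables() {
        let node = StreamNode::default();
        let counts = StreamJobFragments::upstream_table_counts_impl([&node].into_iter());
        assert!(counts.is_empty());
    }
}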

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    MView,
    Values,
    Source,
    LocalityProvider,
}

impl BackfillUpstreamType {
    pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
        let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
        let is_values = mask.contains(FragmentTypeFlag::Values);
        let is_source = mask.contains(FragmentTypeFlag::SourceScan);
        let is_locality_provider = mask.contains(FragmentTypeFlag::LocalityProvider);

        // Note: in theory we can have multiple backfill executors in one fragment, but currently it's not possible.
        // See <https://github.com/risingwavelabs/risingwave/issues/6236>.
        debug_assert!(
            is_mview as u8 + is_values as u8 + is_source as u8 + is_locality_provider as u8 == 1,
            "a backfill fragment should be exactly one of mview, values, source, or locality provider, found {:?}",
            mask
        );

        if is_mview {
            BackfillUpstreamType::MView
        } else if is_values {
            BackfillUpstreamType::Values
        } else if is_source {
            BackfillUpstreamType::Source
        } else if is_locality_provider {
            BackfillUpstreamType::LocalityProvider
        } else {
            unreachable!("invalid fragment type mask: {:?}", mask);
        }
    }
}
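
// A remark on the `debug_assert!` in `from_fragment_type_mask` above: casting each
// `bool` to `u8` and summing is a compact way to assert that exactly one of the
// mutually exclusive backfill flavors is present in the mask.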