risingwave_meta/model/stream.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{AddAssign, Deref};

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::{
    ActorAlignmentId, IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat,
};
use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::{ActorInfo, PbActorLocation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
use risingwave_pb::meta::table_parallelism::{
    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
    PbParallelism,
};
use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
    PbStreamContext, StreamNode,
};

use super::{ActorId, FragmentId};

/// The parallelism for a `TableFragments`.
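///
/// A minimal conversion sketch (hypothetical doctest, assuming `TableParallelism` is
/// in scope; see the `From` impls below):
///
/// ```ignore
/// use risingwave_pb::meta::PbTableParallelism;
///
/// // A fixed parallelism survives a round trip through the protobuf form.
/// let parallelism = TableParallelism::Fixed(4);
/// let pb: PbTableParallelism = parallelism.into();
/// assert_eq!(TableParallelism::from(pb), parallelism);
/// ```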
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// This is when the system decides the parallelism, based on the available worker parallelisms.
    Adaptive,
    /// We set this when the `TableFragments` parallelism is changed.
    /// All fragments which are part of the `TableFragments` will have the same parallelism as this.
    Fixed(usize),
    /// We set this when the individual parallelisms of the `Fragments`
    /// can differ within a `TableFragments`.
    /// This is set for `risectl`, since it has a low-level interface
    /// to scale individual `Fragments` within a `TableFragments`.
    /// When that happens, the `TableFragments` no longer has a consistent
    /// parallelism, so we set this to indicate that.
    Custom,
}

impl From<PbTableParallelism> for TableParallelism {
    fn from(value: PbTableParallelism) -> Self {
        use Parallelism::*;
        match &value.parallelism {
            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
            Some(Custom(_)) => Self::Custom,
            _ => unreachable!(),
        }
    }
}

impl From<TableParallelism> for PbTableParallelism {
    fn from(value: TableParallelism) -> Self {
        use TableParallelism::*;

        let parallelism = match value {
            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
                parallelism: n as u32,
            }),
            Custom => PbParallelism::Custom(PbCustomParallelism {}),
        };

        Self {
            parallelism: Some(parallelism),
        }
    }
}

impl From<StreamingParallelism> for TableParallelism {
    fn from(value: StreamingParallelism) -> Self {
        match value {
            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
            StreamingParallelism::Custom => TableParallelism::Custom,
        }
    }
}

impl From<TableParallelism> for StreamingParallelism {
    fn from(value: TableParallelism) -> Self {
        match value {
            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            TableParallelism::Custom => StreamingParallelism::Custom,
        }
    }
}

pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// downstream `fragment_id` -> original upstream `fragment_id` -> new upstream `fragment_id`
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// The newly added no-shuffle actor dispatchers from an upstream fragment to a downstream fragment:
/// upstream `fragment_id` -> downstream `fragment_id` -> upstream `actor_id` -> downstream `actor_id`
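///
/// A minimal sketch of populating this map (hypothetical ids):
///
/// ```ignore
/// // Upstream actor 1 (in fragment 10) pairs with downstream actor 2 (in fragment 20).
/// let mut new_no_shuffle: FragmentNewNoShuffle = HashMap::new();
/// new_no_shuffle
///     .entry(10)
///     .or_default()
///     .entry(20)
///     .or_default()
///     .insert(1, 2);
/// ```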
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;

#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    pub downstream_fragment_id: FragmentId,
    pub dispatcher_type: DispatcherType,
    pub dist_key_indices: Vec<u32>,
    pub output_mapping: PbDispatchOutputMapping,
}

impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
        Self {
            downstream_fragment_id: fragment_id,
            dispatcher_type: dispatch.get_type().unwrap().into(),
            dist_key_indices: dispatch.dist_key_indices,
            output_mapping: dispatch.output_mapping.unwrap(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    pub inner: StreamJobFragments,
    pub downstreams: FragmentDownstreamRelation,
}

impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

#[derive(Clone, Debug)]
pub struct StreamActor {
    pub actor_id: u32,
    pub fragment_id: u32,
    pub vnode_bitmap: Option<Bitmap>,
    pub mview_definition: String,
    pub expr_context: Option<PbExprContext>,
}

impl StreamActor {
    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
        PbStreamActor {
            actor_id: self.actor_id,
            fragment_id: self.fragment_id,
            dispatcher: dispatchers.collect(),
            vnode_bitmap: self
                .vnode_bitmap
                .as_ref()
                .map(|bitmap| bitmap.to_protobuf()),
            mview_definition: self.mview_definition.clone(),
            expr_context: self.expr_context.clone(),
        }
    }
}

#[derive(Clone, Debug, Default)]
pub struct Fragment {
    pub fragment_id: FragmentId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub actors: Vec<StreamActor>,
    pub state_table_ids: Vec<u32>,
    pub maybe_vnode_count: Option<u32>,
    pub nodes: StreamNode,
}

impl Fragment {
    pub fn to_protobuf(
        &self,
        upstream_fragments: impl Iterator<Item = FragmentId>,
        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
    ) -> PbFragment {
        PbFragment {
            fragment_id: self.fragment_id,
            fragment_type_mask: self.fragment_type_mask.into(),
            distribution_type: self.distribution_type as _,
            actors: self
                .actors
                .iter()
                .map(|actor| {
                    actor.to_protobuf(
                        dispatchers
                            .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
                            .into_iter()
                            .flatten()
                            .cloned(),
                    )
                })
                .collect(),
            state_table_ids: self.state_table_ids.clone(),
            upstream_fragment_ids: upstream_fragments.collect(),
            maybe_vnode_count: self.maybe_vnode_count,
            nodes: Some(self.nodes.clone()),
        }
    }
}

impl VnodeCountCompat for Fragment {
    fn vnode_count_inner(&self) -> VnodeCount {
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}

impl IsSingleton for Fragment {
    fn is_singleton(&self) -> bool {
        matches!(self.distribution_type, FragmentDistributionType::Single)
    }
}

/// Fragments of a streaming job. Corresponds to [`PbTableFragments`].
/// (It was previously called `TableFragments` for historical reasons.)
///
/// We store the whole set of fragments in a single column family, keyed as
/// `stream_job_id` => `StreamJobFragments`.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// The table id.
    pub stream_job_id: TableId,

    /// The state of the table fragments.
    pub state: State,

    /// The table fragments.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// The status of actors.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// The streaming context associated with this stream plan and its fragments.
    pub ctx: StreamContext,

    /// The parallelism assigned to this table fragments.
    pub assigned_parallelism: TableParallelism,

    /// The max parallelism specified when the streaming job was created, i.e., the expected vnode count.
    ///
    /// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
    /// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
    /// the streaming job.
    ///
    /// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
    /// from this value (see `StreamFragmentGraph.max_parallelism` for more details). As a result,
    /// checking the parallelism change with this value can be inaccurate in some cases. However,
    /// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
    pub max_parallelism: usize,
}

#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// The timezone used to interpret timestamps and dates for conversion
    pub timezone: Option<String>,
}

impl StreamContext {
    pub fn to_protobuf(&self) -> PbStreamContext {
        PbStreamContext {
            timezone: self.timezone.clone().unwrap_or("".into()),
        }
    }

    pub fn to_expr_context(&self) -> PbExprContext {
        PbExprContext {
            // `self.timezone` must always be set; an invalid value is used here for debugging if it's not.
            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
            strict_mode: false,
        }
    }

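    /// Converts from the protobuf representation. An empty timezone string is normalized
    /// back to `None`, making this the inverse of [`Self::to_protobuf`].
    ///
    /// A minimal round-trip sketch (hypothetical doctest):
    ///
    /// ```ignore
    /// let ctx = StreamContext { timezone: None };
    /// let pb = ctx.to_protobuf();
    /// assert_eq!(pb.get_timezone(), ""); // `None` is serialized as an empty string.
    /// assert!(StreamContext::from_protobuf(&pb).timezone.is_none());
    /// ```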
    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
        Self {
            timezone: if prost.get_timezone().is_empty() {
                None
            } else {
                Some(prost.get_timezone().clone())
            },
        }
    }
}

impl StreamJobFragments {
    pub fn to_protobuf(
        &self,
        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
        fragment_dispatchers: &FragmentActorDispatchers,
    ) -> PbTableFragments {
        PbTableFragments {
            table_id: self.stream_job_id.table_id(),
            state: self.state as _,
            fragments: self
                .fragments
                .iter()
                .map(|(id, fragment)| {
                    (
                        *id,
                        fragment.to_protobuf(
                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
                            fragment_dispatchers.get(&(*id as _)),
                        ),
                    )
                })
                .collect(),
            actor_status: self.actor_status.clone().into_iter().collect(),
            ctx: Some(self.ctx.to_protobuf()),
            parallelism: Some(self.assigned_parallelism.into()),
            node_label: "".to_owned(),
            backfill_done: true,
            max_parallelism: Some(self.max_parallelism as _),
        }
    }
}

pub type StreamJobActorsToCreate = HashMap<
    WorkerId,
    HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>, HashSet<u32>)>,
>;

impl StreamJobFragments {
    /// Creates a new `TableFragments` in state `Initial`, with other fields empty.
    pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
        Self::new(
            table_id,
            fragments,
            &BTreeMap::new(),
            StreamContext::default(),
            TableParallelism::Adaptive,
            VirtualNode::COUNT_FOR_TEST,
        )
    }

    /// Creates a new `TableFragments` in state `Initial`, with the status of actors set to
    /// `Inactive` on the given workers.
    pub fn new(
        stream_job_id: TableId,
        fragments: BTreeMap<FragmentId, Fragment>,
        actor_locations: &BTreeMap<ActorId, ActorAlignmentId>,
        ctx: StreamContext,
        table_parallelism: TableParallelism,
        max_parallelism: usize,
    ) -> Self {
        let actor_status = actor_locations
            .iter()
            .map(|(&actor_id, alignment_id)| {
                (
                    actor_id,
                    ActorStatus {
                        location: PbActorLocation::from_worker(alignment_id.worker_id()),
                        state: ActorState::Inactive as i32,
                    },
                )
            })
            .collect();

        Self {
            stream_job_id,
            state: State::Initial,
            fragments,
            actor_status,
            ctx,
            assigned_parallelism: table_parallelism,
            max_parallelism,
        }
    }

    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.fragments.keys().cloned()
    }

    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
        self.fragments.values()
    }

    pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
        self.fragments
            .get(&fragment_id)
            .map(|f| f.actors.as_slice())
            .unwrap_or_default()
    }

    /// Returns the table id.
    pub fn stream_job_id(&self) -> TableId {
        self.stream_job_id
    }

    /// Returns the timezone of the table.
    pub fn timezone(&self) -> Option<String> {
        self.ctx.timezone.clone()
    }

    /// Returns whether the table fragments are in the `Created` state.
    pub fn is_created(&self) -> bool {
        self.state == State::Created
    }

    /// Updates the state of all actors.
    pub fn update_actors_state(&mut self, state: ActorState) {
        for actor_status in self.actor_status.values_mut() {
            actor_status.set_state(state);
        }
    }

    /// Returns actor ids associated with this table.
    pub fn actor_ids(&self) -> Vec<ActorId> {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
            .collect()
    }

    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
        self.fragments
            .values()
            .flat_map(|fragment| {
                fragment
                    .actors
                    .iter()
                    .map(|actor| (actor.actor_id, fragment.fragment_id))
            })
            .collect()
    }

    /// Returns actors associated with this table.
    #[cfg(test)]
    pub fn actors(&self) -> Vec<StreamActor> {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.clone())
            .collect()
    }

    /// Returns mview fragment ids.
    #[cfg(test)]
    pub fn mview_fragment_ids(&self) -> Vec<FragmentId> {
        self.fragments
            .values()
            .filter(move |fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .map(|fragment| fragment.fragment_id)
            .collect()
    }

    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
        Self::tracking_progress_actor_ids_impl(self.fragments.values().map(|fragment| {
            (
                fragment.fragment_type_mask,
                fragment.actors.iter().map(|actor| actor.actor_id),
            )
        }))
    }

    /// Returns actor ids that need to be tracked when creating an MV.
    pub fn tracking_progress_actor_ids_impl(
        fragments: impl IntoIterator<Item = (FragmentTypeMask, impl Iterator<Item = ActorId>)>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        let mut actor_ids = vec![];
        for (fragment_type_mask, actors) in fragments {
            if fragment_type_mask.contains(FragmentTypeFlag::CdcFilter) {
                // Note: a CDC table job contains a StreamScan fragment (StreamCdcScan node) and a CdcFilter fragment.
                // We don't track any fragments' progress.
                return vec![];
            }
            if fragment_type_mask.contains_any([
                FragmentTypeFlag::Values,
                FragmentTypeFlag::StreamScan,
                FragmentTypeFlag::SourceScan,
                FragmentTypeFlag::LocalityProvider,
            ]) {
                actor_ids.extend(actors.map(|actor_id| {
                    (
                        actor_id,
                        BackfillUpstreamType::from_fragment_type_mask(fragment_type_mask),
                    )
                }));
            }
        }
        actor_ids
    }

    pub fn root_fragment(&self) -> Option<Fragment> {
        self.mview_fragment()
            .or_else(|| self.sink_fragment())
            .or_else(|| self.source_fragment())
    }

    /// Returns the fragment with the `Mview` type flag.
    pub fn mview_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .cloned()
    }

    pub fn source_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Source)
            })
            .cloned()
    }

    pub fn sink_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
            .cloned()
    }

    /// Extracts the fragments whose source executors contain an external stream source,
    /// grouped by source id.
    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
        let mut source_fragments = HashMap::new();

        for fragment in self.fragments() {
            if let Some(source_id) = fragment.nodes.find_stream_source() {
                source_fragments
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment.fragment_id as FragmentId);
            }
        }
        source_fragments
    }

    pub fn source_backfill_fragments(
        &self,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        Self::source_backfill_fragments_impl(
            self.fragments
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        )
    }

    /// Returns `source_id` -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`).
    ///
    /// Note: the fragment `source_backfill_fragment_id` may actually have multiple upstream fragments,
    /// but only one of them is the upstream source fragment, which is what we return.
    pub fn source_backfill_fragments_impl(
        fragments: impl Iterator<Item = (FragmentId, &StreamNode)>,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        let mut source_backfill_fragments = HashMap::new();

        for (fragment_id, fragment_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                fragment_node.find_source_backfill()
            {
                source_backfill_fragments
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        source_backfill_fragments
    }

    /// Finds the table job's `Union` fragment.
    /// Panics if not found.
    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
        let mut union_fragment_id = None;
        for (fragment_id, fragment) in &self.fragments {
            visit_stream_node_body(&fragment.nodes, |body| {
                if let NodeBody::Union(_) = body {
                    if let Some(union_fragment_id) = union_fragment_id.as_mut() {
                        // The union fragment should be unique.
                        assert_eq!(*union_fragment_id, *fragment_id);
                    } else {
                        union_fragment_id = Some(*fragment_id);
                    }
                }
            });
        }

        let union_fragment_id =
            union_fragment_id.expect("fragment of placeholder merger not found");

        self.fragments
            .get_mut(&union_fragment_id)
            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))
    }

    /// Resolves dependent tables recursively, counting their occurrences.
    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
        let table_id = match stream_node.node_body.as_ref() {
            Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
            _ => None,
        };
        if let Some(table_id) = table_id {
            table_ids.entry(table_id).or_default().add_assign(1);
        }

        for child in &stream_node.input {
            Self::resolve_dependent_table(child, table_ids);
        }
    }

    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
        Self::upstream_table_counts_impl(self.fragments.values().map(|fragment| &fragment.nodes))
    }

    /// Returns upstream table counts.
    pub fn upstream_table_counts_impl(
        fragment_nodes: impl Iterator<Item = &StreamNode>,
    ) -> HashMap<TableId, usize> {
        let mut table_ids = HashMap::new();
        fragment_nodes.for_each(|node| {
            Self::resolve_dependent_table(node, &mut table_ids);
        });

        table_ids
    }

    /// Returns the states of actors, grouped by worker id.
    pub fn worker_actor_states(&self) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
        let mut map = BTreeMap::default();
        for (&actor_id, actor_status) in &self.actor_status {
            let node_id = actor_status.worker_id() as WorkerId;
            map.entry(node_id)
                .or_insert_with(Vec::new)
                .push((actor_id, actor_status.state()));
        }
        map
    }

    /// Returns actor locations, grouped by worker id.
    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
        let mut map = BTreeMap::default();
        for (&actor_id, actor_status) in &self.actor_status {
            let node_id = actor_status.worker_id() as WorkerId;
            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
        }
        map
    }

    /// Returns the actors that are not in the `Inactive` state.
    pub fn active_actors(&self) -> Vec<StreamActor> {
        let mut actors = vec![];
        for fragment in self.fragments.values() {
            for actor in &fragment.actors {
                if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
                    continue;
                }
                actors.push(actor.clone());
            }
        }
        actors
    }

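    /// Enumerates, per fragment, the stream node plan together with each actor and the
    /// worker it is assigned to.
    ///
    /// A minimal consumption sketch (hypothetical doctest; `job` is a `StreamJobFragments`):
    ///
    /// ```ignore
    /// for (fragment_id, _nodes, actors) in job.actors_to_create() {
    ///     for (actor, worker_id) in actors {
    ///         println!("fragment {}: actor {} on worker {}", fragment_id, actor.actor_id, worker_id);
    ///     }
    /// }
    /// ```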
    pub fn actors_to_create(
        &self,
    ) -> impl Iterator<
        Item = (
            FragmentId,
            &StreamNode,
            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
        ),
    > + '_ {
        self.fragments.values().map(move |fragment| {
            (
                fragment.fragment_id,
                &fragment.nodes,
                fragment.actors.iter().map(move |actor| {
                    let worker_id = self
                        .actor_status
                        .get(&actor.actor_id)
                        .expect("should exist")
                        .worker_id() as WorkerId;
                    (actor, worker_id)
                }),
            )
        })
    }

    pub fn mv_table_id(&self) -> Option<u32> {
        if self
            .fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter())
            .any(|table_id| *table_id == self.stream_job_id.table_id)
        {
            Some(self.stream_job_id.table_id)
        } else {
            None
        }
    }

    pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<u32, Table> {
        let mut tables = BTreeMap::new();
        for fragment in fragments {
            stream_graph_visitor::visit_stream_node_tables_inner(
                &mut fragment.nodes.clone(),
                false,
                true,
                |table, _| {
                    let table_id = table.id;
                    tables
                        .try_insert(table_id, table.clone())
                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
                },
            );
        }
        tables
    }

    /// Returns the internal table ids without the mview table.
    pub fn internal_table_ids(&self) -> Vec<u32> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.clone())
            .filter(|&t| t != self.stream_job_id.table_id)
            .collect_vec()
    }

    /// Returns all state table ids, including the mview table and internal tables.
    pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.clone())
    }

    /// Fills the `expr_context` in `StreamActor`. Used for compatibility.
    pub fn fill_expr_context(mut self) -> Self {
        self.fragments.values_mut().for_each(|fragment| {
            fragment.actors.iter_mut().for_each(|actor| {
                if actor.expr_context.is_none() {
                    actor.expr_context = Some(self.ctx.to_expr_context());
                }
            });
        });
        self
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    MView,
    Values,
    Source,
    LocalityProvider,
}

impl BackfillUpstreamType {
    pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
        let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
        let is_values = mask.contains(FragmentTypeFlag::Values);
        let is_source = mask.contains(FragmentTypeFlag::SourceScan);
        let is_locality_provider = mask.contains(FragmentTypeFlag::LocalityProvider);

        // Note: in theory we can have multiple backfill executors in one fragment, but currently it's not possible.
        // See <https://github.com/risingwavelabs/risingwave/issues/6236>.
        debug_assert!(
            is_mview as u8 + is_values as u8 + is_source as u8 + is_locality_provider as u8 == 1,
            "a backfill fragment should be exactly one of mview, values, source, or locality provider, found {:?}",
            mask
        );

        if is_mview {
            BackfillUpstreamType::MView
        } else if is_values {
            BackfillUpstreamType::Values
        } else if is_source {
            BackfillUpstreamType::Source
        } else if is_locality_provider {
            BackfillUpstreamType::LocalityProvider
        } else {
            unreachable!("invalid fragment type mask: {:?}", mask);
        }
    }
}