risingwave_meta/barrier/
edge_builder.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::stream_plan::StreamNode;
21use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
22use tracing::warn;
23
24use crate::controller::fragment::InflightFragmentInfo;
25use crate::controller::utils::compose_dispatchers;
26use crate::model::{
27    ActorId, DownstreamFragmentRelation, FragmentActorDispatchers, FragmentActorUpstreams,
28    FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
29};
30
31#[derive(Debug)]
32struct FragmentInfo {
33    distribution_type: DistributionType,
34    actors: HashMap<ActorId, Option<Bitmap>>,
35}
36
37pub(super) struct FragmentEdgeBuildResult {
38    pub(super) upstreams: HashMap<FragmentId, FragmentActorUpstreams>,
39    pub(super) dispatchers: FragmentActorDispatchers,
40    pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
41}
42
43impl FragmentEdgeBuildResult {
44    pub(super) fn collect_actors_to_create(
45        &mut self,
46        actors: impl Iterator<
47            Item = (
48                FragmentId,
49                &StreamNode,
50                impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
51            ),
52        >,
53    ) -> StreamJobActorsToCreate {
54        let mut actors_to_create = StreamJobActorsToCreate::default();
55        for (fragment_id, node, actors) in actors {
56            for (actor, worker_id) in actors {
57                let upstreams = self
58                    .upstreams
59                    .get_mut(&fragment_id)
60                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
61                    .unwrap_or_default();
62                let dispatchers = self
63                    .dispatchers
64                    .get_mut(&fragment_id)
65                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
66                    .unwrap_or_default();
67                actors_to_create
68                    .entry(worker_id)
69                    .or_default()
70                    .entry(fragment_id)
71                    .or_insert_with(|| (node.clone(), vec![]))
72                    .1
73                    .push((actor.clone(), upstreams, dispatchers))
74            }
75        }
76        actors_to_create
77    }
78}
79
80pub(super) struct FragmentEdgeBuilder {
81    fragments: HashMap<FragmentId, FragmentInfo>,
82    result: FragmentEdgeBuildResult,
83}
84
85impl FragmentEdgeBuilder {
86    pub(super) fn new(fragment_infos: impl Iterator<Item = &InflightFragmentInfo>) -> Self {
87        let mut fragments = HashMap::new();
88        for info in fragment_infos {
89            fragments
90                .try_insert(
91                    info.fragment_id,
92                    FragmentInfo {
93                        distribution_type: info.distribution_type,
94                        actors: info
95                            .actors
96                            .iter()
97                            .map(|(actor_id, actor)| (*actor_id, actor.vnode_bitmap.clone()))
98                            .collect(),
99                    },
100                )
101                .expect("non-duplicate");
102        }
103        Self {
104            fragments,
105            result: FragmentEdgeBuildResult {
106                upstreams: Default::default(),
107                dispatchers: Default::default(),
108                merge_updates: Default::default(),
109            },
110        }
111    }
112
113    pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
114        for (fragment_id, relations) in relations {
115            for relation in relations {
116                self.add_edge(*fragment_id, relation);
117            }
118        }
119    }
120
121    fn add_edge(&mut self, fragment_id: FragmentId, downstream: &DownstreamFragmentRelation) {
122        let fragment = &self
123            .fragments
124            .get(&fragment_id)
125            .unwrap_or_else(|| panic!("cannot find {}", fragment_id));
126        let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
127        let dispatchers = compose_dispatchers(
128            fragment.distribution_type,
129            &fragment.actors,
130            downstream.downstream_fragment_id,
131            downstream_fragment.distribution_type,
132            &downstream_fragment.actors,
133            downstream.dispatcher_type,
134            downstream.dist_key_indices.clone(),
135            downstream.output_indices.clone(),
136        );
137        let downstream_fragment_upstreams = self
138            .result
139            .upstreams
140            .entry(downstream.downstream_fragment_id)
141            .or_default();
142        for (actor_id, dispatcher) in dispatchers {
143            for downstream_actor in &dispatcher.downstream_actor_id {
144                downstream_fragment_upstreams
145                    .entry(*downstream_actor)
146                    .or_default()
147                    .entry(fragment_id)
148                    .or_default()
149                    .insert(actor_id);
150            }
151            self.result
152                .dispatchers
153                .entry(fragment_id)
154                .or_default()
155                .entry(actor_id)
156                .or_default()
157                .push(dispatcher);
158        }
159    }
160
161    pub(super) fn replace_upstream(
162        &mut self,
163        fragment_id: FragmentId,
164        original_upstream_fragment_id: FragmentId,
165        new_upstream_fragment_id: FragmentId,
166    ) {
167        let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
168        if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
169            fragment_upstreams.retain(|&actor_id, actor_upstreams| {
170                if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
171                    fragment_merge_updates.push(MergeUpdate {
172                        actor_id,
173                        upstream_fragment_id: original_upstream_fragment_id,
174                        new_upstream_fragment_id: Some(new_upstream_fragment_id),
175                        added_upstream_actor_id: new_upstreams.into_iter().collect(),
176                        removed_upstream_actor_id: vec![],
177                    })
178                } else if cfg!(debug_assertions) {
179                    panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
180                } else {
181                    warn!(actor_id, fragment_id, new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
182                }
183                !actor_upstreams.is_empty()
184            })
185        } else if cfg!(debug_assertions) {
186            panic!(
187                "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
188                fragment_id,
189                new_upstream_fragment_id,
190                original_upstream_fragment_id,
191                self.result.upstreams
192            );
193        } else {
194            warn!(fragment_id, new_upstream_fragment_id, original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
195        }
196    }
197
198    pub(super) fn build(self) -> FragmentEdgeBuildResult {
199        self.result
200    }
201}