risingwave_frontend/stream_fragmenter/
parallelism.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZeroU64;
16
17use risingwave_common::session_config::parallelism::{
18    ConfigBackfillParallelism, ConfigParallelism,
19};
20use risingwave_common::system_param::AdaptiveParallelismStrategy;
21use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
22
23use super::GraphJobType;
24
25#[derive(Debug, Clone, PartialEq)]
26pub(crate) struct ResolvedParallelism {
27    pub parallelism: Option<Parallelism>,
28    pub adaptive_strategy: Option<AdaptiveParallelismStrategy>,
29}
30
31fn resolve_global_parallelism(
32    global_streaming_parallelism: ConfigParallelism,
33) -> ConfigParallelism {
34    global_streaming_parallelism
35}
36
37fn resolve_default_parallelism(
38    job_type: Option<GraphJobType>,
39    global_streaming_parallelism: ConfigParallelism,
40) -> ConfigParallelism {
41    match job_type {
42        // Table/source only keep their legacy typed default on the untouched default path.
43        // Once the global parallelism is explicitly set, they inherit that global value.
44        Some(GraphJobType::Table | GraphJobType::Source) => match global_streaming_parallelism {
45            ConfigParallelism::Default => ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap()),
46            other => resolve_global_parallelism(other),
47        },
48        Some(GraphJobType::MaterializedView | GraphJobType::Sink | GraphJobType::Index) | None => {
49            resolve_global_parallelism(global_streaming_parallelism)
50        }
51    }
52}
53
54pub(crate) fn derive_parallelism(
55    job_type: Option<GraphJobType>,
56    specific_type_parallelism: Option<ConfigParallelism>,
57    global_streaming_parallelism: ConfigParallelism,
58) -> ResolvedParallelism {
59    let effective_parallelism =
60        match specific_type_parallelism.unwrap_or(ConfigParallelism::Default) {
61            ConfigParallelism::Default => {
62                resolve_default_parallelism(job_type, global_streaming_parallelism)
63            }
64            other => other,
65        };
66
67    match effective_parallelism {
68        // No explicit session-level override: let meta use the system param
69        // `adaptive_parallelism_strategy` to decide.
70        ConfigParallelism::Default => ResolvedParallelism {
71            parallelism: None,
72            adaptive_strategy: None,
73        },
74        ConfigParallelism::Fixed(n) => ResolvedParallelism {
75            parallelism: Some(Parallelism {
76                parallelism: n.get(),
77            }),
78            adaptive_strategy: None,
79        },
80        ConfigParallelism::Adaptive
81        | ConfigParallelism::Bounded(_)
82        | ConfigParallelism::Ratio(_) => ResolvedParallelism {
83            parallelism: None,
84            adaptive_strategy: effective_parallelism.adaptive_strategy(),
85        },
86    }
87}
88
89pub(crate) fn derive_backfill_parallelism(
90    specific_backfill_parallelism: ConfigBackfillParallelism,
91) -> ResolvedParallelism {
92    match specific_backfill_parallelism {
93        ConfigBackfillParallelism::Default => ResolvedParallelism {
94            parallelism: None,
95            adaptive_strategy: None,
96        },
97        ConfigBackfillParallelism::Fixed(n) => ResolvedParallelism {
98            parallelism: Some(Parallelism {
99                parallelism: n.get(),
100            }),
101            adaptive_strategy: None,
102        },
103        ConfigBackfillParallelism::Adaptive
104        | ConfigBackfillParallelism::Bounded(_)
105        | ConfigBackfillParallelism::Ratio(_) => ResolvedParallelism {
106            parallelism: None,
107            adaptive_strategy: specific_backfill_parallelism.adaptive_strategy(),
108        },
109    }
110}
111
112#[cfg(test)]
113mod tests {
114    use std::num::{NonZeroU64, NonZeroUsize};
115
116    use super::*;
117
118    #[test]
119    fn test_none_global_fixed() {
120        let global = ConfigParallelism::Fixed(NonZeroU64::new(4).unwrap());
121        assert_eq!(
122            derive_parallelism(None, None, global)
123                .parallelism
124                .map(|p| p.parallelism),
125            Some(4)
126        );
127    }
128
129    #[test]
130    fn test_none_global_default() {
131        let global = ConfigParallelism::Default;
132        assert_eq!(derive_parallelism(None, None, global).parallelism, None);
133        assert_eq!(
134            derive_parallelism(None, None, global).adaptive_strategy,
135            None
136        );
137    }
138
139    #[test]
140    fn test_none_global_adaptive() {
141        let global = ConfigParallelism::Adaptive;
142        assert_eq!(derive_parallelism(None, None, global).parallelism, None);
143        assert_eq!(
144            derive_parallelism(None, None, global).adaptive_strategy,
145            Some(AdaptiveParallelismStrategy::Auto)
146        );
147    }
148
149    #[test]
150    fn test_default_global_fixed() {
151        let specific = Some(ConfigParallelism::Default);
152        let global = ConfigParallelism::Fixed(NonZeroU64::new(2).unwrap());
153        assert_eq!(
154            derive_parallelism(None, specific, global)
155                .parallelism
156                .map(|p| p.parallelism),
157            Some(2)
158        );
159    }
160
161    #[test]
162    fn test_default_global_default() {
163        let specific = Some(ConfigParallelism::Default);
164        let global = ConfigParallelism::Default;
165        assert_eq!(derive_parallelism(None, specific, global).parallelism, None);
166        assert_eq!(
167            derive_parallelism(None, specific, global).adaptive_strategy,
168            None
169        );
170    }
171
172    #[test]
173    fn test_default_global_adaptive() {
174        let specific = Some(ConfigParallelism::Default);
175        let global = ConfigParallelism::Adaptive;
176        assert_eq!(derive_parallelism(None, specific, global).parallelism, None);
177    }
178
179    #[test]
180    fn test_adaptive_any_global() {
181        let specific = Some(ConfigParallelism::Adaptive);
182        let globals = [
183            ConfigParallelism::Default,
184            ConfigParallelism::Adaptive,
185            ConfigParallelism::Fixed(NonZeroU64::new(8).unwrap()),
186        ];
187
188        for global in globals {
189            assert_eq!(derive_parallelism(None, specific, global).parallelism, None);
190        }
191    }
192
193    #[test]
194    fn test_fixed_override_global() {
195        let specific = Some(ConfigParallelism::Fixed(NonZeroU64::new(6).unwrap()));
196        let globals = [
197            ConfigParallelism::Default,
198            ConfigParallelism::Adaptive,
199            ConfigParallelism::Fixed(NonZeroU64::new(3).unwrap()),
200        ];
201
202        for global in globals {
203            assert_eq!(
204                derive_parallelism(None, specific, global)
205                    .parallelism
206                    .map(|p| p.parallelism),
207                Some(6)
208            );
209        }
210    }
211
212    #[test]
213    fn test_bounded_parallelism_resolves_strategy() {
214        assert_eq!(
215            derive_parallelism(
216                None,
217                Some(ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap())),
218                ConfigParallelism::Adaptive
219            )
220            .adaptive_strategy,
221            Some(AdaptiveParallelismStrategy::Bounded(
222                NonZeroUsize::new(4).unwrap()
223            ))
224        );
225    }
226
227    #[test]
228    fn test_ratio_parallelism_resolves_strategy() {
229        assert_eq!(
230            derive_parallelism(
231                None,
232                Some(ConfigParallelism::Ratio(0.5)),
233                ConfigParallelism::Adaptive
234            )
235            .adaptive_strategy,
236            Some(AdaptiveParallelismStrategy::Ratio(0.5))
237        );
238    }
239
240    #[test]
241    fn test_table_default_uses_legacy_bound_only_with_default_global() {
242        assert_eq!(
243            derive_parallelism(
244                Some(GraphJobType::Table),
245                Some(ConfigParallelism::Default),
246                ConfigParallelism::Default
247            )
248            .adaptive_strategy,
249            Some(AdaptiveParallelismStrategy::Bounded(
250                NonZeroUsize::new(4).unwrap()
251            ))
252        );
253    }
254
255    #[test]
256    fn test_table_default_follows_explicit_global_strategy() {
257        assert_eq!(
258            derive_parallelism(
259                Some(GraphJobType::Table),
260                Some(ConfigParallelism::Default),
261                ConfigParallelism::Ratio(0.5)
262            )
263            .adaptive_strategy,
264            Some(AdaptiveParallelismStrategy::Ratio(0.5))
265        );
266        assert_eq!(
267            derive_parallelism(
268                Some(GraphJobType::Table),
269                Some(ConfigParallelism::Default),
270                ConfigParallelism::Adaptive
271            )
272            .adaptive_strategy,
273            Some(AdaptiveParallelismStrategy::Auto)
274        );
275    }
276
277    #[test]
278    fn test_source_default_uses_legacy_bound_only_with_default_global() {
279        assert_eq!(
280            derive_parallelism(
281                Some(GraphJobType::Source),
282                Some(ConfigParallelism::Default),
283                ConfigParallelism::Default
284            )
285            .adaptive_strategy,
286            Some(AdaptiveParallelismStrategy::Bounded(
287                NonZeroUsize::new(4).unwrap()
288            ))
289        );
290    }
291
292    #[test]
293    fn test_source_default_follows_explicit_global_parallelism() {
294        assert_eq!(
295            derive_parallelism(
296                Some(GraphJobType::Source),
297                Some(ConfigParallelism::Default),
298                ConfigParallelism::Fixed(NonZeroU64::new(7).unwrap())
299            )
300            .parallelism
301            .map(|p| p.parallelism),
302            Some(7)
303        );
304        assert_eq!(
305            derive_parallelism(
306                Some(GraphJobType::Source),
307                Some(ConfigParallelism::Default),
308                ConfigParallelism::Bounded(NonZeroU64::new(5).unwrap())
309            )
310            .adaptive_strategy,
311            Some(AdaptiveParallelismStrategy::Bounded(
312                NonZeroUsize::new(5).unwrap()
313            ))
314        );
315    }
316
317    #[test]
318    fn test_materialized_view_default_follows_global_parallelism() {
319        assert_eq!(
320            derive_parallelism(
321                Some(GraphJobType::MaterializedView),
322                Some(ConfigParallelism::Default),
323                ConfigParallelism::Default
324            )
325            .adaptive_strategy,
326            None
327        );
328        assert_eq!(
329            derive_parallelism(
330                Some(GraphJobType::MaterializedView),
331                Some(ConfigParallelism::Default),
332                ConfigParallelism::Ratio(0.5)
333            )
334            .adaptive_strategy,
335            Some(AdaptiveParallelismStrategy::Ratio(0.5))
336        );
337    }
338
339    #[test]
340    fn test_backfill_parallelism_default_does_not_resolve_to_fixed() {
341        assert_eq!(
342            derive_backfill_parallelism(ConfigBackfillParallelism::Default),
343            ResolvedParallelism {
344                parallelism: None,
345                adaptive_strategy: None,
346            }
347        );
348    }
349
350    #[test]
351    fn test_backfill_parallelism_fixed_preserves_explicit_override() {
352        assert_eq!(
353            derive_backfill_parallelism(ConfigBackfillParallelism::Fixed(
354                NonZeroU64::new(2).unwrap()
355            )),
356            ResolvedParallelism {
357                parallelism: Some(Parallelism { parallelism: 2 }),
358                adaptive_strategy: None,
359            }
360        );
361    }
362}