risingwave_frontend/stream_fragmenter/
parallelism.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::session_config::parallelism::{
16    ConfigBackfillParallelism, ConfigParallelism, DEFAULT_GLOBAL_STREAMING_PARALLELISM,
17    DEFAULT_SINK_STREAMING_PARALLELISM, DEFAULT_TABLE_SOURCE_STREAMING_PARALLELISM,
18};
19use risingwave_common::system_param::AdaptiveParallelismStrategy;
20use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
21
22use super::GraphJobType;
23
24#[derive(Debug, Clone, PartialEq)]
25pub(crate) struct ResolvedParallelism {
26    pub parallelism: Option<Parallelism>,
27    pub adaptive_strategy: Option<AdaptiveParallelismStrategy>,
28}
29
30fn resolve_global_parallelism(
31    global_streaming_parallelism: ConfigParallelism,
32) -> ConfigParallelism {
33    match global_streaming_parallelism {
34        ConfigParallelism::Default => DEFAULT_GLOBAL_STREAMING_PARALLELISM,
35        other => other,
36    }
37}
38
39fn resolve_default_parallelism(
40    job_type: Option<GraphJobType>,
41    global_streaming_parallelism: ConfigParallelism,
42) -> ConfigParallelism {
43    match job_type {
44        // Table/source only keep their legacy typed default on the untouched default path.
45        // Once the global parallelism is explicitly set, they inherit that global value.
46        Some(GraphJobType::Table | GraphJobType::Source) => match global_streaming_parallelism {
47            ConfigParallelism::Default => DEFAULT_TABLE_SOURCE_STREAMING_PARALLELISM,
48            other => resolve_global_parallelism(other),
49        },
50        Some(GraphJobType::Sink) => match global_streaming_parallelism {
51            ConfigParallelism::Default => DEFAULT_SINK_STREAMING_PARALLELISM,
52            other => resolve_global_parallelism(other),
53        },
54        Some(GraphJobType::MaterializedView | GraphJobType::Index) | None => {
55            resolve_global_parallelism(global_streaming_parallelism)
56        }
57    }
58}
59
60pub(crate) fn derive_parallelism(
61    job_type: Option<GraphJobType>,
62    specific_type_parallelism: Option<ConfigParallelism>,
63    global_streaming_parallelism: ConfigParallelism,
64) -> ResolvedParallelism {
65    let effective_parallelism =
66        match specific_type_parallelism.unwrap_or(ConfigParallelism::Default) {
67            ConfigParallelism::Default => {
68                resolve_default_parallelism(job_type, global_streaming_parallelism)
69            }
70            other => other,
71        };
72
73    match effective_parallelism {
74        ConfigParallelism::Default => unreachable!("effective streaming parallelism must be set"),
75        ConfigParallelism::Fixed(n) => ResolvedParallelism {
76            parallelism: Some(Parallelism {
77                parallelism: n.get(),
78            }),
79            adaptive_strategy: None,
80        },
81        ConfigParallelism::Adaptive
82        | ConfigParallelism::Bounded(_)
83        | ConfigParallelism::Ratio(_) => ResolvedParallelism {
84            parallelism: None,
85            adaptive_strategy: effective_parallelism.adaptive_strategy(),
86        },
87    }
88}
89
90pub(crate) fn derive_backfill_parallelism(
91    specific_backfill_parallelism: ConfigBackfillParallelism,
92) -> ResolvedParallelism {
93    match specific_backfill_parallelism {
94        ConfigBackfillParallelism::Default => ResolvedParallelism {
95            parallelism: None,
96            adaptive_strategy: None,
97        },
98        ConfigBackfillParallelism::Fixed(n) => ResolvedParallelism {
99            parallelism: Some(Parallelism {
100                parallelism: n.get(),
101            }),
102            adaptive_strategy: None,
103        },
104        ConfigBackfillParallelism::Adaptive
105        | ConfigBackfillParallelism::Bounded(_)
106        | ConfigBackfillParallelism::Ratio(_) => ResolvedParallelism {
107            parallelism: None,
108            adaptive_strategy: specific_backfill_parallelism.adaptive_strategy(),
109        },
110    }
111}
112
113#[cfg(test)]
114mod tests {
115    use std::num::{NonZeroU64, NonZeroUsize};
116
117    use super::*;
118
119    #[test]
120    fn test_none_global_fixed() {
121        let global = ConfigParallelism::Fixed(NonZeroU64::new(4).unwrap());
122        assert_eq!(
123            derive_parallelism(None, None, global)
124                .parallelism
125                .map(|p| p.parallelism),
126            Some(4)
127        );
128    }
129
130    #[test]
131    fn test_none_global_default() {
132        let global = ConfigParallelism::Default;
133        assert_eq!(derive_parallelism(None, None, global).parallelism, None);
134        assert_eq!(
135            derive_parallelism(None, None, global).adaptive_strategy,
136            DEFAULT_GLOBAL_STREAMING_PARALLELISM.adaptive_strategy()
137        );
138    }
139
140    #[test]
141    fn test_none_global_adaptive() {
142        let global = ConfigParallelism::Adaptive;
143        assert_eq!(derive_parallelism(None, None, global).parallelism, None);
144        assert_eq!(
145            derive_parallelism(None, None, global).adaptive_strategy,
146            Some(AdaptiveParallelismStrategy::Auto)
147        );
148    }
149
150    #[test]
151    fn test_default_global_fixed() {
152        let specific = Some(ConfigParallelism::Default);
153        let global = ConfigParallelism::Fixed(NonZeroU64::new(2).unwrap());
154        assert_eq!(
155            derive_parallelism(None, specific, global)
156                .parallelism
157                .map(|p| p.parallelism),
158            Some(2)
159        );
160    }
161
162    #[test]
163    fn test_default_global_default() {
164        let specific = Some(ConfigParallelism::Default);
165        let global = ConfigParallelism::Default;
166        assert_eq!(derive_parallelism(None, specific, global).parallelism, None);
167        assert_eq!(
168            derive_parallelism(None, specific, global).adaptive_strategy,
169            DEFAULT_GLOBAL_STREAMING_PARALLELISM.adaptive_strategy()
170        );
171    }
172
173    #[test]
174    fn test_default_global_adaptive() {
175        let specific = Some(ConfigParallelism::Default);
176        let global = ConfigParallelism::Adaptive;
177        assert_eq!(derive_parallelism(None, specific, global).parallelism, None);
178    }
179
180    #[test]
181    fn test_adaptive_any_global() {
182        let specific = Some(ConfigParallelism::Adaptive);
183        let globals = [
184            ConfigParallelism::Default,
185            ConfigParallelism::Adaptive,
186            ConfigParallelism::Fixed(NonZeroU64::new(8).unwrap()),
187        ];
188
189        for global in globals {
190            assert_eq!(derive_parallelism(None, specific, global).parallelism, None);
191        }
192    }
193
194    #[test]
195    fn test_fixed_override_global() {
196        let specific = Some(ConfigParallelism::Fixed(NonZeroU64::new(6).unwrap()));
197        let globals = [
198            ConfigParallelism::Default,
199            ConfigParallelism::Adaptive,
200            ConfigParallelism::Fixed(NonZeroU64::new(3).unwrap()),
201        ];
202
203        for global in globals {
204            assert_eq!(
205                derive_parallelism(None, specific, global)
206                    .parallelism
207                    .map(|p| p.parallelism),
208                Some(6)
209            );
210        }
211    }
212
213    #[test]
214    fn test_bounded_parallelism_resolves_strategy() {
215        assert_eq!(
216            derive_parallelism(
217                None,
218                Some(ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap())),
219                ConfigParallelism::Adaptive
220            )
221            .adaptive_strategy,
222            Some(AdaptiveParallelismStrategy::Bounded(
223                NonZeroUsize::new(4).unwrap()
224            ))
225        );
226    }
227
228    #[test]
229    fn test_ratio_parallelism_resolves_strategy() {
230        assert_eq!(
231            derive_parallelism(
232                None,
233                Some(ConfigParallelism::Ratio(0.5)),
234                ConfigParallelism::Adaptive
235            )
236            .adaptive_strategy,
237            Some(AdaptiveParallelismStrategy::Ratio(0.5))
238        );
239    }
240
241    #[test]
242    fn test_table_default_uses_legacy_bound_only_with_default_global() {
243        assert_eq!(
244            derive_parallelism(
245                Some(GraphJobType::Table),
246                Some(ConfigParallelism::Default),
247                ConfigParallelism::Default
248            )
249            .adaptive_strategy,
250            DEFAULT_TABLE_SOURCE_STREAMING_PARALLELISM.adaptive_strategy()
251        );
252    }
253
254    #[test]
255    fn test_table_default_follows_explicit_global_strategy() {
256        assert_eq!(
257            derive_parallelism(
258                Some(GraphJobType::Table),
259                Some(ConfigParallelism::Default),
260                ConfigParallelism::Ratio(0.5)
261            )
262            .adaptive_strategy,
263            Some(AdaptiveParallelismStrategy::Ratio(0.5))
264        );
265        assert_eq!(
266            derive_parallelism(
267                Some(GraphJobType::Table),
268                Some(ConfigParallelism::Default),
269                ConfigParallelism::Adaptive
270            )
271            .adaptive_strategy,
272            Some(AdaptiveParallelismStrategy::Auto)
273        );
274    }
275
276    #[test]
277    fn test_source_default_uses_legacy_bound_only_with_default_global() {
278        assert_eq!(
279            derive_parallelism(
280                Some(GraphJobType::Source),
281                Some(ConfigParallelism::Default),
282                ConfigParallelism::Default
283            )
284            .adaptive_strategy,
285            DEFAULT_TABLE_SOURCE_STREAMING_PARALLELISM.adaptive_strategy()
286        );
287    }
288
289    #[test]
290    fn test_source_default_follows_explicit_global_parallelism() {
291        assert_eq!(
292            derive_parallelism(
293                Some(GraphJobType::Source),
294                Some(ConfigParallelism::Default),
295                ConfigParallelism::Fixed(NonZeroU64::new(7).unwrap())
296            )
297            .parallelism
298            .map(|p| p.parallelism),
299            Some(7)
300        );
301        assert_eq!(
302            derive_parallelism(
303                Some(GraphJobType::Source),
304                Some(ConfigParallelism::Default),
305                ConfigParallelism::Bounded(NonZeroU64::new(5).unwrap())
306            )
307            .adaptive_strategy,
308            Some(AdaptiveParallelismStrategy::Bounded(
309                NonZeroUsize::new(5).unwrap()
310            ))
311        );
312    }
313
314    #[test]
315    fn test_sink_default_uses_legacy_bound_only_with_default_global() {
316        assert_eq!(
317            derive_parallelism(
318                Some(GraphJobType::Sink),
319                Some(ConfigParallelism::Default),
320                ConfigParallelism::Default
321            )
322            .adaptive_strategy,
323            DEFAULT_SINK_STREAMING_PARALLELISM.adaptive_strategy()
324        );
325    }
326
327    #[test]
328    fn test_sink_default_follows_explicit_global_strategy() {
329        assert_eq!(
330            derive_parallelism(
331                Some(GraphJobType::Sink),
332                Some(ConfigParallelism::Default),
333                ConfigParallelism::Ratio(0.5)
334            )
335            .adaptive_strategy,
336            Some(AdaptiveParallelismStrategy::Ratio(0.5))
337        );
338    }
339
340    #[test]
341    fn test_materialized_view_default_follows_global_parallelism() {
342        assert_eq!(
343            derive_parallelism(
344                Some(GraphJobType::MaterializedView),
345                Some(ConfigParallelism::Default),
346                ConfigParallelism::Default
347            )
348            .adaptive_strategy,
349            Some(AdaptiveParallelismStrategy::Bounded(
350                NonZeroUsize::new(64).unwrap()
351            ))
352        );
353        assert_eq!(
354            derive_parallelism(
355                Some(GraphJobType::MaterializedView),
356                Some(ConfigParallelism::Default),
357                ConfigParallelism::Ratio(0.5)
358            )
359            .adaptive_strategy,
360            Some(AdaptiveParallelismStrategy::Ratio(0.5))
361        );
362    }
363
364    #[test]
365    fn test_backfill_parallelism_adaptive_resolves_strategy() {
366        assert_eq!(
367            derive_backfill_parallelism(ConfigBackfillParallelism::Adaptive),
368            ResolvedParallelism {
369                parallelism: None,
370                adaptive_strategy: Some(AdaptiveParallelismStrategy::Auto),
371            }
372        );
373    }
374
375    #[test]
376    fn test_backfill_parallelism_default_does_not_resolve_to_fixed() {
377        assert_eq!(
378            derive_backfill_parallelism(ConfigBackfillParallelism::Default),
379            ResolvedParallelism {
380                parallelism: None,
381                adaptive_strategy: None,
382            }
383        );
384    }
385
386    #[test]
387    fn test_backfill_parallelism_bounded_resolves_strategy() {
388        assert_eq!(
389            derive_backfill_parallelism(ConfigBackfillParallelism::Bounded(
390                NonZeroU64::new(2).unwrap()
391            )),
392            ResolvedParallelism {
393                parallelism: None,
394                adaptive_strategy: Some(AdaptiveParallelismStrategy::Bounded(
395                    NonZeroUsize::new(2).unwrap()
396                )),
397            }
398        );
399    }
400}