risingwave_common/session_config/
parallelism.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::num::{NonZeroU64, NonZeroUsize, ParseIntError};
16use std::str::FromStr;
17
18use risingwave_common::system_param::adaptive_parallelism_strategy::{
19    AdaptiveParallelismStrategy, ParallelismStrategyParseError, parse_strategy,
20};
21use thiserror::Error;
22
/// Keyword that parses to (and is rendered for) the `Default` parallelism variants.
const KEYWORD_DEFAULT: &str = "default";
/// Keyword that parses to (and is rendered for) the `Adaptive` parallelism variants.
const KEYWORD_ADAPTIVE: &str = "adaptive";
/// Input-only alias that also parses to the `Adaptive` parallelism variants.
const KEYWORD_AUTO: &str = "auto";
/// Keyword that parses to (and is rendered for) `ConfigAdaptiveParallelismStrategy::Default`.
const KEYWORD_DEFAULT_STRATEGY: &str = "default";
27
/// Parallelism setting as it appears in the session configuration.
///
/// Values are produced by the `FromStr` impl and rendered back by the `Display` impl.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum ConfigParallelism {
    /// The `default` keyword; yields no adaptive strategy of its own.
    #[default]
    Default,
    /// A fixed, non-zero parallelism, written as a plain positive integer.
    Fixed(NonZeroU64),
    /// Adaptive parallelism: the `adaptive` / `auto` keywords, or the integer `0`.
    Adaptive,
    /// Adaptive parallelism using a bounded strategy, rendered as `bounded(n)`.
    Bounded(NonZeroU64),
    /// Adaptive parallelism using a ratio strategy, rendered as `ratio(r)`.
    Ratio(f32),
}
37
38#[derive(Error, Debug)]
39pub enum ConfigParallelismParseError {
40    #[error("Unsupported parallelism: {0}")]
41    UnsupportedParallelism(String),
42
43    #[error("Parse error: {0}")]
44    ParseIntError(#[from] ParseIntError),
45
46    #[error(transparent)]
47    StrategyParseError(#[from] ParallelismStrategyParseError),
48}
49
50impl FromStr for ConfigParallelism {
51    type Err = ConfigParallelismParseError;
52
53    fn from_str(s: &str) -> Result<Self, Self::Err> {
54        match s.to_ascii_lowercase().as_str() {
55            KEYWORD_DEFAULT => return Ok(ConfigParallelism::Default),
56            KEYWORD_ADAPTIVE | KEYWORD_AUTO => return Ok(ConfigParallelism::Adaptive),
57            _ => {}
58        }
59
60        match parse_strategy(s) {
61            Ok(AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full) => {
62                Ok(ConfigParallelism::Adaptive)
63            }
64            Ok(AdaptiveParallelismStrategy::Bounded(n)) => Ok(ConfigParallelism::Bounded(
65                NonZeroU64::new(n.get() as u64).unwrap(),
66            )),
67            Ok(AdaptiveParallelismStrategy::Ratio(r)) => Ok(ConfigParallelism::Ratio(r)),
68            Err(ParallelismStrategyParseError::UnsupportedStrategy(_)) => {
69                let parsed = s.parse::<u64>()?;
70                if parsed == 0 {
71                    Ok(ConfigParallelism::Adaptive)
72                } else {
73                    Ok(ConfigParallelism::Fixed(NonZeroU64::new(parsed).unwrap()))
74                }
75            }
76            Err(err) => Err(err.into()),
77        }
78    }
79}
80
81impl std::fmt::Display for ConfigParallelism {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        match *self {
84            ConfigParallelism::Adaptive => {
85                write!(f, "{}", KEYWORD_ADAPTIVE)
86            }
87            ConfigParallelism::Default => {
88                write!(f, "{}", KEYWORD_DEFAULT)
89            }
90            ConfigParallelism::Fixed(n) => {
91                write!(f, "{}", n)
92            }
93            ConfigParallelism::Bounded(n) => {
94                write!(f, "bounded({})", n)
95            }
96            ConfigParallelism::Ratio(r) => {
97                write!(f, "ratio({})", r)
98            }
99        }
100    }
101}
102
103impl ConfigParallelism {
104    pub fn adaptive_strategy(self) -> Option<AdaptiveParallelismStrategy> {
105        match self {
106            Self::Default | Self::Fixed(_) => None,
107            Self::Adaptive => Some(AdaptiveParallelismStrategy::Auto),
108            Self::Bounded(n) => Some(AdaptiveParallelismStrategy::Bounded(
109                NonZeroUsize::new(n.get() as usize).unwrap(),
110            )),
111            Self::Ratio(r) => Some(AdaptiveParallelismStrategy::Ratio(r)),
112        }
113    }
114}
115
/// Backfill parallelism setting as it appears in the session configuration.
///
/// Values are produced by the `FromStr` impl and rendered back by the `Display` impl.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum ConfigBackfillParallelism {
    /// The `default` keyword; yields no adaptive strategy of its own.
    #[default]
    Default,
    /// A fixed, non-zero parallelism, written as a plain positive integer.
    Fixed(NonZeroU64),
    /// Adaptive parallelism: the `adaptive` / `auto` keywords, or the integer `0`.
    Adaptive,
    /// Adaptive parallelism using a bounded strategy, rendered as `bounded(n)`.
    Bounded(NonZeroU64),
    /// Adaptive parallelism using a ratio strategy, rendered as `ratio(r)`.
    Ratio(f32),
}
125
126impl FromStr for ConfigBackfillParallelism {
127    type Err = ConfigParallelismParseError;
128
129    fn from_str(s: &str) -> Result<Self, Self::Err> {
130        match s.to_ascii_lowercase().as_str() {
131            KEYWORD_DEFAULT => return Ok(Self::Default),
132            KEYWORD_ADAPTIVE | KEYWORD_AUTO => return Ok(Self::Adaptive),
133            _ => {}
134        }
135
136        match parse_strategy(s) {
137            Ok(AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full) => {
138                Ok(Self::Adaptive)
139            }
140            Ok(AdaptiveParallelismStrategy::Bounded(n)) => {
141                Ok(Self::Bounded(NonZeroU64::new(n.get() as u64).unwrap()))
142            }
143            Ok(AdaptiveParallelismStrategy::Ratio(r)) => Ok(Self::Ratio(r)),
144            Err(ParallelismStrategyParseError::UnsupportedStrategy(_)) => {
145                let parsed = s.parse::<u64>()?;
146                if parsed == 0 {
147                    Ok(Self::Adaptive)
148                } else {
149                    Ok(Self::Fixed(NonZeroU64::new(parsed).unwrap()))
150                }
151            }
152            Err(err) => Err(err.into()),
153        }
154    }
155}
156
157impl std::fmt::Display for ConfigBackfillParallelism {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        match *self {
160            Self::Adaptive => write!(f, "{}", KEYWORD_ADAPTIVE),
161            Self::Default => write!(f, "{}", KEYWORD_DEFAULT),
162            Self::Fixed(n) => write!(f, "{}", n),
163            Self::Bounded(n) => write!(f, "bounded({})", n),
164            Self::Ratio(r) => write!(f, "ratio({})", r),
165        }
166    }
167}
168
169impl ConfigBackfillParallelism {
170    pub fn adaptive_strategy(self) -> Option<AdaptiveParallelismStrategy> {
171        match self {
172            Self::Default | Self::Fixed(_) => None,
173            Self::Adaptive => Some(AdaptiveParallelismStrategy::Auto),
174            Self::Bounded(n) => Some(AdaptiveParallelismStrategy::Bounded(
175                NonZeroUsize::new(n.get() as usize).unwrap(),
176            )),
177            Self::Ratio(r) => Some(AdaptiveParallelismStrategy::Ratio(r)),
178        }
179    }
180}
181
/// Adaptive parallelism strategy as it appears in the session configuration.
///
/// Mirrors `AdaptiveParallelismStrategy` with one extra `Default` variant, which
/// converts to `None` when turned into an `Option<AdaptiveParallelismStrategy>`.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum ConfigAdaptiveParallelismStrategy {
    /// The `default` keyword: no strategy chosen at this level.
    #[default]
    Default,
    /// Counterpart of `AdaptiveParallelismStrategy::Auto`.
    Auto,
    /// Counterpart of `AdaptiveParallelismStrategy::Full`.
    Full,
    /// Counterpart of `AdaptiveParallelismStrategy::Bounded`, stored as a `u64`.
    Bounded(NonZeroU64),
    /// Counterpart of `AdaptiveParallelismStrategy::Ratio`.
    Ratio(f32),
}
191
192impl FromStr for ConfigAdaptiveParallelismStrategy {
193    type Err = ParallelismStrategyParseError;
194
195    fn from_str(s: &str) -> Result<Self, Self::Err> {
196        if s.eq_ignore_ascii_case(KEYWORD_DEFAULT_STRATEGY) {
197            return Ok(Self::Default);
198        }
199
200        let strategy = parse_strategy(s)?;
201        Ok(strategy.into())
202    }
203}
204
205impl From<AdaptiveParallelismStrategy> for ConfigAdaptiveParallelismStrategy {
206    fn from(value: AdaptiveParallelismStrategy) -> Self {
207        match value {
208            AdaptiveParallelismStrategy::Auto => Self::Auto,
209            AdaptiveParallelismStrategy::Full => Self::Full,
210            AdaptiveParallelismStrategy::Bounded(n) => {
211                // Safe to unwrap since `n` is non-zero.
212                Self::Bounded(NonZeroU64::new(n.get() as u64).unwrap())
213            }
214            AdaptiveParallelismStrategy::Ratio(r) => Self::Ratio(r),
215        }
216    }
217}
218
219impl From<ConfigAdaptiveParallelismStrategy> for Option<AdaptiveParallelismStrategy> {
220    fn from(value: ConfigAdaptiveParallelismStrategy) -> Self {
221        match value {
222            ConfigAdaptiveParallelismStrategy::Default => None,
223            ConfigAdaptiveParallelismStrategy::Auto => Some(AdaptiveParallelismStrategy::Auto),
224            ConfigAdaptiveParallelismStrategy::Full => Some(AdaptiveParallelismStrategy::Full),
225            ConfigAdaptiveParallelismStrategy::Bounded(n) => {
226                Some(AdaptiveParallelismStrategy::Bounded(
227                    NonZeroUsize::new(n.get() as usize)
228                        // Bounded strategy requires non-zero; `NonZeroU64` guarantees this.
229                        .unwrap(),
230                ))
231            }
232            ConfigAdaptiveParallelismStrategy::Ratio(r) => {
233                Some(AdaptiveParallelismStrategy::Ratio(r))
234            }
235        }
236    }
237}
238
239impl std::fmt::Display for ConfigAdaptiveParallelismStrategy {
240    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241        match self {
242            ConfigAdaptiveParallelismStrategy::Default => {
243                write!(f, "{}", KEYWORD_DEFAULT_STRATEGY)
244            }
245            ConfigAdaptiveParallelismStrategy::Auto => AdaptiveParallelismStrategy::Auto.fmt(f),
246            ConfigAdaptiveParallelismStrategy::Full => AdaptiveParallelismStrategy::Full.fmt(f),
247            ConfigAdaptiveParallelismStrategy::Bounded(n) => {
248                AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(n.get() as usize).unwrap())
249                    .fmt(f)
250            }
251            ConfigAdaptiveParallelismStrategy::Ratio(r) => {
252                AdaptiveParallelismStrategy::Ratio(*r).fmt(f)
253            }
254        }
255    }
256}
257
258pub fn migrate_legacy_global_parallelism(
259    parallelism: ConfigParallelism,
260    strategy: ConfigAdaptiveParallelismStrategy,
261    system_strategy: AdaptiveParallelismStrategy,
262) -> ConfigParallelism {
263    match parallelism {
264        ConfigParallelism::Fixed(_)
265        | ConfigParallelism::Bounded(_)
266        | ConfigParallelism::Ratio(_) => parallelism,
267        ConfigParallelism::Default | ConfigParallelism::Adaptive => {
268            legacy_strategy_to_parallelism(resolve_legacy_strategy(strategy, system_strategy))
269        }
270    }
271}
272
273pub fn migrate_legacy_type_parallelism(
274    specific_parallelism: ConfigParallelism,
275    specific_strategy: ConfigAdaptiveParallelismStrategy,
276    global_parallelism: ConfigParallelism,
277    global_strategy: ConfigAdaptiveParallelismStrategy,
278    system_strategy: AdaptiveParallelismStrategy,
279) -> ConfigParallelism {
280    match specific_parallelism {
281        ConfigParallelism::Fixed(_)
282        | ConfigParallelism::Bounded(_)
283        | ConfigParallelism::Ratio(_) => specific_parallelism,
284        ConfigParallelism::Adaptive => legacy_strategy_to_parallelism(resolve_legacy_strategy(
285            specific_strategy,
286            resolve_legacy_strategy(global_strategy, system_strategy),
287        )),
288        ConfigParallelism::Default => {
289            if matches!(
290                specific_strategy,
291                ConfigAdaptiveParallelismStrategy::Default
292            ) || matches!(global_parallelism, ConfigParallelism::Fixed(_))
293            {
294                ConfigParallelism::Default
295            } else {
296                legacy_strategy_to_parallelism(resolve_legacy_strategy(
297                    specific_strategy,
298                    resolve_legacy_strategy(global_strategy, system_strategy),
299                ))
300            }
301        }
302    }
303}
304
305fn resolve_legacy_strategy(
306    strategy: ConfigAdaptiveParallelismStrategy,
307    fallback: AdaptiveParallelismStrategy,
308) -> AdaptiveParallelismStrategy {
309    Option::<AdaptiveParallelismStrategy>::from(strategy).unwrap_or(fallback)
310}
311
312fn legacy_strategy_to_parallelism(strategy: AdaptiveParallelismStrategy) -> ConfigParallelism {
313    match strategy {
314        AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full => {
315            ConfigParallelism::Adaptive
316        }
317        AdaptiveParallelismStrategy::Bounded(n) => {
318            ConfigParallelism::Bounded(NonZeroU64::new(n.get() as u64).unwrap())
319        }
320        AdaptiveParallelismStrategy::Ratio(r) => ConfigParallelism::Ratio(r),
321    }
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a non-zero `u64` used in expected values.
    fn nz64(value: u64) -> NonZeroU64 {
        NonZeroU64::new(value).unwrap()
    }

    /// Shorthand for a non-zero `usize` used in expected values.
    fn nz(value: usize) -> NonZeroUsize {
        NonZeroUsize::new(value).unwrap()
    }

    #[test]
    fn test_parallelism_parse_bounded() {
        let parsed = "Bounded(4)".parse::<ConfigParallelism>().unwrap();
        assert_eq!(parsed, ConfigParallelism::Bounded(nz64(4)));
    }

    #[test]
    fn test_parallelism_parse_ratio() {
        let parsed = "ratio(0.5)".parse::<ConfigParallelism>().unwrap();
        assert_eq!(parsed, ConfigParallelism::Ratio(0.5));
    }

    #[test]
    fn test_parallelism_default_variant() {
        assert_eq!(ConfigParallelism::default(), ConfigParallelism::Default);
    }

    #[test]
    fn test_backfill_parallelism_parse_bounded() {
        let parsed = "Bounded(4)".parse::<ConfigBackfillParallelism>().unwrap();

        assert_eq!(parsed, ConfigBackfillParallelism::Bounded(nz64(4)));
        // Rendering uses the lowercase form regardless of the input's casing.
        assert_eq!(parsed.to_string(), "bounded(4)");
        assert_eq!(
            parsed.adaptive_strategy(),
            Some(AdaptiveParallelismStrategy::Bounded(nz(4)))
        );
    }

    #[test]
    fn test_backfill_parallelism_parse_ratio() {
        let parsed = "ratio(0.5)".parse::<ConfigBackfillParallelism>().unwrap();

        assert_eq!(parsed, ConfigBackfillParallelism::Ratio(0.5));
        assert_eq!(parsed.to_string(), "ratio(0.5)");
        assert_eq!(
            parsed.adaptive_strategy(),
            Some(AdaptiveParallelismStrategy::Ratio(0.5))
        );
    }

    #[test]
    fn test_strategy_parse_default() {
        let parsed = "default"
            .parse::<ConfigAdaptiveParallelismStrategy>()
            .unwrap();
        assert_eq!(parsed, ConfigAdaptiveParallelismStrategy::Default);
    }

    #[test]
    fn test_strategy_parse_ratio() {
        let parsed = "Ratio(0.5)"
            .parse::<ConfigAdaptiveParallelismStrategy>()
            .unwrap();
        assert_eq!(parsed, ConfigAdaptiveParallelismStrategy::Ratio(0.5));
    }

    #[test]
    fn test_strategy_into_option() {
        let converted =
            Option::<AdaptiveParallelismStrategy>::from(ConfigAdaptiveParallelismStrategy::Full);
        assert_eq!(converted, Some(AdaptiveParallelismStrategy::Full));
    }

    #[test]
    fn test_migrate_legacy_global_parallelism() {
        // An explicit session strategy wins over the system strategy.
        let migrated = migrate_legacy_global_parallelism(
            ConfigParallelism::Default,
            ConfigAdaptiveParallelismStrategy::Ratio(0.5),
            AdaptiveParallelismStrategy::Bounded(nz(64)),
        );
        assert_eq!(migrated, ConfigParallelism::Ratio(0.5));
    }

    #[test]
    fn test_migrate_legacy_type_parallelism() {
        // A type-specific strategy is applied when the global parallelism is not fixed.
        let with_adaptive_global = migrate_legacy_type_parallelism(
            ConfigParallelism::Default,
            ConfigAdaptiveParallelismStrategy::Bounded(nz64(4)),
            ConfigParallelism::Adaptive,
            ConfigAdaptiveParallelismStrategy::Default,
            AdaptiveParallelismStrategy::Auto,
        );
        assert_eq!(with_adaptive_global, ConfigParallelism::Bounded(nz64(4)));

        // A fixed global parallelism keeps the type-specific setting at `Default`.
        let with_fixed_global = migrate_legacy_type_parallelism(
            ConfigParallelism::Default,
            ConfigAdaptiveParallelismStrategy::Bounded(nz64(4)),
            ConfigParallelism::Fixed(nz64(8)),
            ConfigAdaptiveParallelismStrategy::Default,
            AdaptiveParallelismStrategy::Auto,
        );
        assert_eq!(with_fixed_global, ConfigParallelism::Default);
    }
}
431        );
432    }
433}