risingwave_common/session_config/
parallelism.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::{NonZeroU64, NonZeroUsize, ParseIntError};
16use std::str::FromStr;
17
18use risingwave_common::system_param::adaptive_parallelism_strategy::{
19    AdaptiveParallelismStrategy, ParallelismStrategyParseError, parse_strategy,
20};
21use thiserror::Error;
22
23const KEYWORD_DEFAULT: &str = "default";
24const KEYWORD_ADAPTIVE: &str = "adaptive";
25const KEYWORD_AUTO: &str = "auto";
26const KEYWORD_DEFAULT_STRATEGY: &str = "default";
27
28const fn non_zero_u64(parallelism: u64) -> NonZeroU64 {
29    match NonZeroU64::new(parallelism) {
30        Some(parallelism) => parallelism,
31        None => panic!("parallelism must be non-zero"),
32    }
33}
34
35const fn bounded_parallelism(parallelism: u64) -> ConfigParallelism {
36    ConfigParallelism::Bounded(non_zero_u64(parallelism))
37}
38
39pub const DEFAULT_GLOBAL_STREAMING_PARALLELISM: ConfigParallelism = bounded_parallelism(64);
40pub const DEFAULT_TABLE_SOURCE_STREAMING_PARALLELISM: ConfigParallelism = bounded_parallelism(4);
41pub const DEFAULT_SINK_STREAMING_PARALLELISM: ConfigParallelism = bounded_parallelism(8);
42
43#[derive(Copy, Debug, Clone, PartialEq, Default)]
44pub enum ConfigParallelism {
45    #[default]
46    Default,
47    Fixed(NonZeroU64),
48    Adaptive,
49    Bounded(NonZeroU64),
50    Ratio(f32),
51}
52
53#[derive(Error, Debug)]
54pub enum ConfigParallelismParseError {
55    #[error("Unsupported parallelism: {0}")]
56    UnsupportedParallelism(String),
57
58    #[error("Parse error: {0}")]
59    ParseIntError(#[from] ParseIntError),
60
61    #[error(transparent)]
62    StrategyParseError(#[from] ParallelismStrategyParseError),
63}
64
65impl FromStr for ConfigParallelism {
66    type Err = ConfigParallelismParseError;
67
68    fn from_str(s: &str) -> Result<Self, Self::Err> {
69        match s.to_ascii_lowercase().as_str() {
70            KEYWORD_DEFAULT => return Ok(ConfigParallelism::Default),
71            KEYWORD_ADAPTIVE | KEYWORD_AUTO => return Ok(ConfigParallelism::Adaptive),
72            _ => {}
73        }
74
75        match parse_strategy(s) {
76            Ok(AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full) => {
77                Ok(ConfigParallelism::Adaptive)
78            }
79            Ok(AdaptiveParallelismStrategy::Bounded(n)) => Ok(ConfigParallelism::Bounded(
80                NonZeroU64::new(n.get() as u64).unwrap(),
81            )),
82            Ok(AdaptiveParallelismStrategy::Ratio(r)) => Ok(ConfigParallelism::Ratio(r)),
83            Err(ParallelismStrategyParseError::UnsupportedStrategy(_)) => {
84                let parsed = s.parse::<u64>()?;
85                if parsed == 0 {
86                    Ok(ConfigParallelism::Adaptive)
87                } else {
88                    Ok(ConfigParallelism::Fixed(NonZeroU64::new(parsed).unwrap()))
89                }
90            }
91            Err(err) => Err(err.into()),
92        }
93    }
94}
95
96impl std::fmt::Display for ConfigParallelism {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        match *self {
99            ConfigParallelism::Adaptive => {
100                write!(f, "{}", KEYWORD_ADAPTIVE)
101            }
102            ConfigParallelism::Default => {
103                write!(f, "{}", KEYWORD_DEFAULT)
104            }
105            ConfigParallelism::Fixed(n) => {
106                write!(f, "{}", n)
107            }
108            ConfigParallelism::Bounded(n) => {
109                write!(f, "bounded({})", n)
110            }
111            ConfigParallelism::Ratio(r) => {
112                write!(f, "ratio({})", r)
113            }
114        }
115    }
116}
117
118impl ConfigParallelism {
119    pub fn adaptive_strategy(self) -> Option<AdaptiveParallelismStrategy> {
120        match self {
121            Self::Default | Self::Fixed(_) => None,
122            Self::Adaptive => Some(AdaptiveParallelismStrategy::Auto),
123            Self::Bounded(n) => Some(AdaptiveParallelismStrategy::Bounded(
124                NonZeroUsize::new(n.get() as usize).unwrap(),
125            )),
126            Self::Ratio(r) => Some(AdaptiveParallelismStrategy::Ratio(r)),
127        }
128    }
129}
130
131#[derive(Copy, Default, Debug, Clone, PartialEq)]
132pub enum ConfigBackfillParallelism {
133    #[default]
134    Default,
135    Fixed(NonZeroU64),
136    Adaptive,
137    Bounded(NonZeroU64),
138    Ratio(f32),
139}
140
141impl FromStr for ConfigBackfillParallelism {
142    type Err = ConfigParallelismParseError;
143
144    fn from_str(s: &str) -> Result<Self, Self::Err> {
145        match s.to_ascii_lowercase().as_str() {
146            KEYWORD_DEFAULT => return Ok(Self::Default),
147            KEYWORD_ADAPTIVE | KEYWORD_AUTO => return Ok(Self::Adaptive),
148            _ => {}
149        }
150
151        match parse_strategy(s) {
152            Ok(AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full) => {
153                Ok(Self::Adaptive)
154            }
155            Ok(AdaptiveParallelismStrategy::Bounded(n)) => {
156                Ok(Self::Bounded(NonZeroU64::new(n.get() as u64).unwrap()))
157            }
158            Ok(AdaptiveParallelismStrategy::Ratio(r)) => Ok(Self::Ratio(r)),
159            Err(ParallelismStrategyParseError::UnsupportedStrategy(_)) => {
160                let parsed = s.parse::<u64>()?;
161                if parsed == 0 {
162                    Ok(Self::Adaptive)
163                } else {
164                    Ok(Self::Fixed(NonZeroU64::new(parsed).unwrap()))
165                }
166            }
167            Err(err) => Err(err.into()),
168        }
169    }
170}
171
172impl std::fmt::Display for ConfigBackfillParallelism {
173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        match *self {
175            Self::Adaptive => write!(f, "{}", KEYWORD_ADAPTIVE),
176            Self::Default => write!(f, "{}", KEYWORD_DEFAULT),
177            Self::Fixed(n) => write!(f, "{}", n),
178            Self::Bounded(n) => write!(f, "bounded({})", n),
179            Self::Ratio(r) => write!(f, "ratio({})", r),
180        }
181    }
182}
183
184impl ConfigBackfillParallelism {
185    pub fn adaptive_strategy(self) -> Option<AdaptiveParallelismStrategy> {
186        match self {
187            Self::Default | Self::Fixed(_) => None,
188            Self::Adaptive => Some(AdaptiveParallelismStrategy::Auto),
189            Self::Bounded(n) => Some(AdaptiveParallelismStrategy::Bounded(
190                NonZeroUsize::new(n.get() as usize).unwrap(),
191            )),
192            Self::Ratio(r) => Some(AdaptiveParallelismStrategy::Ratio(r)),
193        }
194    }
195}
196
197#[derive(Copy, Default, Debug, Clone, PartialEq)]
198pub enum ConfigAdaptiveParallelismStrategy {
199    #[default]
200    Default,
201    Auto,
202    Full,
203    Bounded(NonZeroU64),
204    Ratio(f32),
205}
206
207impl FromStr for ConfigAdaptiveParallelismStrategy {
208    type Err = ParallelismStrategyParseError;
209
210    fn from_str(s: &str) -> Result<Self, Self::Err> {
211        if s.eq_ignore_ascii_case(KEYWORD_DEFAULT_STRATEGY) {
212            return Ok(Self::Default);
213        }
214
215        let strategy = parse_strategy(s)?;
216        Ok(strategy.into())
217    }
218}
219
220impl From<AdaptiveParallelismStrategy> for ConfigAdaptiveParallelismStrategy {
221    fn from(value: AdaptiveParallelismStrategy) -> Self {
222        match value {
223            AdaptiveParallelismStrategy::Auto => Self::Auto,
224            AdaptiveParallelismStrategy::Full => Self::Full,
225            AdaptiveParallelismStrategy::Bounded(n) => {
226                // Safe to unwrap since `n` is non-zero.
227                Self::Bounded(NonZeroU64::new(n.get() as u64).unwrap())
228            }
229            AdaptiveParallelismStrategy::Ratio(r) => Self::Ratio(r),
230        }
231    }
232}
233
234impl From<ConfigAdaptiveParallelismStrategy> for Option<AdaptiveParallelismStrategy> {
235    fn from(value: ConfigAdaptiveParallelismStrategy) -> Self {
236        match value {
237            ConfigAdaptiveParallelismStrategy::Default => None,
238            ConfigAdaptiveParallelismStrategy::Auto => Some(AdaptiveParallelismStrategy::Auto),
239            ConfigAdaptiveParallelismStrategy::Full => Some(AdaptiveParallelismStrategy::Full),
240            ConfigAdaptiveParallelismStrategy::Bounded(n) => {
241                Some(AdaptiveParallelismStrategy::Bounded(
242                    NonZeroUsize::new(n.get() as usize)
243                        // Bounded strategy requires non-zero; `NonZeroU64` guarantees this.
244                        .unwrap(),
245                ))
246            }
247            ConfigAdaptiveParallelismStrategy::Ratio(r) => {
248                Some(AdaptiveParallelismStrategy::Ratio(r))
249            }
250        }
251    }
252}
253
254impl std::fmt::Display for ConfigAdaptiveParallelismStrategy {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        match self {
257            ConfigAdaptiveParallelismStrategy::Default => {
258                write!(f, "{}", KEYWORD_DEFAULT_STRATEGY)
259            }
260            ConfigAdaptiveParallelismStrategy::Auto => AdaptiveParallelismStrategy::Auto.fmt(f),
261            ConfigAdaptiveParallelismStrategy::Full => AdaptiveParallelismStrategy::Full.fmt(f),
262            ConfigAdaptiveParallelismStrategy::Bounded(n) => {
263                AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(n.get() as usize).unwrap())
264                    .fmt(f)
265            }
266            ConfigAdaptiveParallelismStrategy::Ratio(r) => {
267                AdaptiveParallelismStrategy::Ratio(*r).fmt(f)
268            }
269        }
270    }
271}
272
273pub fn migrate_legacy_global_parallelism(
274    parallelism: ConfigParallelism,
275    strategy: ConfigAdaptiveParallelismStrategy,
276    system_strategy: AdaptiveParallelismStrategy,
277) -> ConfigParallelism {
278    match parallelism {
279        ConfigParallelism::Fixed(_)
280        | ConfigParallelism::Bounded(_)
281        | ConfigParallelism::Ratio(_) => parallelism,
282        ConfigParallelism::Default | ConfigParallelism::Adaptive => {
283            legacy_strategy_to_parallelism(resolve_legacy_strategy(strategy, system_strategy))
284        }
285    }
286}
287
288pub fn migrate_legacy_type_parallelism(
289    specific_parallelism: ConfigParallelism,
290    specific_strategy: ConfigAdaptiveParallelismStrategy,
291    global_parallelism: ConfigParallelism,
292    global_strategy: ConfigAdaptiveParallelismStrategy,
293    system_strategy: AdaptiveParallelismStrategy,
294) -> ConfigParallelism {
295    match specific_parallelism {
296        ConfigParallelism::Fixed(_)
297        | ConfigParallelism::Bounded(_)
298        | ConfigParallelism::Ratio(_) => specific_parallelism,
299        ConfigParallelism::Adaptive => legacy_strategy_to_parallelism(resolve_legacy_strategy(
300            specific_strategy,
301            resolve_legacy_strategy(global_strategy, system_strategy),
302        )),
303        ConfigParallelism::Default => {
304            if matches!(
305                specific_strategy,
306                ConfigAdaptiveParallelismStrategy::Default
307            ) || matches!(global_parallelism, ConfigParallelism::Fixed(_))
308            {
309                ConfigParallelism::Default
310            } else {
311                legacy_strategy_to_parallelism(resolve_legacy_strategy(
312                    specific_strategy,
313                    resolve_legacy_strategy(global_strategy, system_strategy),
314                ))
315            }
316        }
317    }
318}
319
320fn resolve_legacy_strategy(
321    strategy: ConfigAdaptiveParallelismStrategy,
322    fallback: AdaptiveParallelismStrategy,
323) -> AdaptiveParallelismStrategy {
324    Option::<AdaptiveParallelismStrategy>::from(strategy).unwrap_or(fallback)
325}
326
327fn legacy_strategy_to_parallelism(strategy: AdaptiveParallelismStrategy) -> ConfigParallelism {
328    match strategy {
329        AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full => {
330            ConfigParallelism::Adaptive
331        }
332        AdaptiveParallelismStrategy::Bounded(n) => {
333            ConfigParallelism::Bounded(NonZeroU64::new(n.get() as u64).unwrap())
334        }
335        AdaptiveParallelismStrategy::Ratio(r) => ConfigParallelism::Ratio(r),
336    }
337}
338
339#[cfg(test)]
340mod tests {
341    use super::*;
342
343    #[test]
344    fn test_parallelism_parse_bounded() {
345        let parallelism: ConfigParallelism = "Bounded(4)".parse().unwrap();
346        assert_eq!(
347            parallelism,
348            ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap())
349        );
350    }
351
352    #[test]
353    fn test_parallelism_default_constants() {
354        assert_eq!(
355            DEFAULT_GLOBAL_STREAMING_PARALLELISM,
356            ConfigParallelism::Bounded(NonZeroU64::new(64).unwrap())
357        );
358        assert_eq!(
359            DEFAULT_TABLE_SOURCE_STREAMING_PARALLELISM,
360            ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap())
361        );
362        assert_eq!(
363            DEFAULT_SINK_STREAMING_PARALLELISM,
364            ConfigParallelism::Bounded(NonZeroU64::new(8).unwrap())
365        );
366    }
367
368    #[test]
369    fn test_parallelism_parse_ratio() {
370        let parallelism: ConfigParallelism = "ratio(0.5)".parse().unwrap();
371        assert_eq!(parallelism, ConfigParallelism::Ratio(0.5));
372    }
373
374    #[test]
375    fn test_parallelism_default_variant() {
376        assert_eq!(ConfigParallelism::default(), ConfigParallelism::Default);
377    }
378
379    #[test]
380    fn test_backfill_parallelism_parse_bounded() {
381        let parallelism: ConfigBackfillParallelism = "Bounded(4)".parse().unwrap();
382        assert_eq!(
383            parallelism,
384            ConfigBackfillParallelism::Bounded(NonZeroU64::new(4).unwrap())
385        );
386        assert_eq!(parallelism.to_string(), "bounded(4)");
387        assert_eq!(
388            parallelism.adaptive_strategy(),
389            Some(AdaptiveParallelismStrategy::Bounded(
390                NonZeroUsize::new(4).unwrap()
391            ))
392        );
393    }
394
395    #[test]
396    fn test_backfill_parallelism_parse_ratio() {
397        let parallelism: ConfigBackfillParallelism = "ratio(0.5)".parse().unwrap();
398        assert_eq!(parallelism, ConfigBackfillParallelism::Ratio(0.5));
399        assert_eq!(parallelism.to_string(), "ratio(0.5)");
400        assert_eq!(
401            parallelism.adaptive_strategy(),
402            Some(AdaptiveParallelismStrategy::Ratio(0.5))
403        );
404    }
405
406    #[test]
407    fn test_strategy_parse_default() {
408        assert_eq!(
409            "default"
410                .parse::<ConfigAdaptiveParallelismStrategy>()
411                .unwrap(),
412            ConfigAdaptiveParallelismStrategy::Default
413        );
414    }
415
416    #[test]
417    fn test_strategy_parse_ratio() {
418        let strategy: ConfigAdaptiveParallelismStrategy = "Ratio(0.5)".parse().unwrap();
419        assert_eq!(strategy, ConfigAdaptiveParallelismStrategy::Ratio(0.5));
420    }
421
422    #[test]
423    fn test_strategy_into_option() {
424        let opt: Option<AdaptiveParallelismStrategy> =
425            ConfigAdaptiveParallelismStrategy::Full.into();
426        assert_eq!(opt, Some(AdaptiveParallelismStrategy::Full));
427    }
428
429    #[test]
430    fn test_migrate_legacy_global_parallelism() {
431        assert_eq!(
432            migrate_legacy_global_parallelism(
433                ConfigParallelism::Default,
434                ConfigAdaptiveParallelismStrategy::Ratio(0.5),
435                AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(64).unwrap()),
436            ),
437            ConfigParallelism::Ratio(0.5)
438        );
439    }
440
441    #[test]
442    fn test_migrate_legacy_type_parallelism() {
443        assert_eq!(
444            migrate_legacy_type_parallelism(
445                ConfigParallelism::Default,
446                ConfigAdaptiveParallelismStrategy::Bounded(NonZeroU64::new(4).unwrap()),
447                ConfigParallelism::Adaptive,
448                ConfigAdaptiveParallelismStrategy::Default,
449                AdaptiveParallelismStrategy::Auto,
450            ),
451            ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap())
452        );
453        assert_eq!(
454            migrate_legacy_type_parallelism(
455                ConfigParallelism::Default,
456                ConfigAdaptiveParallelismStrategy::Bounded(NonZeroU64::new(4).unwrap()),
457                ConfigParallelism::Fixed(NonZeroU64::new(8).unwrap()),
458                ConfigAdaptiveParallelismStrategy::Default,
459                AdaptiveParallelismStrategy::Auto,
460            ),
461            ConfigParallelism::Default
462        );
463    }
464}