risingwave_common/session_config/
parallelism.rs1use std::num::{NonZeroU64, NonZeroUsize, ParseIntError};
16use std::str::FromStr;
17
18use risingwave_common::system_param::adaptive_parallelism_strategy::{
19 AdaptiveParallelismStrategy, ParallelismStrategyParseError, parse_strategy,
20};
21use thiserror::Error;
22
23const KEYWORD_DEFAULT: &str = "default";
24const KEYWORD_ADAPTIVE: &str = "adaptive";
25const KEYWORD_AUTO: &str = "auto";
26const KEYWORD_DEFAULT_STRATEGY: &str = "default";
27
28#[derive(Copy, Debug, Clone, PartialEq, Default)]
29pub enum ConfigParallelism {
30 #[default]
31 Default,
32 Fixed(NonZeroU64),
33 Adaptive,
34 Bounded(NonZeroU64),
35 Ratio(f32),
36}
37
38#[derive(Error, Debug)]
39pub enum ConfigParallelismParseError {
40 #[error("Unsupported parallelism: {0}")]
41 UnsupportedParallelism(String),
42
43 #[error("Parse error: {0}")]
44 ParseIntError(#[from] ParseIntError),
45
46 #[error(transparent)]
47 StrategyParseError(#[from] ParallelismStrategyParseError),
48}
49
50impl FromStr for ConfigParallelism {
51 type Err = ConfigParallelismParseError;
52
53 fn from_str(s: &str) -> Result<Self, Self::Err> {
54 match s.to_ascii_lowercase().as_str() {
55 KEYWORD_DEFAULT => return Ok(ConfigParallelism::Default),
56 KEYWORD_ADAPTIVE | KEYWORD_AUTO => return Ok(ConfigParallelism::Adaptive),
57 _ => {}
58 }
59
60 match parse_strategy(s) {
61 Ok(AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full) => {
62 Ok(ConfigParallelism::Adaptive)
63 }
64 Ok(AdaptiveParallelismStrategy::Bounded(n)) => Ok(ConfigParallelism::Bounded(
65 NonZeroU64::new(n.get() as u64).unwrap(),
66 )),
67 Ok(AdaptiveParallelismStrategy::Ratio(r)) => Ok(ConfigParallelism::Ratio(r)),
68 Err(ParallelismStrategyParseError::UnsupportedStrategy(_)) => {
69 let parsed = s.parse::<u64>()?;
70 if parsed == 0 {
71 Ok(ConfigParallelism::Adaptive)
72 } else {
73 Ok(ConfigParallelism::Fixed(NonZeroU64::new(parsed).unwrap()))
74 }
75 }
76 Err(err) => Err(err.into()),
77 }
78 }
79}
80
81impl std::fmt::Display for ConfigParallelism {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 match *self {
84 ConfigParallelism::Adaptive => {
85 write!(f, "{}", KEYWORD_ADAPTIVE)
86 }
87 ConfigParallelism::Default => {
88 write!(f, "{}", KEYWORD_DEFAULT)
89 }
90 ConfigParallelism::Fixed(n) => {
91 write!(f, "{}", n)
92 }
93 ConfigParallelism::Bounded(n) => {
94 write!(f, "bounded({})", n)
95 }
96 ConfigParallelism::Ratio(r) => {
97 write!(f, "ratio({})", r)
98 }
99 }
100 }
101}
102
103impl ConfigParallelism {
104 pub fn adaptive_strategy(self) -> Option<AdaptiveParallelismStrategy> {
105 match self {
106 Self::Default | Self::Fixed(_) => None,
107 Self::Adaptive => Some(AdaptiveParallelismStrategy::Auto),
108 Self::Bounded(n) => Some(AdaptiveParallelismStrategy::Bounded(
109 NonZeroUsize::new(n.get() as usize).unwrap(),
110 )),
111 Self::Ratio(r) => Some(AdaptiveParallelismStrategy::Ratio(r)),
112 }
113 }
114}
115
116#[derive(Copy, Default, Debug, Clone, PartialEq)]
117pub enum ConfigBackfillParallelism {
118 #[default]
119 Default,
120 Fixed(NonZeroU64),
121 Adaptive,
122 Bounded(NonZeroU64),
123 Ratio(f32),
124}
125
126impl FromStr for ConfigBackfillParallelism {
127 type Err = ConfigParallelismParseError;
128
129 fn from_str(s: &str) -> Result<Self, Self::Err> {
130 match s.to_ascii_lowercase().as_str() {
131 KEYWORD_DEFAULT => return Ok(Self::Default),
132 KEYWORD_ADAPTIVE | KEYWORD_AUTO => return Ok(Self::Adaptive),
133 _ => {}
134 }
135
136 match parse_strategy(s) {
137 Ok(AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full) => {
138 Ok(Self::Adaptive)
139 }
140 Ok(AdaptiveParallelismStrategy::Bounded(n)) => {
141 Ok(Self::Bounded(NonZeroU64::new(n.get() as u64).unwrap()))
142 }
143 Ok(AdaptiveParallelismStrategy::Ratio(r)) => Ok(Self::Ratio(r)),
144 Err(ParallelismStrategyParseError::UnsupportedStrategy(_)) => {
145 let parsed = s.parse::<u64>()?;
146 if parsed == 0 {
147 Ok(Self::Adaptive)
148 } else {
149 Ok(Self::Fixed(NonZeroU64::new(parsed).unwrap()))
150 }
151 }
152 Err(err) => Err(err.into()),
153 }
154 }
155}
156
157impl std::fmt::Display for ConfigBackfillParallelism {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 match *self {
160 Self::Adaptive => write!(f, "{}", KEYWORD_ADAPTIVE),
161 Self::Default => write!(f, "{}", KEYWORD_DEFAULT),
162 Self::Fixed(n) => write!(f, "{}", n),
163 Self::Bounded(n) => write!(f, "bounded({})", n),
164 Self::Ratio(r) => write!(f, "ratio({})", r),
165 }
166 }
167}
168
169impl ConfigBackfillParallelism {
170 pub fn adaptive_strategy(self) -> Option<AdaptiveParallelismStrategy> {
171 match self {
172 Self::Default | Self::Fixed(_) => None,
173 Self::Adaptive => Some(AdaptiveParallelismStrategy::Auto),
174 Self::Bounded(n) => Some(AdaptiveParallelismStrategy::Bounded(
175 NonZeroUsize::new(n.get() as usize).unwrap(),
176 )),
177 Self::Ratio(r) => Some(AdaptiveParallelismStrategy::Ratio(r)),
178 }
179 }
180}
181
182#[derive(Copy, Default, Debug, Clone, PartialEq)]
183pub enum ConfigAdaptiveParallelismStrategy {
184 #[default]
185 Default,
186 Auto,
187 Full,
188 Bounded(NonZeroU64),
189 Ratio(f32),
190}
191
192impl FromStr for ConfigAdaptiveParallelismStrategy {
193 type Err = ParallelismStrategyParseError;
194
195 fn from_str(s: &str) -> Result<Self, Self::Err> {
196 if s.eq_ignore_ascii_case(KEYWORD_DEFAULT_STRATEGY) {
197 return Ok(Self::Default);
198 }
199
200 let strategy = parse_strategy(s)?;
201 Ok(strategy.into())
202 }
203}
204
205impl From<AdaptiveParallelismStrategy> for ConfigAdaptiveParallelismStrategy {
206 fn from(value: AdaptiveParallelismStrategy) -> Self {
207 match value {
208 AdaptiveParallelismStrategy::Auto => Self::Auto,
209 AdaptiveParallelismStrategy::Full => Self::Full,
210 AdaptiveParallelismStrategy::Bounded(n) => {
211 Self::Bounded(NonZeroU64::new(n.get() as u64).unwrap())
213 }
214 AdaptiveParallelismStrategy::Ratio(r) => Self::Ratio(r),
215 }
216 }
217}
218
219impl From<ConfigAdaptiveParallelismStrategy> for Option<AdaptiveParallelismStrategy> {
220 fn from(value: ConfigAdaptiveParallelismStrategy) -> Self {
221 match value {
222 ConfigAdaptiveParallelismStrategy::Default => None,
223 ConfigAdaptiveParallelismStrategy::Auto => Some(AdaptiveParallelismStrategy::Auto),
224 ConfigAdaptiveParallelismStrategy::Full => Some(AdaptiveParallelismStrategy::Full),
225 ConfigAdaptiveParallelismStrategy::Bounded(n) => {
226 Some(AdaptiveParallelismStrategy::Bounded(
227 NonZeroUsize::new(n.get() as usize)
228 .unwrap(),
230 ))
231 }
232 ConfigAdaptiveParallelismStrategy::Ratio(r) => {
233 Some(AdaptiveParallelismStrategy::Ratio(r))
234 }
235 }
236 }
237}
238
239impl std::fmt::Display for ConfigAdaptiveParallelismStrategy {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 match self {
242 ConfigAdaptiveParallelismStrategy::Default => {
243 write!(f, "{}", KEYWORD_DEFAULT_STRATEGY)
244 }
245 ConfigAdaptiveParallelismStrategy::Auto => AdaptiveParallelismStrategy::Auto.fmt(f),
246 ConfigAdaptiveParallelismStrategy::Full => AdaptiveParallelismStrategy::Full.fmt(f),
247 ConfigAdaptiveParallelismStrategy::Bounded(n) => {
248 AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(n.get() as usize).unwrap())
249 .fmt(f)
250 }
251 ConfigAdaptiveParallelismStrategy::Ratio(r) => {
252 AdaptiveParallelismStrategy::Ratio(*r).fmt(f)
253 }
254 }
255 }
256}
257
258pub fn migrate_legacy_global_parallelism(
259 parallelism: ConfigParallelism,
260 strategy: ConfigAdaptiveParallelismStrategy,
261 system_strategy: AdaptiveParallelismStrategy,
262) -> ConfigParallelism {
263 match parallelism {
264 ConfigParallelism::Fixed(_)
265 | ConfigParallelism::Bounded(_)
266 | ConfigParallelism::Ratio(_) => parallelism,
267 ConfigParallelism::Default | ConfigParallelism::Adaptive => {
268 legacy_strategy_to_parallelism(resolve_legacy_strategy(strategy, system_strategy))
269 }
270 }
271}
272
273pub fn migrate_legacy_type_parallelism(
274 specific_parallelism: ConfigParallelism,
275 specific_strategy: ConfigAdaptiveParallelismStrategy,
276 global_parallelism: ConfigParallelism,
277 global_strategy: ConfigAdaptiveParallelismStrategy,
278 system_strategy: AdaptiveParallelismStrategy,
279) -> ConfigParallelism {
280 match specific_parallelism {
281 ConfigParallelism::Fixed(_)
282 | ConfigParallelism::Bounded(_)
283 | ConfigParallelism::Ratio(_) => specific_parallelism,
284 ConfigParallelism::Adaptive => legacy_strategy_to_parallelism(resolve_legacy_strategy(
285 specific_strategy,
286 resolve_legacy_strategy(global_strategy, system_strategy),
287 )),
288 ConfigParallelism::Default => {
289 if matches!(
290 specific_strategy,
291 ConfigAdaptiveParallelismStrategy::Default
292 ) || matches!(global_parallelism, ConfigParallelism::Fixed(_))
293 {
294 ConfigParallelism::Default
295 } else {
296 legacy_strategy_to_parallelism(resolve_legacy_strategy(
297 specific_strategy,
298 resolve_legacy_strategy(global_strategy, system_strategy),
299 ))
300 }
301 }
302 }
303}
304
305fn resolve_legacy_strategy(
306 strategy: ConfigAdaptiveParallelismStrategy,
307 fallback: AdaptiveParallelismStrategy,
308) -> AdaptiveParallelismStrategy {
309 Option::<AdaptiveParallelismStrategy>::from(strategy).unwrap_or(fallback)
310}
311
312fn legacy_strategy_to_parallelism(strategy: AdaptiveParallelismStrategy) -> ConfigParallelism {
313 match strategy {
314 AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full => {
315 ConfigParallelism::Adaptive
316 }
317 AdaptiveParallelismStrategy::Bounded(n) => {
318 ConfigParallelism::Bounded(NonZeroU64::new(n.get() as u64).unwrap())
319 }
320 AdaptiveParallelismStrategy::Ratio(r) => ConfigParallelism::Ratio(r),
321 }
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327
328 #[test]
329 fn test_parallelism_parse_bounded() {
330 let parallelism: ConfigParallelism = "Bounded(4)".parse().unwrap();
331 assert_eq!(
332 parallelism,
333 ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap())
334 );
335 }
336
337 #[test]
338 fn test_parallelism_parse_ratio() {
339 let parallelism: ConfigParallelism = "ratio(0.5)".parse().unwrap();
340 assert_eq!(parallelism, ConfigParallelism::Ratio(0.5));
341 }
342
343 #[test]
344 fn test_parallelism_default_variant() {
345 assert_eq!(ConfigParallelism::default(), ConfigParallelism::Default);
346 }
347
348 #[test]
349 fn test_backfill_parallelism_parse_bounded() {
350 let parallelism: ConfigBackfillParallelism = "Bounded(4)".parse().unwrap();
351 assert_eq!(
352 parallelism,
353 ConfigBackfillParallelism::Bounded(NonZeroU64::new(4).unwrap())
354 );
355 assert_eq!(parallelism.to_string(), "bounded(4)");
356 assert_eq!(
357 parallelism.adaptive_strategy(),
358 Some(AdaptiveParallelismStrategy::Bounded(
359 NonZeroUsize::new(4).unwrap()
360 ))
361 );
362 }
363
364 #[test]
365 fn test_backfill_parallelism_parse_ratio() {
366 let parallelism: ConfigBackfillParallelism = "ratio(0.5)".parse().unwrap();
367 assert_eq!(parallelism, ConfigBackfillParallelism::Ratio(0.5));
368 assert_eq!(parallelism.to_string(), "ratio(0.5)");
369 assert_eq!(
370 parallelism.adaptive_strategy(),
371 Some(AdaptiveParallelismStrategy::Ratio(0.5))
372 );
373 }
374
375 #[test]
376 fn test_strategy_parse_default() {
377 assert_eq!(
378 "default"
379 .parse::<ConfigAdaptiveParallelismStrategy>()
380 .unwrap(),
381 ConfigAdaptiveParallelismStrategy::Default
382 );
383 }
384
385 #[test]
386 fn test_strategy_parse_ratio() {
387 let strategy: ConfigAdaptiveParallelismStrategy = "Ratio(0.5)".parse().unwrap();
388 assert_eq!(strategy, ConfigAdaptiveParallelismStrategy::Ratio(0.5));
389 }
390
391 #[test]
392 fn test_strategy_into_option() {
393 let opt: Option<AdaptiveParallelismStrategy> =
394 ConfigAdaptiveParallelismStrategy::Full.into();
395 assert_eq!(opt, Some(AdaptiveParallelismStrategy::Full));
396 }
397
398 #[test]
399 fn test_migrate_legacy_global_parallelism() {
400 assert_eq!(
401 migrate_legacy_global_parallelism(
402 ConfigParallelism::Default,
403 ConfigAdaptiveParallelismStrategy::Ratio(0.5),
404 AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(64).unwrap()),
405 ),
406 ConfigParallelism::Ratio(0.5)
407 );
408 }
409
410 #[test]
411 fn test_migrate_legacy_type_parallelism() {
412 assert_eq!(
413 migrate_legacy_type_parallelism(
414 ConfigParallelism::Default,
415 ConfigAdaptiveParallelismStrategy::Bounded(NonZeroU64::new(4).unwrap()),
416 ConfigParallelism::Adaptive,
417 ConfigAdaptiveParallelismStrategy::Default,
418 AdaptiveParallelismStrategy::Auto,
419 ),
420 ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap())
421 );
422 assert_eq!(
423 migrate_legacy_type_parallelism(
424 ConfigParallelism::Default,
425 ConfigAdaptiveParallelismStrategy::Bounded(NonZeroU64::new(4).unwrap()),
426 ConfigParallelism::Fixed(NonZeroU64::new(8).unwrap()),
427 ConfigAdaptiveParallelismStrategy::Default,
428 AdaptiveParallelismStrategy::Auto,
429 ),
430 ConfigParallelism::Default
431 );
432 }
433}