risingwave_common/session_config/
parallelism.rs1use std::num::{NonZeroU64, NonZeroUsize, ParseIntError};
16use std::str::FromStr;
17
18use risingwave_common::system_param::adaptive_parallelism_strategy::{
19 AdaptiveParallelismStrategy, ParallelismStrategyParseError, parse_strategy,
20};
21use thiserror::Error;
22
23const KEYWORD_DEFAULT: &str = "default";
24const KEYWORD_ADAPTIVE: &str = "adaptive";
25const KEYWORD_AUTO: &str = "auto";
26const KEYWORD_DEFAULT_STRATEGY: &str = "default";
27
28const fn non_zero_u64(parallelism: u64) -> NonZeroU64 {
29 match NonZeroU64::new(parallelism) {
30 Some(parallelism) => parallelism,
31 None => panic!("parallelism must be non-zero"),
32 }
33}
34
35const fn bounded_parallelism(parallelism: u64) -> ConfigParallelism {
36 ConfigParallelism::Bounded(non_zero_u64(parallelism))
37}
38
39pub const DEFAULT_GLOBAL_STREAMING_PARALLELISM: ConfigParallelism = bounded_parallelism(64);
40pub const DEFAULT_TABLE_SOURCE_STREAMING_PARALLELISM: ConfigParallelism = bounded_parallelism(4);
41pub const DEFAULT_SINK_STREAMING_PARALLELISM: ConfigParallelism = bounded_parallelism(8);
42
43#[derive(Copy, Debug, Clone, PartialEq, Default)]
44pub enum ConfigParallelism {
45 #[default]
46 Default,
47 Fixed(NonZeroU64),
48 Adaptive,
49 Bounded(NonZeroU64),
50 Ratio(f32),
51}
52
53#[derive(Error, Debug)]
54pub enum ConfigParallelismParseError {
55 #[error("Unsupported parallelism: {0}")]
56 UnsupportedParallelism(String),
57
58 #[error("Parse error: {0}")]
59 ParseIntError(#[from] ParseIntError),
60
61 #[error(transparent)]
62 StrategyParseError(#[from] ParallelismStrategyParseError),
63}
64
65impl FromStr for ConfigParallelism {
66 type Err = ConfigParallelismParseError;
67
68 fn from_str(s: &str) -> Result<Self, Self::Err> {
69 match s.to_ascii_lowercase().as_str() {
70 KEYWORD_DEFAULT => return Ok(ConfigParallelism::Default),
71 KEYWORD_ADAPTIVE | KEYWORD_AUTO => return Ok(ConfigParallelism::Adaptive),
72 _ => {}
73 }
74
75 match parse_strategy(s) {
76 Ok(AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full) => {
77 Ok(ConfigParallelism::Adaptive)
78 }
79 Ok(AdaptiveParallelismStrategy::Bounded(n)) => Ok(ConfigParallelism::Bounded(
80 NonZeroU64::new(n.get() as u64).unwrap(),
81 )),
82 Ok(AdaptiveParallelismStrategy::Ratio(r)) => Ok(ConfigParallelism::Ratio(r)),
83 Err(ParallelismStrategyParseError::UnsupportedStrategy(_)) => {
84 let parsed = s.parse::<u64>()?;
85 if parsed == 0 {
86 Ok(ConfigParallelism::Adaptive)
87 } else {
88 Ok(ConfigParallelism::Fixed(NonZeroU64::new(parsed).unwrap()))
89 }
90 }
91 Err(err) => Err(err.into()),
92 }
93 }
94}
95
96impl std::fmt::Display for ConfigParallelism {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 match *self {
99 ConfigParallelism::Adaptive => {
100 write!(f, "{}", KEYWORD_ADAPTIVE)
101 }
102 ConfigParallelism::Default => {
103 write!(f, "{}", KEYWORD_DEFAULT)
104 }
105 ConfigParallelism::Fixed(n) => {
106 write!(f, "{}", n)
107 }
108 ConfigParallelism::Bounded(n) => {
109 write!(f, "bounded({})", n)
110 }
111 ConfigParallelism::Ratio(r) => {
112 write!(f, "ratio({})", r)
113 }
114 }
115 }
116}
117
118impl ConfigParallelism {
119 pub fn adaptive_strategy(self) -> Option<AdaptiveParallelismStrategy> {
120 match self {
121 Self::Default | Self::Fixed(_) => None,
122 Self::Adaptive => Some(AdaptiveParallelismStrategy::Auto),
123 Self::Bounded(n) => Some(AdaptiveParallelismStrategy::Bounded(
124 NonZeroUsize::new(n.get() as usize).unwrap(),
125 )),
126 Self::Ratio(r) => Some(AdaptiveParallelismStrategy::Ratio(r)),
127 }
128 }
129}
130
131#[derive(Copy, Default, Debug, Clone, PartialEq)]
132pub enum ConfigBackfillParallelism {
133 #[default]
134 Default,
135 Fixed(NonZeroU64),
136 Adaptive,
137 Bounded(NonZeroU64),
138 Ratio(f32),
139}
140
141impl FromStr for ConfigBackfillParallelism {
142 type Err = ConfigParallelismParseError;
143
144 fn from_str(s: &str) -> Result<Self, Self::Err> {
145 match s.to_ascii_lowercase().as_str() {
146 KEYWORD_DEFAULT => return Ok(Self::Default),
147 KEYWORD_ADAPTIVE | KEYWORD_AUTO => return Ok(Self::Adaptive),
148 _ => {}
149 }
150
151 match parse_strategy(s) {
152 Ok(AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full) => {
153 Ok(Self::Adaptive)
154 }
155 Ok(AdaptiveParallelismStrategy::Bounded(n)) => {
156 Ok(Self::Bounded(NonZeroU64::new(n.get() as u64).unwrap()))
157 }
158 Ok(AdaptiveParallelismStrategy::Ratio(r)) => Ok(Self::Ratio(r)),
159 Err(ParallelismStrategyParseError::UnsupportedStrategy(_)) => {
160 let parsed = s.parse::<u64>()?;
161 if parsed == 0 {
162 Ok(Self::Adaptive)
163 } else {
164 Ok(Self::Fixed(NonZeroU64::new(parsed).unwrap()))
165 }
166 }
167 Err(err) => Err(err.into()),
168 }
169 }
170}
171
172impl std::fmt::Display for ConfigBackfillParallelism {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 match *self {
175 Self::Adaptive => write!(f, "{}", KEYWORD_ADAPTIVE),
176 Self::Default => write!(f, "{}", KEYWORD_DEFAULT),
177 Self::Fixed(n) => write!(f, "{}", n),
178 Self::Bounded(n) => write!(f, "bounded({})", n),
179 Self::Ratio(r) => write!(f, "ratio({})", r),
180 }
181 }
182}
183
184impl ConfigBackfillParallelism {
185 pub fn adaptive_strategy(self) -> Option<AdaptiveParallelismStrategy> {
186 match self {
187 Self::Default | Self::Fixed(_) => None,
188 Self::Adaptive => Some(AdaptiveParallelismStrategy::Auto),
189 Self::Bounded(n) => Some(AdaptiveParallelismStrategy::Bounded(
190 NonZeroUsize::new(n.get() as usize).unwrap(),
191 )),
192 Self::Ratio(r) => Some(AdaptiveParallelismStrategy::Ratio(r)),
193 }
194 }
195}
196
197#[derive(Copy, Default, Debug, Clone, PartialEq)]
198pub enum ConfigAdaptiveParallelismStrategy {
199 #[default]
200 Default,
201 Auto,
202 Full,
203 Bounded(NonZeroU64),
204 Ratio(f32),
205}
206
207impl FromStr for ConfigAdaptiveParallelismStrategy {
208 type Err = ParallelismStrategyParseError;
209
210 fn from_str(s: &str) -> Result<Self, Self::Err> {
211 if s.eq_ignore_ascii_case(KEYWORD_DEFAULT_STRATEGY) {
212 return Ok(Self::Default);
213 }
214
215 let strategy = parse_strategy(s)?;
216 Ok(strategy.into())
217 }
218}
219
220impl From<AdaptiveParallelismStrategy> for ConfigAdaptiveParallelismStrategy {
221 fn from(value: AdaptiveParallelismStrategy) -> Self {
222 match value {
223 AdaptiveParallelismStrategy::Auto => Self::Auto,
224 AdaptiveParallelismStrategy::Full => Self::Full,
225 AdaptiveParallelismStrategy::Bounded(n) => {
226 Self::Bounded(NonZeroU64::new(n.get() as u64).unwrap())
228 }
229 AdaptiveParallelismStrategy::Ratio(r) => Self::Ratio(r),
230 }
231 }
232}
233
234impl From<ConfigAdaptiveParallelismStrategy> for Option<AdaptiveParallelismStrategy> {
235 fn from(value: ConfigAdaptiveParallelismStrategy) -> Self {
236 match value {
237 ConfigAdaptiveParallelismStrategy::Default => None,
238 ConfigAdaptiveParallelismStrategy::Auto => Some(AdaptiveParallelismStrategy::Auto),
239 ConfigAdaptiveParallelismStrategy::Full => Some(AdaptiveParallelismStrategy::Full),
240 ConfigAdaptiveParallelismStrategy::Bounded(n) => {
241 Some(AdaptiveParallelismStrategy::Bounded(
242 NonZeroUsize::new(n.get() as usize)
243 .unwrap(),
245 ))
246 }
247 ConfigAdaptiveParallelismStrategy::Ratio(r) => {
248 Some(AdaptiveParallelismStrategy::Ratio(r))
249 }
250 }
251 }
252}
253
254impl std::fmt::Display for ConfigAdaptiveParallelismStrategy {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 match self {
257 ConfigAdaptiveParallelismStrategy::Default => {
258 write!(f, "{}", KEYWORD_DEFAULT_STRATEGY)
259 }
260 ConfigAdaptiveParallelismStrategy::Auto => AdaptiveParallelismStrategy::Auto.fmt(f),
261 ConfigAdaptiveParallelismStrategy::Full => AdaptiveParallelismStrategy::Full.fmt(f),
262 ConfigAdaptiveParallelismStrategy::Bounded(n) => {
263 AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(n.get() as usize).unwrap())
264 .fmt(f)
265 }
266 ConfigAdaptiveParallelismStrategy::Ratio(r) => {
267 AdaptiveParallelismStrategy::Ratio(*r).fmt(f)
268 }
269 }
270 }
271}
272
273pub fn migrate_legacy_global_parallelism(
274 parallelism: ConfigParallelism,
275 strategy: ConfigAdaptiveParallelismStrategy,
276 system_strategy: AdaptiveParallelismStrategy,
277) -> ConfigParallelism {
278 match parallelism {
279 ConfigParallelism::Fixed(_)
280 | ConfigParallelism::Bounded(_)
281 | ConfigParallelism::Ratio(_) => parallelism,
282 ConfigParallelism::Default | ConfigParallelism::Adaptive => {
283 legacy_strategy_to_parallelism(resolve_legacy_strategy(strategy, system_strategy))
284 }
285 }
286}
287
288pub fn migrate_legacy_type_parallelism(
289 specific_parallelism: ConfigParallelism,
290 specific_strategy: ConfigAdaptiveParallelismStrategy,
291 global_parallelism: ConfigParallelism,
292 global_strategy: ConfigAdaptiveParallelismStrategy,
293 system_strategy: AdaptiveParallelismStrategy,
294) -> ConfigParallelism {
295 match specific_parallelism {
296 ConfigParallelism::Fixed(_)
297 | ConfigParallelism::Bounded(_)
298 | ConfigParallelism::Ratio(_) => specific_parallelism,
299 ConfigParallelism::Adaptive => legacy_strategy_to_parallelism(resolve_legacy_strategy(
300 specific_strategy,
301 resolve_legacy_strategy(global_strategy, system_strategy),
302 )),
303 ConfigParallelism::Default => {
304 if matches!(
305 specific_strategy,
306 ConfigAdaptiveParallelismStrategy::Default
307 ) || matches!(global_parallelism, ConfigParallelism::Fixed(_))
308 {
309 ConfigParallelism::Default
310 } else {
311 legacy_strategy_to_parallelism(resolve_legacy_strategy(
312 specific_strategy,
313 resolve_legacy_strategy(global_strategy, system_strategy),
314 ))
315 }
316 }
317 }
318}
319
320fn resolve_legacy_strategy(
321 strategy: ConfigAdaptiveParallelismStrategy,
322 fallback: AdaptiveParallelismStrategy,
323) -> AdaptiveParallelismStrategy {
324 Option::<AdaptiveParallelismStrategy>::from(strategy).unwrap_or(fallback)
325}
326
327fn legacy_strategy_to_parallelism(strategy: AdaptiveParallelismStrategy) -> ConfigParallelism {
328 match strategy {
329 AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full => {
330 ConfigParallelism::Adaptive
331 }
332 AdaptiveParallelismStrategy::Bounded(n) => {
333 ConfigParallelism::Bounded(NonZeroU64::new(n.get() as u64).unwrap())
334 }
335 AdaptiveParallelismStrategy::Ratio(r) => ConfigParallelism::Ratio(r),
336 }
337}
338
339#[cfg(test)]
340mod tests {
341 use super::*;
342
343 #[test]
344 fn test_parallelism_parse_bounded() {
345 let parallelism: ConfigParallelism = "Bounded(4)".parse().unwrap();
346 assert_eq!(
347 parallelism,
348 ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap())
349 );
350 }
351
352 #[test]
353 fn test_parallelism_default_constants() {
354 assert_eq!(
355 DEFAULT_GLOBAL_STREAMING_PARALLELISM,
356 ConfigParallelism::Bounded(NonZeroU64::new(64).unwrap())
357 );
358 assert_eq!(
359 DEFAULT_TABLE_SOURCE_STREAMING_PARALLELISM,
360 ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap())
361 );
362 assert_eq!(
363 DEFAULT_SINK_STREAMING_PARALLELISM,
364 ConfigParallelism::Bounded(NonZeroU64::new(8).unwrap())
365 );
366 }
367
368 #[test]
369 fn test_parallelism_parse_ratio() {
370 let parallelism: ConfigParallelism = "ratio(0.5)".parse().unwrap();
371 assert_eq!(parallelism, ConfigParallelism::Ratio(0.5));
372 }
373
374 #[test]
375 fn test_parallelism_default_variant() {
376 assert_eq!(ConfigParallelism::default(), ConfigParallelism::Default);
377 }
378
379 #[test]
380 fn test_backfill_parallelism_parse_bounded() {
381 let parallelism: ConfigBackfillParallelism = "Bounded(4)".parse().unwrap();
382 assert_eq!(
383 parallelism,
384 ConfigBackfillParallelism::Bounded(NonZeroU64::new(4).unwrap())
385 );
386 assert_eq!(parallelism.to_string(), "bounded(4)");
387 assert_eq!(
388 parallelism.adaptive_strategy(),
389 Some(AdaptiveParallelismStrategy::Bounded(
390 NonZeroUsize::new(4).unwrap()
391 ))
392 );
393 }
394
395 #[test]
396 fn test_backfill_parallelism_parse_ratio() {
397 let parallelism: ConfigBackfillParallelism = "ratio(0.5)".parse().unwrap();
398 assert_eq!(parallelism, ConfigBackfillParallelism::Ratio(0.5));
399 assert_eq!(parallelism.to_string(), "ratio(0.5)");
400 assert_eq!(
401 parallelism.adaptive_strategy(),
402 Some(AdaptiveParallelismStrategy::Ratio(0.5))
403 );
404 }
405
406 #[test]
407 fn test_strategy_parse_default() {
408 assert_eq!(
409 "default"
410 .parse::<ConfigAdaptiveParallelismStrategy>()
411 .unwrap(),
412 ConfigAdaptiveParallelismStrategy::Default
413 );
414 }
415
416 #[test]
417 fn test_strategy_parse_ratio() {
418 let strategy: ConfigAdaptiveParallelismStrategy = "Ratio(0.5)".parse().unwrap();
419 assert_eq!(strategy, ConfigAdaptiveParallelismStrategy::Ratio(0.5));
420 }
421
422 #[test]
423 fn test_strategy_into_option() {
424 let opt: Option<AdaptiveParallelismStrategy> =
425 ConfigAdaptiveParallelismStrategy::Full.into();
426 assert_eq!(opt, Some(AdaptiveParallelismStrategy::Full));
427 }
428
429 #[test]
430 fn test_migrate_legacy_global_parallelism() {
431 assert_eq!(
432 migrate_legacy_global_parallelism(
433 ConfigParallelism::Default,
434 ConfigAdaptiveParallelismStrategy::Ratio(0.5),
435 AdaptiveParallelismStrategy::Bounded(NonZeroUsize::new(64).unwrap()),
436 ),
437 ConfigParallelism::Ratio(0.5)
438 );
439 }
440
441 #[test]
442 fn test_migrate_legacy_type_parallelism() {
443 assert_eq!(
444 migrate_legacy_type_parallelism(
445 ConfigParallelism::Default,
446 ConfigAdaptiveParallelismStrategy::Bounded(NonZeroU64::new(4).unwrap()),
447 ConfigParallelism::Adaptive,
448 ConfigAdaptiveParallelismStrategy::Default,
449 AdaptiveParallelismStrategy::Auto,
450 ),
451 ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap())
452 );
453 assert_eq!(
454 migrate_legacy_type_parallelism(
455 ConfigParallelism::Default,
456 ConfigAdaptiveParallelismStrategy::Bounded(NonZeroU64::new(4).unwrap()),
457 ConfigParallelism::Fixed(NonZeroU64::new(8).unwrap()),
458 ConfigAdaptiveParallelismStrategy::Default,
459 AdaptiveParallelismStrategy::Auto,
460 ),
461 ConfigParallelism::Default
462 );
463 }
464}