risingwave_frontend/stream_fragmenter/
parallelism.rs1use risingwave_common::session_config::parallelism::{
16 ConfigBackfillParallelism, ConfigParallelism, DEFAULT_GLOBAL_STREAMING_PARALLELISM,
17 DEFAULT_SINK_STREAMING_PARALLELISM, DEFAULT_TABLE_SOURCE_STREAMING_PARALLELISM,
18};
19use risingwave_common::system_param::AdaptiveParallelismStrategy;
20use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
21
22use super::GraphJobType;
23
24#[derive(Debug, Clone, PartialEq)]
25pub(crate) struct ResolvedParallelism {
26 pub parallelism: Option<Parallelism>,
27 pub adaptive_strategy: Option<AdaptiveParallelismStrategy>,
28}
29
30fn resolve_global_parallelism(
31 global_streaming_parallelism: ConfigParallelism,
32) -> ConfigParallelism {
33 match global_streaming_parallelism {
34 ConfigParallelism::Default => DEFAULT_GLOBAL_STREAMING_PARALLELISM,
35 other => other,
36 }
37}
38
39fn resolve_default_parallelism(
40 job_type: Option<GraphJobType>,
41 global_streaming_parallelism: ConfigParallelism,
42) -> ConfigParallelism {
43 match job_type {
44 Some(GraphJobType::Table | GraphJobType::Source) => match global_streaming_parallelism {
47 ConfigParallelism::Default => DEFAULT_TABLE_SOURCE_STREAMING_PARALLELISM,
48 other => resolve_global_parallelism(other),
49 },
50 Some(GraphJobType::Sink) => match global_streaming_parallelism {
51 ConfigParallelism::Default => DEFAULT_SINK_STREAMING_PARALLELISM,
52 other => resolve_global_parallelism(other),
53 },
54 Some(GraphJobType::MaterializedView | GraphJobType::Index) | None => {
55 resolve_global_parallelism(global_streaming_parallelism)
56 }
57 }
58}
59
60pub(crate) fn derive_parallelism(
61 job_type: Option<GraphJobType>,
62 specific_type_parallelism: Option<ConfigParallelism>,
63 global_streaming_parallelism: ConfigParallelism,
64) -> ResolvedParallelism {
65 let effective_parallelism =
66 match specific_type_parallelism.unwrap_or(ConfigParallelism::Default) {
67 ConfigParallelism::Default => {
68 resolve_default_parallelism(job_type, global_streaming_parallelism)
69 }
70 other => other,
71 };
72
73 match effective_parallelism {
74 ConfigParallelism::Default => unreachable!("effective streaming parallelism must be set"),
75 ConfigParallelism::Fixed(n) => ResolvedParallelism {
76 parallelism: Some(Parallelism {
77 parallelism: n.get(),
78 }),
79 adaptive_strategy: None,
80 },
81 ConfigParallelism::Adaptive
82 | ConfigParallelism::Bounded(_)
83 | ConfigParallelism::Ratio(_) => ResolvedParallelism {
84 parallelism: None,
85 adaptive_strategy: effective_parallelism.adaptive_strategy(),
86 },
87 }
88}
89
90pub(crate) fn derive_backfill_parallelism(
91 specific_backfill_parallelism: ConfigBackfillParallelism,
92) -> ResolvedParallelism {
93 match specific_backfill_parallelism {
94 ConfigBackfillParallelism::Default => ResolvedParallelism {
95 parallelism: None,
96 adaptive_strategy: None,
97 },
98 ConfigBackfillParallelism::Fixed(n) => ResolvedParallelism {
99 parallelism: Some(Parallelism {
100 parallelism: n.get(),
101 }),
102 adaptive_strategy: None,
103 },
104 ConfigBackfillParallelism::Adaptive
105 | ConfigBackfillParallelism::Bounded(_)
106 | ConfigBackfillParallelism::Ratio(_) => ResolvedParallelism {
107 parallelism: None,
108 adaptive_strategy: specific_backfill_parallelism.adaptive_strategy(),
109 },
110 }
111}
112
113#[cfg(test)]
114mod tests {
115 use std::num::{NonZeroU64, NonZeroUsize};
116
117 use super::*;
118
119 #[test]
120 fn test_none_global_fixed() {
121 let global = ConfigParallelism::Fixed(NonZeroU64::new(4).unwrap());
122 assert_eq!(
123 derive_parallelism(None, None, global)
124 .parallelism
125 .map(|p| p.parallelism),
126 Some(4)
127 );
128 }
129
130 #[test]
131 fn test_none_global_default() {
132 let global = ConfigParallelism::Default;
133 assert_eq!(derive_parallelism(None, None, global).parallelism, None);
134 assert_eq!(
135 derive_parallelism(None, None, global).adaptive_strategy,
136 DEFAULT_GLOBAL_STREAMING_PARALLELISM.adaptive_strategy()
137 );
138 }
139
140 #[test]
141 fn test_none_global_adaptive() {
142 let global = ConfigParallelism::Adaptive;
143 assert_eq!(derive_parallelism(None, None, global).parallelism, None);
144 assert_eq!(
145 derive_parallelism(None, None, global).adaptive_strategy,
146 Some(AdaptiveParallelismStrategy::Auto)
147 );
148 }
149
150 #[test]
151 fn test_default_global_fixed() {
152 let specific = Some(ConfigParallelism::Default);
153 let global = ConfigParallelism::Fixed(NonZeroU64::new(2).unwrap());
154 assert_eq!(
155 derive_parallelism(None, specific, global)
156 .parallelism
157 .map(|p| p.parallelism),
158 Some(2)
159 );
160 }
161
162 #[test]
163 fn test_default_global_default() {
164 let specific = Some(ConfigParallelism::Default);
165 let global = ConfigParallelism::Default;
166 assert_eq!(derive_parallelism(None, specific, global).parallelism, None);
167 assert_eq!(
168 derive_parallelism(None, specific, global).adaptive_strategy,
169 DEFAULT_GLOBAL_STREAMING_PARALLELISM.adaptive_strategy()
170 );
171 }
172
173 #[test]
174 fn test_default_global_adaptive() {
175 let specific = Some(ConfigParallelism::Default);
176 let global = ConfigParallelism::Adaptive;
177 assert_eq!(derive_parallelism(None, specific, global).parallelism, None);
178 }
179
180 #[test]
181 fn test_adaptive_any_global() {
182 let specific = Some(ConfigParallelism::Adaptive);
183 let globals = [
184 ConfigParallelism::Default,
185 ConfigParallelism::Adaptive,
186 ConfigParallelism::Fixed(NonZeroU64::new(8).unwrap()),
187 ];
188
189 for global in globals {
190 assert_eq!(derive_parallelism(None, specific, global).parallelism, None);
191 }
192 }
193
194 #[test]
195 fn test_fixed_override_global() {
196 let specific = Some(ConfigParallelism::Fixed(NonZeroU64::new(6).unwrap()));
197 let globals = [
198 ConfigParallelism::Default,
199 ConfigParallelism::Adaptive,
200 ConfigParallelism::Fixed(NonZeroU64::new(3).unwrap()),
201 ];
202
203 for global in globals {
204 assert_eq!(
205 derive_parallelism(None, specific, global)
206 .parallelism
207 .map(|p| p.parallelism),
208 Some(6)
209 );
210 }
211 }
212
213 #[test]
214 fn test_bounded_parallelism_resolves_strategy() {
215 assert_eq!(
216 derive_parallelism(
217 None,
218 Some(ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap())),
219 ConfigParallelism::Adaptive
220 )
221 .adaptive_strategy,
222 Some(AdaptiveParallelismStrategy::Bounded(
223 NonZeroUsize::new(4).unwrap()
224 ))
225 );
226 }
227
228 #[test]
229 fn test_ratio_parallelism_resolves_strategy() {
230 assert_eq!(
231 derive_parallelism(
232 None,
233 Some(ConfigParallelism::Ratio(0.5)),
234 ConfigParallelism::Adaptive
235 )
236 .adaptive_strategy,
237 Some(AdaptiveParallelismStrategy::Ratio(0.5))
238 );
239 }
240
241 #[test]
242 fn test_table_default_uses_legacy_bound_only_with_default_global() {
243 assert_eq!(
244 derive_parallelism(
245 Some(GraphJobType::Table),
246 Some(ConfigParallelism::Default),
247 ConfigParallelism::Default
248 )
249 .adaptive_strategy,
250 DEFAULT_TABLE_SOURCE_STREAMING_PARALLELISM.adaptive_strategy()
251 );
252 }
253
254 #[test]
255 fn test_table_default_follows_explicit_global_strategy() {
256 assert_eq!(
257 derive_parallelism(
258 Some(GraphJobType::Table),
259 Some(ConfigParallelism::Default),
260 ConfigParallelism::Ratio(0.5)
261 )
262 .adaptive_strategy,
263 Some(AdaptiveParallelismStrategy::Ratio(0.5))
264 );
265 assert_eq!(
266 derive_parallelism(
267 Some(GraphJobType::Table),
268 Some(ConfigParallelism::Default),
269 ConfigParallelism::Adaptive
270 )
271 .adaptive_strategy,
272 Some(AdaptiveParallelismStrategy::Auto)
273 );
274 }
275
276 #[test]
277 fn test_source_default_uses_legacy_bound_only_with_default_global() {
278 assert_eq!(
279 derive_parallelism(
280 Some(GraphJobType::Source),
281 Some(ConfigParallelism::Default),
282 ConfigParallelism::Default
283 )
284 .adaptive_strategy,
285 DEFAULT_TABLE_SOURCE_STREAMING_PARALLELISM.adaptive_strategy()
286 );
287 }
288
289 #[test]
290 fn test_source_default_follows_explicit_global_parallelism() {
291 assert_eq!(
292 derive_parallelism(
293 Some(GraphJobType::Source),
294 Some(ConfigParallelism::Default),
295 ConfigParallelism::Fixed(NonZeroU64::new(7).unwrap())
296 )
297 .parallelism
298 .map(|p| p.parallelism),
299 Some(7)
300 );
301 assert_eq!(
302 derive_parallelism(
303 Some(GraphJobType::Source),
304 Some(ConfigParallelism::Default),
305 ConfigParallelism::Bounded(NonZeroU64::new(5).unwrap())
306 )
307 .adaptive_strategy,
308 Some(AdaptiveParallelismStrategy::Bounded(
309 NonZeroUsize::new(5).unwrap()
310 ))
311 );
312 }
313
314 #[test]
315 fn test_sink_default_uses_legacy_bound_only_with_default_global() {
316 assert_eq!(
317 derive_parallelism(
318 Some(GraphJobType::Sink),
319 Some(ConfigParallelism::Default),
320 ConfigParallelism::Default
321 )
322 .adaptive_strategy,
323 DEFAULT_SINK_STREAMING_PARALLELISM.adaptive_strategy()
324 );
325 }
326
327 #[test]
328 fn test_sink_default_follows_explicit_global_strategy() {
329 assert_eq!(
330 derive_parallelism(
331 Some(GraphJobType::Sink),
332 Some(ConfigParallelism::Default),
333 ConfigParallelism::Ratio(0.5)
334 )
335 .adaptive_strategy,
336 Some(AdaptiveParallelismStrategy::Ratio(0.5))
337 );
338 }
339
340 #[test]
341 fn test_materialized_view_default_follows_global_parallelism() {
342 assert_eq!(
343 derive_parallelism(
344 Some(GraphJobType::MaterializedView),
345 Some(ConfigParallelism::Default),
346 ConfigParallelism::Default
347 )
348 .adaptive_strategy,
349 Some(AdaptiveParallelismStrategy::Bounded(
350 NonZeroUsize::new(64).unwrap()
351 ))
352 );
353 assert_eq!(
354 derive_parallelism(
355 Some(GraphJobType::MaterializedView),
356 Some(ConfigParallelism::Default),
357 ConfigParallelism::Ratio(0.5)
358 )
359 .adaptive_strategy,
360 Some(AdaptiveParallelismStrategy::Ratio(0.5))
361 );
362 }
363
364 #[test]
365 fn test_backfill_parallelism_adaptive_resolves_strategy() {
366 assert_eq!(
367 derive_backfill_parallelism(ConfigBackfillParallelism::Adaptive),
368 ResolvedParallelism {
369 parallelism: None,
370 adaptive_strategy: Some(AdaptiveParallelismStrategy::Auto),
371 }
372 );
373 }
374
375 #[test]
376 fn test_backfill_parallelism_default_does_not_resolve_to_fixed() {
377 assert_eq!(
378 derive_backfill_parallelism(ConfigBackfillParallelism::Default),
379 ResolvedParallelism {
380 parallelism: None,
381 adaptive_strategy: None,
382 }
383 );
384 }
385
386 #[test]
387 fn test_backfill_parallelism_bounded_resolves_strategy() {
388 assert_eq!(
389 derive_backfill_parallelism(ConfigBackfillParallelism::Bounded(
390 NonZeroU64::new(2).unwrap()
391 )),
392 ResolvedParallelism {
393 parallelism: None,
394 adaptive_strategy: Some(AdaptiveParallelismStrategy::Bounded(
395 NonZeroUsize::new(2).unwrap()
396 )),
397 }
398 );
399 }
400}