risingwave_frontend/stream_fragmenter/
parallelism.rs1use std::num::NonZeroU64;
16
17use risingwave_common::session_config::parallelism::{
18 ConfigBackfillParallelism, ConfigParallelism,
19};
20use risingwave_common::system_param::AdaptiveParallelismStrategy;
21use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
22
23use super::GraphJobType;
24
25#[derive(Debug, Clone, PartialEq)]
26pub(crate) struct ResolvedParallelism {
27 pub parallelism: Option<Parallelism>,
28 pub adaptive_strategy: Option<AdaptiveParallelismStrategy>,
29}
30
31fn resolve_global_parallelism(
32 global_streaming_parallelism: ConfigParallelism,
33) -> ConfigParallelism {
34 global_streaming_parallelism
35}
36
37fn resolve_default_parallelism(
38 job_type: Option<GraphJobType>,
39 global_streaming_parallelism: ConfigParallelism,
40) -> ConfigParallelism {
41 match job_type {
42 Some(GraphJobType::Table | GraphJobType::Source) => match global_streaming_parallelism {
45 ConfigParallelism::Default => ConfigParallelism::Bounded(NonZeroU64::new(4).unwrap()),
46 other => resolve_global_parallelism(other),
47 },
48 Some(GraphJobType::MaterializedView | GraphJobType::Sink | GraphJobType::Index) | None => {
49 resolve_global_parallelism(global_streaming_parallelism)
50 }
51 }
52}
53
54pub(crate) fn derive_parallelism(
55 job_type: Option<GraphJobType>,
56 specific_type_parallelism: Option<ConfigParallelism>,
57 global_streaming_parallelism: ConfigParallelism,
58) -> ResolvedParallelism {
59 let effective_parallelism =
60 match specific_type_parallelism.unwrap_or(ConfigParallelism::Default) {
61 ConfigParallelism::Default => {
62 resolve_default_parallelism(job_type, global_streaming_parallelism)
63 }
64 other => other,
65 };
66
67 match effective_parallelism {
68 ConfigParallelism::Default => ResolvedParallelism {
71 parallelism: None,
72 adaptive_strategy: None,
73 },
74 ConfigParallelism::Fixed(n) => ResolvedParallelism {
75 parallelism: Some(Parallelism {
76 parallelism: n.get(),
77 }),
78 adaptive_strategy: None,
79 },
80 ConfigParallelism::Adaptive
81 | ConfigParallelism::Bounded(_)
82 | ConfigParallelism::Ratio(_) => ResolvedParallelism {
83 parallelism: None,
84 adaptive_strategy: effective_parallelism.adaptive_strategy(),
85 },
86 }
87}
88
89pub(crate) fn derive_backfill_parallelism(
90 specific_backfill_parallelism: ConfigBackfillParallelism,
91) -> ResolvedParallelism {
92 match specific_backfill_parallelism {
93 ConfigBackfillParallelism::Default => ResolvedParallelism {
94 parallelism: None,
95 adaptive_strategy: None,
96 },
97 ConfigBackfillParallelism::Fixed(n) => ResolvedParallelism {
98 parallelism: Some(Parallelism {
99 parallelism: n.get(),
100 }),
101 adaptive_strategy: None,
102 },
103 ConfigBackfillParallelism::Adaptive
104 | ConfigBackfillParallelism::Bounded(_)
105 | ConfigBackfillParallelism::Ratio(_) => ResolvedParallelism {
106 parallelism: None,
107 adaptive_strategy: specific_backfill_parallelism.adaptive_strategy(),
108 },
109 }
110}
111
#[cfg(test)]
mod tests {
    use std::num::{NonZeroU64, NonZeroUsize};

    use super::*;

    /// Shorthand for `ConfigParallelism::Fixed(n)`; `n` must be non-zero.
    fn fixed(n: u64) -> ConfigParallelism {
        ConfigParallelism::Fixed(NonZeroU64::new(n).unwrap())
    }

    /// Shorthand for `ConfigParallelism::Bounded(n)`; `n` must be non-zero.
    fn bounded(n: u64) -> ConfigParallelism {
        ConfigParallelism::Bounded(NonZeroU64::new(n).unwrap())
    }

    /// The strategy expected for a bounded setting of `n`; `n` must be non-zero.
    fn bounded_strategy(n: usize) -> Option<AdaptiveParallelismStrategy> {
        Some(AdaptiveParallelismStrategy::Bounded(
            NonZeroUsize::new(n).unwrap(),
        ))
    }

    /// Extracts the explicit parallelism value, if any.
    fn fixed_degree(resolved: ResolvedParallelism) -> Option<u64> {
        resolved.parallelism.map(|p| p.parallelism)
    }

    /// The result expected when nothing is resolved at all.
    fn unresolved() -> ResolvedParallelism {
        ResolvedParallelism {
            parallelism: None,
            adaptive_strategy: None,
        }
    }

    /// The result expected when only an adaptive strategy is resolved.
    fn adaptive_only(strategy: AdaptiveParallelismStrategy) -> ResolvedParallelism {
        ResolvedParallelism {
            parallelism: None,
            adaptive_strategy: Some(strategy),
        }
    }

    #[test]
    fn test_none_global_fixed() {
        assert_eq!(
            fixed_degree(derive_parallelism(None, None, fixed(4))),
            Some(4)
        );
    }

    #[test]
    fn test_none_global_default() {
        assert_eq!(
            derive_parallelism(None, None, ConfigParallelism::Default),
            unresolved()
        );
    }

    #[test]
    fn test_none_global_adaptive() {
        assert_eq!(
            derive_parallelism(None, None, ConfigParallelism::Adaptive),
            adaptive_only(AdaptiveParallelismStrategy::Auto)
        );
    }

    #[test]
    fn test_default_global_fixed() {
        assert_eq!(
            fixed_degree(derive_parallelism(
                None,
                Some(ConfigParallelism::Default),
                fixed(2)
            )),
            Some(2)
        );
    }

    #[test]
    fn test_default_global_default() {
        assert_eq!(
            derive_parallelism(
                None,
                Some(ConfigParallelism::Default),
                ConfigParallelism::Default
            ),
            unresolved()
        );
    }

    #[test]
    fn test_default_global_adaptive() {
        // Checking `parallelism == None` alone would also pass for an unresolved result, so the
        // strategy is asserted as well.
        assert_eq!(
            derive_parallelism(
                None,
                Some(ConfigParallelism::Default),
                ConfigParallelism::Adaptive
            ),
            adaptive_only(AdaptiveParallelismStrategy::Auto)
        );
    }

    #[test]
    fn test_adaptive_any_global() {
        let specific = Some(ConfigParallelism::Adaptive);
        let globals = [
            ConfigParallelism::Default,
            ConfigParallelism::Adaptive,
            fixed(8),
        ];

        for global in globals {
            // An explicit adaptive setting must win over every global value, strategy included.
            assert_eq!(
                derive_parallelism(None, specific, global),
                adaptive_only(AdaptiveParallelismStrategy::Auto)
            );
        }
    }

    #[test]
    fn test_fixed_override_global() {
        let specific = Some(fixed(6));
        let globals = [
            ConfigParallelism::Default,
            ConfigParallelism::Adaptive,
            fixed(3),
        ];

        for global in globals {
            assert_eq!(
                fixed_degree(derive_parallelism(None, specific, global)),
                Some(6)
            );
        }
    }

    #[test]
    fn test_bounded_parallelism_resolves_strategy() {
        assert_eq!(
            derive_parallelism(None, Some(bounded(4)), ConfigParallelism::Adaptive)
                .adaptive_strategy,
            bounded_strategy(4)
        );
    }

    #[test]
    fn test_ratio_parallelism_resolves_strategy() {
        assert_eq!(
            derive_parallelism(
                None,
                Some(ConfigParallelism::Ratio(0.5)),
                ConfigParallelism::Adaptive
            )
            .adaptive_strategy,
            Some(AdaptiveParallelismStrategy::Ratio(0.5))
        );
    }

    #[test]
    fn test_table_default_uses_legacy_bound_only_with_default_global() {
        assert_eq!(
            derive_parallelism(
                Some(GraphJobType::Table),
                Some(ConfigParallelism::Default),
                ConfigParallelism::Default
            )
            .adaptive_strategy,
            bounded_strategy(4)
        );
    }

    #[test]
    fn test_table_default_follows_explicit_global_strategy() {
        assert_eq!(
            derive_parallelism(
                Some(GraphJobType::Table),
                Some(ConfigParallelism::Default),
                ConfigParallelism::Ratio(0.5)
            )
            .adaptive_strategy,
            Some(AdaptiveParallelismStrategy::Ratio(0.5))
        );
        assert_eq!(
            derive_parallelism(
                Some(GraphJobType::Table),
                Some(ConfigParallelism::Default),
                ConfigParallelism::Adaptive
            )
            .adaptive_strategy,
            Some(AdaptiveParallelismStrategy::Auto)
        );
    }

    #[test]
    fn test_source_default_uses_legacy_bound_only_with_default_global() {
        assert_eq!(
            derive_parallelism(
                Some(GraphJobType::Source),
                Some(ConfigParallelism::Default),
                ConfigParallelism::Default
            )
            .adaptive_strategy,
            bounded_strategy(4)
        );
    }

    #[test]
    fn test_source_default_follows_explicit_global_parallelism() {
        assert_eq!(
            fixed_degree(derive_parallelism(
                Some(GraphJobType::Source),
                Some(ConfigParallelism::Default),
                fixed(7)
            )),
            Some(7)
        );
        assert_eq!(
            derive_parallelism(
                Some(GraphJobType::Source),
                Some(ConfigParallelism::Default),
                bounded(5)
            )
            .adaptive_strategy,
            bounded_strategy(5)
        );
    }

    #[test]
    fn test_materialized_view_default_follows_global_parallelism() {
        assert_eq!(
            derive_parallelism(
                Some(GraphJobType::MaterializedView),
                Some(ConfigParallelism::Default),
                ConfigParallelism::Default
            )
            .adaptive_strategy,
            None
        );
        assert_eq!(
            derive_parallelism(
                Some(GraphJobType::MaterializedView),
                Some(ConfigParallelism::Default),
                ConfigParallelism::Ratio(0.5)
            )
            .adaptive_strategy,
            Some(AdaptiveParallelismStrategy::Ratio(0.5))
        );
    }

    #[test]
    fn test_backfill_parallelism_default_does_not_resolve_to_fixed() {
        assert_eq!(
            derive_backfill_parallelism(ConfigBackfillParallelism::Default),
            unresolved()
        );
    }

    #[test]
    fn test_backfill_parallelism_fixed_preserves_explicit_override() {
        assert_eq!(
            derive_backfill_parallelism(ConfigBackfillParallelism::Fixed(
                NonZeroU64::new(2).unwrap()
            )),
            ResolvedParallelism {
                parallelism: Some(Parallelism { parallelism: 2 }),
                adaptive_strategy: None,
            }
        );
    }

    #[test]
    fn test_backfill_parallelism_adaptive_has_no_fixed_parallelism() {
        // The adaptive branch never sets an explicit parallelism.
        assert_eq!(
            derive_backfill_parallelism(ConfigBackfillParallelism::Adaptive).parallelism,
            None
        );
    }
}