risingwave_frontend/optimizer/plan_node/
plan_base.rs1use educe::Educe;
16
17use super::generic::GenericPlanNode;
18use super::*;
19use crate::optimizer::property::{Distribution, WatermarkColumns};
20
/// Unit type used as the `extra` payload when a convention needs no additional fields.
///
/// In this file it is the value stored by `PlanBase::<Logical>::new_logical`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NoExtra;
24
25mod physical_common {
28 use super::*;
29
30 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
32 pub struct PhysicalCommonExtra {
33 pub dist: Distribution,
36 }
37
38 pub trait GetPhysicalCommon {
41 fn physical(&self) -> &PhysicalCommonExtra;
42 fn physical_mut(&mut self) -> &mut PhysicalCommonExtra;
43 }
44}
45
46use physical_common::*;
47
48#[derive(Clone, Debug, PartialEq, Eq, Hash)]
50pub struct StreamExtra {
51 physical: PhysicalCommonExtra,
53
54 append_only: bool,
57 emit_on_window_close: bool,
59 watermark_columns: WatermarkColumns,
62 columns_monotonicity: MonotonicityMap,
64}
65
66impl GetPhysicalCommon for StreamExtra {
67 fn physical(&self) -> &PhysicalCommonExtra {
68 &self.physical
69 }
70
71 fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
72 &mut self.physical
73 }
74}
75
76#[derive(Clone, Debug, PartialEq, Eq, Hash)]
78pub struct BatchExtra {
79 physical: PhysicalCommonExtra,
81
82 order: Order,
85}
86
87impl GetPhysicalCommon for BatchExtra {
88 fn physical(&self) -> &PhysicalCommonExtra {
89 &self.physical
90 }
91
92 fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
93 &mut self.physical
94 }
95}
96
97#[derive(Educe)]
110#[educe(PartialEq, Eq, Hash, Clone, Debug)]
111pub struct PlanBase<C: ConventionMarker> {
112 #[educe(PartialEq(ignore), Hash(ignore))]
114 id: PlanNodeId,
115 #[educe(PartialEq(ignore), Hash(ignore))]
116 ctx: OptimizerContextRef,
117
118 schema: Schema,
119 stream_key: Option<Vec<usize>>,
124 functional_dependency: FunctionalDependencySet,
125
126 extra: C::Extra,
128}
129
130impl<C: ConventionMarker> generic::GenericPlanRef for PlanBase<C> {
131 fn id(&self) -> PlanNodeId {
132 self.id
133 }
134
135 fn schema(&self) -> &Schema {
136 &self.schema
137 }
138
139 fn stream_key(&self) -> Option<&[usize]> {
140 self.stream_key.as_deref()
141 }
142
143 fn ctx(&self) -> OptimizerContextRef {
144 self.ctx.clone()
145 }
146
147 fn functional_dependency(&self) -> &FunctionalDependencySet {
148 &self.functional_dependency
149 }
150}
151
152impl<C: ConventionMarker> generic::PhysicalPlanRef for PlanBase<C>
153where
154 C::Extra: GetPhysicalCommon,
155{
156 fn distribution(&self) -> &Distribution {
157 &self.extra.physical().dist
158 }
159}
160
161impl stream::StreamPlanRef for PlanBase<Stream> {
162 fn append_only(&self) -> bool {
163 self.extra.append_only
164 }
165
166 fn emit_on_window_close(&self) -> bool {
167 self.extra.emit_on_window_close
168 }
169
170 fn watermark_columns(&self) -> &WatermarkColumns {
171 &self.extra.watermark_columns
172 }
173
174 fn columns_monotonicity(&self) -> &MonotonicityMap {
175 &self.extra.columns_monotonicity
176 }
177}
178
179impl batch::BatchPlanRef for PlanBase<Batch> {
180 fn order(&self) -> &Order {
181 &self.extra.order
182 }
183}
184
185impl<C: ConventionMarker> PlanBase<C> {
186 pub fn clone_with_new_plan_id(&self) -> Self {
187 let mut new = self.clone();
188 new.id = self.ctx().next_plan_node_id();
189 new
190 }
191}
192
193impl PlanBase<Logical> {
194 pub fn new_logical(
195 ctx: OptimizerContextRef,
196 schema: Schema,
197 stream_key: Option<Vec<usize>>,
198 functional_dependency: FunctionalDependencySet,
199 ) -> Self {
200 let id = ctx.next_plan_node_id();
201 Self {
202 id,
203 ctx,
204 schema,
205 stream_key,
206 functional_dependency,
207 extra: NoExtra,
208 }
209 }
210
211 pub fn new_logical_with_core(core: &impl GenericPlanNode) -> Self {
212 Self::new_logical(
213 core.ctx(),
214 core.schema(),
215 core.stream_key(),
216 core.functional_dependency(),
217 )
218 }
219}
220
221impl PlanBase<Stream> {
222 pub fn new_stream(
223 ctx: OptimizerContextRef,
224 schema: Schema,
225 stream_key: Option<Vec<usize>>,
226 functional_dependency: FunctionalDependencySet,
227 dist: Distribution,
228 append_only: bool,
229 emit_on_window_close: bool,
230 watermark_columns: WatermarkColumns,
231 columns_monotonicity: MonotonicityMap,
232 ) -> Self {
233 let id = ctx.next_plan_node_id();
234 Self {
235 id,
236 ctx,
237 schema,
238 stream_key,
239 functional_dependency,
240 extra: StreamExtra {
241 physical: PhysicalCommonExtra { dist },
242 append_only,
243 emit_on_window_close,
244 watermark_columns,
245 columns_monotonicity,
246 },
247 }
248 }
249
250 pub fn new_stream_with_core(
251 core: &impl GenericPlanNode,
252 dist: Distribution,
253 append_only: bool,
254 emit_on_window_close: bool,
255 watermark_columns: WatermarkColumns,
256 columns_monotonicity: MonotonicityMap,
257 ) -> Self {
258 Self::new_stream(
259 core.ctx(),
260 core.schema(),
261 core.stream_key(),
262 core.functional_dependency(),
263 dist,
264 append_only,
265 emit_on_window_close,
266 watermark_columns,
267 columns_monotonicity,
268 )
269 }
270}
271
272impl PlanBase<Batch> {
273 pub fn new_batch(
274 ctx: OptimizerContextRef,
275 schema: Schema,
276 dist: Distribution,
277 order: Order,
278 ) -> Self {
279 let id = ctx.next_plan_node_id();
280 let functional_dependency = FunctionalDependencySet::new(schema.len());
281 Self {
282 id,
283 ctx,
284 schema,
285 stream_key: None,
286 functional_dependency,
287 extra: BatchExtra {
288 physical: PhysicalCommonExtra { dist },
289 order,
290 },
291 }
292 }
293
294 pub fn new_batch_with_core(
295 core: &impl GenericPlanNode,
296 dist: Distribution,
297 order: Order,
298 ) -> Self {
299 Self::new_batch(core.ctx(), core.schema(), dist, order)
300 }
301}
302
303impl<C: ConventionMarker> PlanBase<C>
304where
305 C::Extra: GetPhysicalCommon,
306{
307 pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self {
311 let mut new = self.clone();
312 new.extra.physical_mut().dist = dist;
313 new
314 }
315}
316
/// Mutable access compiled only in test builds.
#[cfg(test)]
impl<C> PlanBase<C>
where
    C: ConventionMarker,
{
    /// Mutably borrows the stored functional dependency set.
    pub fn functional_dependency_mut(&mut self) -> &mut FunctionalDependencySet {
        let Self {
            functional_dependency,
            ..
        } = self;
        functional_dependency
    }
}
324
325#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, enum_as_inner::EnumAsInner)]
331pub enum PlanBaseRef<'a> {
332 Logical(&'a PlanBase<Logical>),
333 Stream(&'a PlanBase<Stream>),
334 Batch(&'a PlanBase<Batch>),
335}
336
337impl PlanBaseRef<'_> {
338 pub fn convention(self) -> Convention {
340 match self {
341 PlanBaseRef::Logical(_) => Convention::Logical,
342 PlanBaseRef::Stream(_) => Convention::Stream,
343 PlanBaseRef::Batch(_) => Convention::Batch,
344 }
345 }
346}
347
/// Matches on a [`PlanBaseRef`] and applies `$getter` to the inner plan base of each listed
/// variant. Reaching a variant that is not listed panics via `unreachable!`, naming the
/// getter and the actual convention.
macro_rules! dispatch_plan_base {
    ($self:ident, [$($variant:ident),+ $(,)?], $getter:expr) => {
        match $self {
            $(
                PlanBaseRef::$variant(base) => $getter(base),
            )+

            // When every variant is listed this arm is dead code, hence the `allow`.
            #[allow(unreachable_patterns)]
            _ => unreachable!(
                "calling `{}` on a plan node of `{:?}`",
                stringify!($getter),
                $self.convention()
            ),
        }
    }
}
361
362impl<'a> PlanBaseRef<'a> {
371 pub(super) fn schema(self) -> &'a Schema {
372 dispatch_plan_base!(self, [Logical, Stream, Batch], GenericPlanRef::schema)
373 }
374
375 pub(super) fn stream_key(self) -> Option<&'a [usize]> {
376 dispatch_plan_base!(self, [Logical, Stream, Batch], GenericPlanRef::stream_key)
377 }
378
379 pub(super) fn functional_dependency(self) -> &'a FunctionalDependencySet {
380 dispatch_plan_base!(
381 self,
382 [Logical, Stream, Batch],
383 GenericPlanRef::functional_dependency
384 )
385 }
386
387 pub(super) fn distribution(self) -> &'a Distribution {
388 dispatch_plan_base!(self, [Stream, Batch], PhysicalPlanRef::distribution)
389 }
390
391 pub(super) fn watermark_columns(self) -> &'a WatermarkColumns {
392 dispatch_plan_base!(self, [Stream], StreamPlanRef::watermark_columns)
393 }
394
395 pub(super) fn columns_monotonicity(self) -> &'a MonotonicityMap {
396 dispatch_plan_base!(self, [Stream], StreamPlanRef::columns_monotonicity)
397 }
398
399 pub(super) fn order(self) -> &'a Order {
400 dispatch_plan_base!(self, [Batch], BatchPlanRef::order)
401 }
402}
403
404impl GenericPlanRef for PlanBaseRef<'_> {
405 fn id(&self) -> PlanNodeId {
406 dispatch_plan_base!(self, [Logical, Stream, Batch], GenericPlanRef::id)
407 }
408
409 fn schema(&self) -> &Schema {
410 (*self).schema()
411 }
412
413 fn stream_key(&self) -> Option<&[usize]> {
414 (*self).stream_key()
415 }
416
417 fn functional_dependency(&self) -> &FunctionalDependencySet {
418 (*self).functional_dependency()
419 }
420
421 fn ctx(&self) -> OptimizerContextRef {
422 dispatch_plan_base!(self, [Logical, Stream, Batch], GenericPlanRef::ctx)
423 }
424}
425
426impl PhysicalPlanRef for PlanBaseRef<'_> {
427 fn distribution(&self) -> &Distribution {
428 (*self).distribution()
429 }
430}
431
432impl StreamPlanRef for PlanBaseRef<'_> {
433 fn append_only(&self) -> bool {
434 dispatch_plan_base!(self, [Stream], StreamPlanRef::append_only)
435 }
436
437 fn emit_on_window_close(&self) -> bool {
438 dispatch_plan_base!(self, [Stream], StreamPlanRef::emit_on_window_close)
439 }
440
441 fn watermark_columns(&self) -> &WatermarkColumns {
442 (*self).watermark_columns()
443 }
444
445 fn columns_monotonicity(&self) -> &MonotonicityMap {
446 (*self).columns_monotonicity()
447 }
448}
449
450impl BatchPlanRef for PlanBaseRef<'_> {
451 fn order(&self) -> &Order {
452 (*self).order()
453 }
454}