risingwave_frontend/optimizer/plan_node/
plan_base.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use educe::Educe;
16
17use super::generic::GenericPlanNode;
18use super::*;
19use crate::optimizer::property::{Distribution, WatermarkColumns};
20
/// Marker type meaning "no convention-specific fields": logical plan nodes carry nothing
/// beyond the common fields stored in [`PlanBase`].
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct NoExtra;
24
25// Make them public types in a private module to allow using them as public trait bounds,
26// while still keeping them private to the super module.
27mod physical_common {
28    use super::*;
29
30    /// Common extra fields for physical plan nodes.
31    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
32    pub struct PhysicalCommonExtra {
33        /// The distribution property of the `PlanNode`'s output, store an `Distribution::any()` here
34        /// will not affect correctness, but insert unnecessary exchange in plan
35        pub dist: Distribution,
36    }
37
38    /// A helper trait to reuse code for accessing the common physical fields of batch and stream
39    /// plan bases.
40    pub trait GetPhysicalCommon {
41        fn physical(&self) -> &PhysicalCommonExtra;
42        fn physical_mut(&mut self) -> &mut PhysicalCommonExtra;
43    }
44}
45
46use physical_common::*;
47
48/// Extra fields for stream plan nodes.
49#[derive(Clone, Debug, PartialEq, Eq, Hash)]
50pub struct StreamExtra {
51    /// Common fields for physical plan nodes.
52    physical: PhysicalCommonExtra,
53
54    /// The append-only property of the `PlanNode`'s output is a stream-only property. Append-only
55    /// means the stream contains only insert operation.
56    append_only: bool,
57    /// Whether the output is emitted on window close.
58    emit_on_window_close: bool,
59    /// The watermark column indices of the `PlanNode`'s output. There could be watermark output from
60    /// this stream operator.
61    watermark_columns: WatermarkColumns,
62    /// The monotonicity of columns in the output.
63    columns_monotonicity: MonotonicityMap,
64}
65
66impl GetPhysicalCommon for StreamExtra {
67    fn physical(&self) -> &PhysicalCommonExtra {
68        &self.physical
69    }
70
71    fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
72        &mut self.physical
73    }
74}
75
76/// Extra fields for batch plan nodes.
77#[derive(Clone, Debug, PartialEq, Eq, Hash)]
78pub struct BatchExtra {
79    /// Common fields for physical plan nodes.
80    physical: PhysicalCommonExtra,
81
82    /// The order property of the `PlanNode`'s output, store an `&Order::any()` here will not affect
83    /// correctness, but insert unnecessary sort in plan
84    order: Order,
85}
86
87impl GetPhysicalCommon for BatchExtra {
88    fn physical(&self) -> &PhysicalCommonExtra {
89        &self.physical
90    }
91
92    fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
93        &mut self.physical
94    }
95}
96
/// The common fields of all plan nodes with different conventions.
///
/// Every plan node should have a field named `base` of this type, populated correctly
/// when the plan node is constructed.
///
/// All fields are intentionally made private and immutable, as they should
/// normally be the same as the given [`GenericPlanNode`] when constructing.
///
/// - To access them, use traits including [`GenericPlanRef`],
///   [`PhysicalPlanRef`], [`StreamPlanRef`] and [`BatchPlanRef`] with
///   compile-time checks.
/// - To mutate them, use methods like `new_*` or `clone_with_*`.
///
/// The `educe` attributes below exclude `id` and `ctx` from `PartialEq` and `Hash`, so two
/// plan bases that differ only in node id or optimizer context compare equal.
#[derive(Educe)]
#[educe(PartialEq, Eq, Hash, Clone, Debug)]
pub struct PlanBase<C: ConventionMarker> {
    // -- common fields --
    #[educe(PartialEq(ignore), Hash(ignore))]
    id: PlanNodeId,
    #[educe(PartialEq(ignore), Hash(ignore))]
    ctx: OptimizerContextRef,

    /// Output schema of the plan node.
    schema: Schema,
    /// The pk indices of the `PlanNode`'s output; an empty stream key vec means there is no
    /// stream key.
    // TODO: this is actually a logical and stream only property.
    // - For logical nodes, this is `None` most of the time, except in the phase after
    //   `logical_rewrite_for_stream`.
    // - For stream nodes, this is always `Some`.
    // - Batch nodes built through `new_batch` always set this to `None`.
    stream_key: Option<Vec<usize>>,
    /// Functional dependencies among the output columns.
    functional_dependency: FunctionalDependencySet,

    /// Extra fields for different conventions.
    extra: C::Extra,
}
129
130impl<C: ConventionMarker> generic::GenericPlanRef for PlanBase<C> {
131    fn id(&self) -> PlanNodeId {
132        self.id
133    }
134
135    fn schema(&self) -> &Schema {
136        &self.schema
137    }
138
139    fn stream_key(&self) -> Option<&[usize]> {
140        self.stream_key.as_deref()
141    }
142
143    fn ctx(&self) -> OptimizerContextRef {
144        self.ctx.clone()
145    }
146
147    fn functional_dependency(&self) -> &FunctionalDependencySet {
148        &self.functional_dependency
149    }
150}
151
152impl<C: ConventionMarker> generic::PhysicalPlanRef for PlanBase<C>
153where
154    C::Extra: GetPhysicalCommon,
155{
156    fn distribution(&self) -> &Distribution {
157        &self.extra.physical().dist
158    }
159}
160
161impl stream::StreamPlanRef for PlanBase<Stream> {
162    fn append_only(&self) -> bool {
163        self.extra.append_only
164    }
165
166    fn emit_on_window_close(&self) -> bool {
167        self.extra.emit_on_window_close
168    }
169
170    fn watermark_columns(&self) -> &WatermarkColumns {
171        &self.extra.watermark_columns
172    }
173
174    fn columns_monotonicity(&self) -> &MonotonicityMap {
175        &self.extra.columns_monotonicity
176    }
177}
178
179impl batch::BatchPlanRef for PlanBase<Batch> {
180    fn order(&self) -> &Order {
181        &self.extra.order
182    }
183}
184
185impl<C: ConventionMarker> PlanBase<C> {
186    pub fn clone_with_new_plan_id(&self) -> Self {
187        let mut new = self.clone();
188        new.id = self.ctx().next_plan_node_id();
189        new
190    }
191}
192
193impl PlanBase<Logical> {
194    pub fn new_logical(
195        ctx: OptimizerContextRef,
196        schema: Schema,
197        stream_key: Option<Vec<usize>>,
198        functional_dependency: FunctionalDependencySet,
199    ) -> Self {
200        let id = ctx.next_plan_node_id();
201        Self {
202            id,
203            ctx,
204            schema,
205            stream_key,
206            functional_dependency,
207            extra: NoExtra,
208        }
209    }
210
211    pub fn new_logical_with_core(core: &impl GenericPlanNode) -> Self {
212        Self::new_logical(
213            core.ctx(),
214            core.schema(),
215            core.stream_key(),
216            core.functional_dependency(),
217        )
218    }
219}
220
221impl PlanBase<Stream> {
222    pub fn new_stream(
223        ctx: OptimizerContextRef,
224        schema: Schema,
225        stream_key: Option<Vec<usize>>,
226        functional_dependency: FunctionalDependencySet,
227        dist: Distribution,
228        append_only: bool,
229        emit_on_window_close: bool,
230        watermark_columns: WatermarkColumns,
231        columns_monotonicity: MonotonicityMap,
232    ) -> Self {
233        let id = ctx.next_plan_node_id();
234        Self {
235            id,
236            ctx,
237            schema,
238            stream_key,
239            functional_dependency,
240            extra: StreamExtra {
241                physical: PhysicalCommonExtra { dist },
242                append_only,
243                emit_on_window_close,
244                watermark_columns,
245                columns_monotonicity,
246            },
247        }
248    }
249
250    pub fn new_stream_with_core(
251        core: &impl GenericPlanNode,
252        dist: Distribution,
253        append_only: bool,
254        emit_on_window_close: bool,
255        watermark_columns: WatermarkColumns,
256        columns_monotonicity: MonotonicityMap,
257    ) -> Self {
258        Self::new_stream(
259            core.ctx(),
260            core.schema(),
261            core.stream_key(),
262            core.functional_dependency(),
263            dist,
264            append_only,
265            emit_on_window_close,
266            watermark_columns,
267            columns_monotonicity,
268        )
269    }
270}
271
272impl PlanBase<Batch> {
273    pub fn new_batch(
274        ctx: OptimizerContextRef,
275        schema: Schema,
276        dist: Distribution,
277        order: Order,
278    ) -> Self {
279        let id = ctx.next_plan_node_id();
280        let functional_dependency = FunctionalDependencySet::new(schema.len());
281        Self {
282            id,
283            ctx,
284            schema,
285            stream_key: None,
286            functional_dependency,
287            extra: BatchExtra {
288                physical: PhysicalCommonExtra { dist },
289                order,
290            },
291        }
292    }
293
294    pub fn new_batch_with_core(
295        core: &impl GenericPlanNode,
296        dist: Distribution,
297        order: Order,
298    ) -> Self {
299        Self::new_batch(core.ctx(), core.schema(), dist, order)
300    }
301}
302
303impl<C: ConventionMarker> PlanBase<C>
304where
305    C::Extra: GetPhysicalCommon,
306{
307    /// Clone the plan node with a new distribution.
308    ///
309    /// Panics if the plan node is not physical.
310    pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self {
311        let mut new = self.clone();
312        new.extra.physical_mut().dist = dist;
313        new
314    }
315}
316
// Test-only escape hatch: lets tests adjust the functional dependencies of an
// already-constructed plan base, which is otherwise immutable.
#[cfg(test)]
impl<C: ConventionMarker> PlanBase<C> {
    /// Mutable access to the functional dependency set (tests only).
    pub fn functional_dependency_mut(&mut self) -> &mut FunctionalDependencySet {
        let Self {
            functional_dependency,
            ..
        } = self;
        functional_dependency
    }
}
324
325/// Reference to [`PlanBase`] with erased conventions.
326///
327/// Used for accessing fields on a type-erased plan node. All traits of [`GenericPlanRef`],
328/// [`PhysicalPlanRef`], [`StreamPlanRef`] and [`BatchPlanRef`] are implemented for this type,
329/// so runtime checks are required when calling methods on it.
330#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, enum_as_inner::EnumAsInner)]
331pub enum PlanBaseRef<'a> {
332    Logical(&'a PlanBase<Logical>),
333    Stream(&'a PlanBase<Stream>),
334    Batch(&'a PlanBase<Batch>),
335}
336
337impl PlanBaseRef<'_> {
338    /// Get the convention of this plan base.
339    pub fn convention(self) -> Convention {
340        match self {
341            PlanBaseRef::Logical(_) => Convention::Logical,
342            PlanBaseRef::Stream(_) => Convention::Stream,
343            PlanBaseRef::Batch(_) => Convention::Batch,
344        }
345    }
346}
347
/// Dispatch a method call to the corresponding plan base type.
///
/// `$self` is matched against the listed [`PlanBaseRef`] variants and `$method` is applied
/// to the inner plan base. A variant that is not listed reaches `unreachable!`, which
/// panics with a message naming the method and the actual convention.
macro_rules! dispatch_plan_base {
    ($self:ident, [$($convention:ident),+ $(,)?], $method:expr) => {
        match $self {
            $(
                PlanBaseRef::$convention(plan) => $method(plan),
            )+

            // When every variant is listed this arm is unreachable; the allow silences
            // the resulting lint.
            #[allow(unreachable_patterns)]
            _ => unreachable!("calling `{}` on a plan node of `{:?}`", stringify!($method), $self.convention()),
        }
    }
}
361
/// Workaround for getters returning references.
///
/// For example, callers writing `GenericPlanRef::schema(&foo.plan_base())` will get a
/// borrow checker error. The trait method borrows the temporary [`PlanBaseRef`], which is
/// itself already a reference, so the returned reference is tied to that temporary.
///
/// As a workaround, the getters below take [`PlanBaseRef`] by value (it is `Copy`) and
/// return references with the underlying lifetime `'a`. When callers write
/// `foo.plan_base().schema()`, the compiler prefers these inherent methods over the ones
/// defined in traits like [`GenericPlanRef`].
impl<'a> PlanBaseRef<'a> {
    /// Output schema; available for every convention.
    pub(super) fn schema(self) -> &'a Schema {
        dispatch_plan_base!(self, [Logical, Stream, Batch], GenericPlanRef::schema)
    }

    /// Stream key, if any; available for every convention.
    pub(super) fn stream_key(self) -> Option<&'a [usize]> {
        dispatch_plan_base!(self, [Logical, Stream, Batch], GenericPlanRef::stream_key)
    }

    /// Functional dependency set; available for every convention.
    pub(super) fn functional_dependency(self) -> &'a FunctionalDependencySet {
        dispatch_plan_base!(
            self,
            [Logical, Stream, Batch],
            GenericPlanRef::functional_dependency
        )
    }

    /// Output distribution. Panics if the plan base is logical.
    pub(super) fn distribution(self) -> &'a Distribution {
        dispatch_plan_base!(self, [Stream, Batch], PhysicalPlanRef::distribution)
    }

    /// Watermark columns. Panics unless the plan base is a stream one.
    pub(super) fn watermark_columns(self) -> &'a WatermarkColumns {
        dispatch_plan_base!(self, [Stream], StreamPlanRef::watermark_columns)
    }

    /// Column monotonicity map. Panics unless the plan base is a stream one.
    pub(super) fn columns_monotonicity(self) -> &'a MonotonicityMap {
        dispatch_plan_base!(self, [Stream], StreamPlanRef::columns_monotonicity)
    }

    /// Output order. Panics unless the plan base is a batch one.
    pub(super) fn order(self) -> &'a Order {
        dispatch_plan_base!(self, [Batch], BatchPlanRef::order)
    }
}
403
404impl GenericPlanRef for PlanBaseRef<'_> {
405    fn id(&self) -> PlanNodeId {
406        dispatch_plan_base!(self, [Logical, Stream, Batch], GenericPlanRef::id)
407    }
408
409    fn schema(&self) -> &Schema {
410        (*self).schema()
411    }
412
413    fn stream_key(&self) -> Option<&[usize]> {
414        (*self).stream_key()
415    }
416
417    fn functional_dependency(&self) -> &FunctionalDependencySet {
418        (*self).functional_dependency()
419    }
420
421    fn ctx(&self) -> OptimizerContextRef {
422        dispatch_plan_base!(self, [Logical, Stream, Batch], GenericPlanRef::ctx)
423    }
424}
425
426impl PhysicalPlanRef for PlanBaseRef<'_> {
427    fn distribution(&self) -> &Distribution {
428        (*self).distribution()
429    }
430}
431
432impl StreamPlanRef for PlanBaseRef<'_> {
433    fn append_only(&self) -> bool {
434        dispatch_plan_base!(self, [Stream], StreamPlanRef::append_only)
435    }
436
437    fn emit_on_window_close(&self) -> bool {
438        dispatch_plan_base!(self, [Stream], StreamPlanRef::emit_on_window_close)
439    }
440
441    fn watermark_columns(&self) -> &WatermarkColumns {
442        (*self).watermark_columns()
443    }
444
445    fn columns_monotonicity(&self) -> &MonotonicityMap {
446        (*self).columns_monotonicity()
447    }
448}
449
450impl BatchPlanRef for PlanBaseRef<'_> {
451    fn order(&self) -> &Order {
452        (*self).order()
453    }
454}