risingwave_frontend/optimizer/plan_node/
plan_base.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use educe::Educe;
16
17use super::generic::GenericPlanNode;
18use super::*;
19use crate::optimizer::property::{Distribution, StreamKind, WatermarkColumns};
20
21/// No extra fields for logical plan nodes.
22#[derive(Clone, Debug, PartialEq, Eq, Hash)]
23pub struct NoExtra;
24
25// Make them public types in a private module to allow using them as public trait bounds,
26// while still keeping them private to the super module.
27mod physical_common {
28    use super::*;
29
30    /// Common extra fields for physical plan nodes.
31    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
32    pub struct PhysicalCommonExtra {
33        /// The distribution property of the `PlanNode`'s output, store an `Distribution::any()` here
34        /// will not affect correctness, but insert unnecessary exchange in plan
35        pub dist: Distribution,
36    }
37
38    /// A helper trait to reuse code for accessing the common physical fields of batch and stream
39    /// plan bases.
40    pub trait GetPhysicalCommon {
41        fn physical(&self) -> &PhysicalCommonExtra;
42        fn physical_mut(&mut self) -> &mut PhysicalCommonExtra;
43    }
44}
45
46use physical_common::*;
47
48/// Extra fields for stream plan nodes.
49#[derive(Clone, Debug, PartialEq, Eq, Hash)]
50pub struct StreamExtra {
51    /// Common fields for physical plan nodes.
52    physical: PhysicalCommonExtra,
53
54    /// Whether the `PlanNode`'s output is append-only, retract, or upsert.
55    stream_kind: StreamKind,
56
57    /// Whether the output is emitted on window close.
58    emit_on_window_close: bool,
59    /// The watermark column indices of the `PlanNode`'s output. There could be watermark output from
60    /// this stream operator.
61    watermark_columns: WatermarkColumns,
62    /// The monotonicity of columns in the output.
63    columns_monotonicity: MonotonicityMap,
64}
65
66impl GetPhysicalCommon for StreamExtra {
67    fn physical(&self) -> &PhysicalCommonExtra {
68        &self.physical
69    }
70
71    fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
72        &mut self.physical
73    }
74}
75
76/// Extra fields for batch plan nodes.
77#[derive(Clone, Debug, PartialEq, Eq, Hash)]
78pub struct BatchExtra {
79    /// Common fields for physical plan nodes.
80    physical: PhysicalCommonExtra,
81
82    /// The order property of the `PlanNode`'s output, store an `&Order::any()` here will not affect
83    /// correctness, but insert unnecessary sort in plan
84    order: Order,
85}
86
87impl GetPhysicalCommon for BatchExtra {
88    fn physical(&self) -> &PhysicalCommonExtra {
89        &self.physical
90    }
91
92    fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
93        &mut self.physical
94    }
95}
96
97/// The common fields of all plan nodes with different conventions.
98///
99/// Please make a field named `base` in every planNode and correctly value
100/// it when construct the planNode.
101///
102/// All fields are intentionally made private and immutable, as they should
103/// normally be the same as the given [`GenericPlanNode`] when constructing.
104///
105/// - To access them, use traits including [`GenericPlanRef`],
106///   [`PhysicalPlanRef`], [`StreamPlanNodeMetadata`] and [`BatchPlanNodeMetadata`] with
107///   compile-time checks.
108/// - To mutate them, use methods like `new_*` or `clone_with_*`.
109#[derive(Educe)]
110#[educe(PartialEq, Eq, Hash, Clone, Debug)]
111pub struct PlanBase<C: ConventionMarker> {
112    // -- common fields --
113    #[educe(PartialEq(ignore), Hash(ignore))]
114    id: PlanNodeId,
115    #[educe(PartialEq(ignore), Hash(ignore))]
116    ctx: OptimizerContextRef,
117
118    schema: Schema,
119    /// the pk indices of the `PlanNode`'s output, a empty stream key vec means there is no stream key
120    // TODO: this is actually a logical and stream only property.
121    // - For logical nodes, this is `None` in most time expect for the phase after `logical_rewrite_for_stream`.
122    // - For stream nodes, this is always `Some`.
123    stream_key: Option<Vec<usize>>,
124    functional_dependency: FunctionalDependencySet,
125
126    /// Extra fields for different conventions.
127    extra: C::Extra,
128}
129
130impl<C: ConventionMarker> generic::GenericPlanRef for PlanBase<C> {
131    fn id(&self) -> PlanNodeId {
132        self.id
133    }
134
135    fn schema(&self) -> &Schema {
136        &self.schema
137    }
138
139    fn stream_key(&self) -> Option<&[usize]> {
140        self.stream_key.as_deref()
141    }
142
143    fn ctx(&self) -> OptimizerContextRef {
144        self.ctx.clone()
145    }
146
147    fn functional_dependency(&self) -> &FunctionalDependencySet {
148        &self.functional_dependency
149    }
150}
151
152impl<C: ConventionMarker> generic::PhysicalPlanRef for PlanBase<C>
153where
154    C::Extra: GetPhysicalCommon,
155{
156    fn distribution(&self) -> &Distribution {
157        &self.extra.physical().dist
158    }
159}
160
161impl stream::StreamPlanNodeMetadata for PlanBase<Stream> {
162    fn stream_kind(&self) -> StreamKind {
163        self.extra.stream_kind
164    }
165
166    fn emit_on_window_close(&self) -> bool {
167        self.extra.emit_on_window_close
168    }
169
170    fn watermark_columns(&self) -> &WatermarkColumns {
171        &self.extra.watermark_columns
172    }
173
174    fn columns_monotonicity(&self) -> &MonotonicityMap {
175        &self.extra.columns_monotonicity
176    }
177}
178
179impl batch::BatchPlanNodeMetadata for PlanBase<Batch> {
180    fn order(&self) -> &Order {
181        &self.extra.order
182    }
183}
184
185impl<C: ConventionMarker> PlanBase<C> {
186    pub fn clone_with_new_plan_id(&self) -> Self {
187        let mut new = self.clone();
188        new.id = self.ctx().next_plan_node_id();
189        new
190    }
191}
192
193impl PlanBase<Logical> {
194    pub fn new_logical(
195        ctx: OptimizerContextRef,
196        schema: Schema,
197        stream_key: Option<Vec<usize>>,
198        functional_dependency: FunctionalDependencySet,
199    ) -> Self {
200        let id = ctx.next_plan_node_id();
201        Self {
202            id,
203            ctx,
204            schema,
205            stream_key,
206            functional_dependency,
207            extra: NoExtra,
208        }
209    }
210
211    pub fn new_logical_with_core(core: &impl GenericPlanNode) -> Self {
212        Self::new_logical(
213            core.ctx(),
214            core.schema(),
215            core.stream_key(),
216            core.functional_dependency(),
217        )
218    }
219}
220
221impl PlanBase<Stream> {
222    pub fn new_stream(
223        ctx: OptimizerContextRef,
224        schema: Schema,
225        stream_key: Option<Vec<usize>>,
226        functional_dependency: FunctionalDependencySet,
227        dist: Distribution,
228        stream_kind: StreamKind,
229        emit_on_window_close: bool,
230        watermark_columns: WatermarkColumns,
231        columns_monotonicity: MonotonicityMap,
232    ) -> Self {
233        let id = ctx.next_plan_node_id();
234        Self {
235            id,
236            ctx,
237            schema,
238            stream_key,
239            functional_dependency,
240            extra: StreamExtra {
241                physical: PhysicalCommonExtra { dist },
242                stream_kind,
243                emit_on_window_close,
244                watermark_columns,
245                columns_monotonicity,
246            },
247        }
248    }
249
250    pub fn new_stream_with_core(
251        core: &impl GenericPlanNode,
252        dist: Distribution,
253        stream_kind: StreamKind,
254        emit_on_window_close: bool,
255        watermark_columns: WatermarkColumns,
256        columns_monotonicity: MonotonicityMap,
257    ) -> Self {
258        Self::new_stream(
259            core.ctx(),
260            core.schema(),
261            core.stream_key(),
262            core.functional_dependency(),
263            dist,
264            stream_kind,
265            emit_on_window_close,
266            watermark_columns,
267            columns_monotonicity,
268        )
269    }
270}
271
272impl PlanBase<Batch> {
273    pub fn new_batch(
274        ctx: OptimizerContextRef,
275        schema: Schema,
276        dist: Distribution,
277        order: Order,
278    ) -> Self {
279        let id = ctx.next_plan_node_id();
280        let functional_dependency = FunctionalDependencySet::new(schema.len());
281        Self {
282            id,
283            ctx,
284            schema,
285            stream_key: None,
286            functional_dependency,
287            extra: BatchExtra {
288                physical: PhysicalCommonExtra { dist },
289                order,
290            },
291        }
292    }
293
294    pub fn new_batch_with_core(
295        core: &impl GenericPlanNode,
296        dist: Distribution,
297        order: Order,
298    ) -> Self {
299        Self::new_batch(core.ctx(), core.schema(), dist, order)
300    }
301}
302
303impl<C: ConventionMarker> PlanBase<C>
304where
305    C::Extra: GetPhysicalCommon,
306{
307    /// Clone the plan node with a new distribution.
308    ///
309    /// Panics if the plan node is not physical.
310    pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self {
311        let mut new = self.clone();
312        new.extra.physical_mut().dist = dist;
313        new
314    }
315}
316
317// Mutators for testing only.
318#[cfg(test)]
319impl<C: ConventionMarker> PlanBase<C> {
320    pub fn functional_dependency_mut(&mut self) -> &mut FunctionalDependencySet {
321        &mut self.functional_dependency
322    }
323}