risingwave_frontend/optimizer/plan_node/
plan_base.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use educe::Educe;
16
17use super::generic::GenericPlanNode;
18use super::*;
19use crate::optimizer::property::{Distribution, StreamKind, WatermarkColumns};
20
/// Convention-specific extra data for logical plan nodes.
///
/// Logical nodes need nothing beyond the common `PlanBase` fields, so this is an empty
/// marker type.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct NoExtra;
24
25// Make them public types in a private module to allow using them as public trait bounds,
26// while still keeping them private to the super module.
27mod physical_common {
28    use super::*;
29
30    /// Common extra fields for physical plan nodes.
31    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
32    pub struct PhysicalCommonExtra {
33        /// The distribution property of the `PlanNode`'s output, store an `Distribution::any()` here
34        /// will not affect correctness, but insert unnecessary exchange in plan
35        pub dist: Distribution,
36    }
37
38    /// A helper trait to reuse code for accessing the common physical fields of batch and stream
39    /// plan bases.
40    pub trait GetPhysicalCommon {
41        fn physical(&self) -> &PhysicalCommonExtra;
42        fn physical_mut(&mut self) -> &mut PhysicalCommonExtra;
43    }
44}
45
46use physical_common::*;
47
48/// Extra fields for stream plan nodes.
49#[derive(Clone, Debug, PartialEq, Eq, Hash)]
50pub struct StreamExtra {
51    /// Common fields for physical plan nodes.
52    physical: PhysicalCommonExtra,
53
54    /// Whether the `PlanNode`'s output is append-only, retract, or upsert.
55    stream_kind: StreamKind,
56
57    /// Whether the output is emitted on window close.
58    emit_on_window_close: bool,
59    /// The watermark column indices of the `PlanNode`'s output. There could be watermark output from
60    /// this stream operator.
61    watermark_columns: WatermarkColumns,
62    /// The monotonicity of columns in the output.
63    columns_monotonicity: MonotonicityMap,
64}
65
66impl GetPhysicalCommon for StreamExtra {
67    fn physical(&self) -> &PhysicalCommonExtra {
68        &self.physical
69    }
70
71    fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
72        &mut self.physical
73    }
74}
75
76/// Extra fields for batch plan nodes.
77#[derive(Clone, Debug, PartialEq, Eq, Hash)]
78pub struct BatchExtra {
79    /// Common fields for physical plan nodes.
80    physical: PhysicalCommonExtra,
81
82    /// Equivalent order properties of the `PlanNode`'s output.
83    ///
84    /// The first item is canonical and returned by `order()`. Additional items represent
85    /// equivalent orders that can also satisfy required order checks.
86    ///
87    /// Example (`BatchSeqScan`):
88    /// - Base table order: `(a, b, c, d)`.
89    /// - Scan range fixes `a = const` and `b = const` (`eq_prefix_len = 2`).
90    /// - The scan can provide these equivalent orders:
91    ///   `(a, b, c, d)`, `(b, c, d)`, `(c, d)`.
92    ///
93    /// This lets optimization rules avoid unnecessary `BatchSort` when required order is a suffix
94    /// after fixed equality prefixes.
95    orders: Vec<Order>,
96}
97
98impl GetPhysicalCommon for BatchExtra {
99    fn physical(&self) -> &PhysicalCommonExtra {
100        &self.physical
101    }
102
103    fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
104        &mut self.physical
105    }
106}
107
108/// The common fields of all plan nodes with different conventions.
109///
110/// Please make a field named `base` in every planNode and correctly value
111/// it when construct the planNode.
112///
113/// All fields are intentionally made private and immutable, as they should
114/// normally be the same as the given [`GenericPlanNode`] when constructing.
115///
116/// - To access them, use traits including [`GenericPlanRef`],
117///   [`PhysicalPlanRef`], [`StreamPlanNodeMetadata`] and [`BatchPlanNodeMetadata`] with
118///   compile-time checks.
119/// - To mutate them, use methods like `new_*` or `clone_with_*`.
120#[derive(Educe)]
121#[educe(PartialEq, Eq, Hash, Clone, Debug)]
122pub struct PlanBase<C: ConventionMarker> {
123    // -- common fields --
124    #[educe(PartialEq(ignore), Hash(ignore))]
125    id: PlanNodeId,
126    #[educe(PartialEq(ignore), Hash(ignore))]
127    ctx: OptimizerContextRef,
128
129    schema: Schema,
130    /// the pk indices of the `PlanNode`'s output, a empty stream key vec means there is no stream key
131    // TODO: this is actually a logical and stream only property.
132    // - For logical nodes, this is `None` in most time expect for the phase after `logical_rewrite_for_stream`.
133    // - For stream nodes, this is always `Some`.
134    stream_key: Option<Vec<usize>>,
135    functional_dependency: FunctionalDependencySet,
136
137    /// Extra fields for different conventions.
138    extra: C::Extra,
139}
140
141impl<C: ConventionMarker> generic::GenericPlanRef for PlanBase<C> {
142    fn id(&self) -> PlanNodeId {
143        self.id
144    }
145
146    fn schema(&self) -> &Schema {
147        &self.schema
148    }
149
150    fn stream_key(&self) -> Option<&[usize]> {
151        self.stream_key.as_deref()
152    }
153
154    fn ctx(&self) -> OptimizerContextRef {
155        self.ctx.clone()
156    }
157
158    fn functional_dependency(&self) -> &FunctionalDependencySet {
159        &self.functional_dependency
160    }
161}
162
163impl<C: ConventionMarker> generic::PhysicalPlanRef for PlanBase<C>
164where
165    C::Extra: GetPhysicalCommon,
166{
167    fn distribution(&self) -> &Distribution {
168        &self.extra.physical().dist
169    }
170}
171
172impl stream::StreamPlanNodeMetadata for PlanBase<Stream> {
173    fn stream_kind(&self) -> StreamKind {
174        self.extra.stream_kind
175    }
176
177    fn emit_on_window_close(&self) -> bool {
178        self.extra.emit_on_window_close
179    }
180
181    fn watermark_columns(&self) -> &WatermarkColumns {
182        &self.extra.watermark_columns
183    }
184
185    fn columns_monotonicity(&self) -> &MonotonicityMap {
186        &self.extra.columns_monotonicity
187    }
188}
189
190impl batch::BatchPlanNodeMetadata for PlanBase<Batch> {
191    fn order(&self) -> &Order {
192        self.extra
193            .orders
194            .first()
195            .expect("batch plan node should always have at least one order")
196    }
197
198    fn orders(&self) -> Vec<Order> {
199        self.extra.orders.clone()
200    }
201}
202
203impl<C: ConventionMarker> PlanBase<C> {
204    pub fn clone_with_new_plan_id(&self) -> Self {
205        let mut new = self.clone();
206        new.id = self.ctx().next_plan_node_id();
207        new
208    }
209}
210
211impl PlanBase<Logical> {
212    pub fn new_logical(
213        ctx: OptimizerContextRef,
214        schema: Schema,
215        stream_key: Option<Vec<usize>>,
216        functional_dependency: FunctionalDependencySet,
217    ) -> Self {
218        let id = ctx.next_plan_node_id();
219        Self {
220            id,
221            ctx,
222            schema,
223            stream_key,
224            functional_dependency,
225            extra: NoExtra,
226        }
227    }
228
229    pub fn new_logical_with_core(core: &impl GenericPlanNode) -> Self {
230        Self::new_logical(
231            core.ctx(),
232            core.schema(),
233            core.stream_key(),
234            core.functional_dependency(),
235        )
236    }
237}
238
239impl PlanBase<Stream> {
240    pub fn new_stream(
241        ctx: OptimizerContextRef,
242        schema: Schema,
243        stream_key: Option<Vec<usize>>,
244        functional_dependency: FunctionalDependencySet,
245        dist: Distribution,
246        stream_kind: StreamKind,
247        emit_on_window_close: bool,
248        watermark_columns: WatermarkColumns,
249        columns_monotonicity: MonotonicityMap,
250    ) -> Self {
251        let id = ctx.next_plan_node_id();
252        Self {
253            id,
254            ctx,
255            schema,
256            stream_key,
257            functional_dependency,
258            extra: StreamExtra {
259                physical: PhysicalCommonExtra { dist },
260                stream_kind,
261                emit_on_window_close,
262                watermark_columns,
263                columns_monotonicity,
264            },
265        }
266    }
267
268    pub fn new_stream_with_core(
269        core: &impl GenericPlanNode,
270        dist: Distribution,
271        stream_kind: StreamKind,
272        emit_on_window_close: bool,
273        watermark_columns: WatermarkColumns,
274        columns_monotonicity: MonotonicityMap,
275    ) -> Self {
276        Self::new_stream(
277            core.ctx(),
278            core.schema(),
279            core.stream_key(),
280            core.functional_dependency(),
281            dist,
282            stream_kind,
283            emit_on_window_close,
284            watermark_columns,
285            columns_monotonicity,
286        )
287    }
288}
289
290impl PlanBase<Batch> {
291    pub fn new_batch(
292        ctx: OptimizerContextRef,
293        schema: Schema,
294        dist: Distribution,
295        order: Order,
296    ) -> Self {
297        Self::new_batch_with_orders(ctx, schema, dist, vec![order])
298    }
299
300    pub fn new_batch_with_orders(
301        ctx: OptimizerContextRef,
302        schema: Schema,
303        dist: Distribution,
304        orders: Vec<Order>,
305    ) -> Self {
306        assert!(
307            !orders.is_empty(),
308            "batch plan node should always have at least one order"
309        );
310        let id = ctx.next_plan_node_id();
311        let functional_dependency = FunctionalDependencySet::new(schema.len());
312        Self {
313            id,
314            ctx,
315            schema,
316            stream_key: None,
317            functional_dependency,
318            extra: BatchExtra {
319                physical: PhysicalCommonExtra { dist },
320                orders,
321            },
322        }
323    }
324
325    pub fn new_batch_with_core(
326        core: &impl GenericPlanNode,
327        dist: Distribution,
328        order: Order,
329    ) -> Self {
330        Self::new_batch(core.ctx(), core.schema(), dist, order)
331    }
332
333    pub fn new_batch_with_core_and_orders(
334        core: &impl GenericPlanNode,
335        dist: Distribution,
336        orders: Vec<Order>,
337    ) -> Self {
338        Self::new_batch_with_orders(core.ctx(), core.schema(), dist, orders)
339    }
340}
341
342impl<C: ConventionMarker> PlanBase<C>
343where
344    C::Extra: GetPhysicalCommon,
345{
346    /// Clone the plan node with a new distribution.
347    ///
348    /// Panics if the plan node is not physical.
349    pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self {
350        let mut new = self.clone();
351        new.extra.physical_mut().dist = dist;
352        new
353    }
354}
355
// Mutators for testing only.
#[cfg(test)]
impl<C: ConventionMarker> PlanBase<C> {
    /// Gives tests direct mutable access to the functional dependency set.
    pub fn functional_dependency_mut(&mut self) -> &mut FunctionalDependencySet {
        let Self {
            functional_dependency,
            ..
        } = self;
        functional_dependency
    }
}