risingwave_frontend/optimizer/plan_node/
plan_base.rs1use educe::Educe;
16
17use super::generic::GenericPlanNode;
18use super::*;
19use crate::optimizer::property::{Distribution, StreamKind, WatermarkColumns};
20
21#[derive(Clone, Debug, PartialEq, Eq, Hash)]
23pub struct NoExtra;
24
25mod physical_common {
28 use super::*;
29
30 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
32 pub struct PhysicalCommonExtra {
33 pub dist: Distribution,
36 }
37
38 pub trait GetPhysicalCommon {
41 fn physical(&self) -> &PhysicalCommonExtra;
42 fn physical_mut(&mut self) -> &mut PhysicalCommonExtra;
43 }
44}
45
46use physical_common::*;
47
48#[derive(Clone, Debug, PartialEq, Eq, Hash)]
50pub struct StreamExtra {
51 physical: PhysicalCommonExtra,
53
54 stream_kind: StreamKind,
56
57 emit_on_window_close: bool,
59 watermark_columns: WatermarkColumns,
62 columns_monotonicity: MonotonicityMap,
64}
65
66impl GetPhysicalCommon for StreamExtra {
67 fn physical(&self) -> &PhysicalCommonExtra {
68 &self.physical
69 }
70
71 fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
72 &mut self.physical
73 }
74}
75
76#[derive(Clone, Debug, PartialEq, Eq, Hash)]
78pub struct BatchExtra {
79 physical: PhysicalCommonExtra,
81
82 order: Order,
85}
86
87impl GetPhysicalCommon for BatchExtra {
88 fn physical(&self) -> &PhysicalCommonExtra {
89 &self.physical
90 }
91
92 fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
93 &mut self.physical
94 }
95}
96
97#[derive(Educe)]
110#[educe(PartialEq, Eq, Hash, Clone, Debug)]
111pub struct PlanBase<C: ConventionMarker> {
112 #[educe(PartialEq(ignore), Hash(ignore))]
114 id: PlanNodeId,
115 #[educe(PartialEq(ignore), Hash(ignore))]
116 ctx: OptimizerContextRef,
117
118 schema: Schema,
119 stream_key: Option<Vec<usize>>,
124 functional_dependency: FunctionalDependencySet,
125
126 extra: C::Extra,
128}
129
130impl<C: ConventionMarker> generic::GenericPlanRef for PlanBase<C> {
131 fn id(&self) -> PlanNodeId {
132 self.id
133 }
134
135 fn schema(&self) -> &Schema {
136 &self.schema
137 }
138
139 fn stream_key(&self) -> Option<&[usize]> {
140 self.stream_key.as_deref()
141 }
142
143 fn ctx(&self) -> OptimizerContextRef {
144 self.ctx.clone()
145 }
146
147 fn functional_dependency(&self) -> &FunctionalDependencySet {
148 &self.functional_dependency
149 }
150}
151
152impl<C: ConventionMarker> generic::PhysicalPlanRef for PlanBase<C>
153where
154 C::Extra: GetPhysicalCommon,
155{
156 fn distribution(&self) -> &Distribution {
157 &self.extra.physical().dist
158 }
159}
160
161impl stream::StreamPlanNodeMetadata for PlanBase<Stream> {
162 fn stream_kind(&self) -> StreamKind {
163 self.extra.stream_kind
164 }
165
166 fn emit_on_window_close(&self) -> bool {
167 self.extra.emit_on_window_close
168 }
169
170 fn watermark_columns(&self) -> &WatermarkColumns {
171 &self.extra.watermark_columns
172 }
173
174 fn columns_monotonicity(&self) -> &MonotonicityMap {
175 &self.extra.columns_monotonicity
176 }
177}
178
179impl batch::BatchPlanNodeMetadata for PlanBase<Batch> {
180 fn order(&self) -> &Order {
181 &self.extra.order
182 }
183}
184
185impl<C: ConventionMarker> PlanBase<C> {
186 pub fn clone_with_new_plan_id(&self) -> Self {
187 let mut new = self.clone();
188 new.id = self.ctx().next_plan_node_id();
189 new
190 }
191}
192
193impl PlanBase<Logical> {
194 pub fn new_logical(
195 ctx: OptimizerContextRef,
196 schema: Schema,
197 stream_key: Option<Vec<usize>>,
198 functional_dependency: FunctionalDependencySet,
199 ) -> Self {
200 let id = ctx.next_plan_node_id();
201 Self {
202 id,
203 ctx,
204 schema,
205 stream_key,
206 functional_dependency,
207 extra: NoExtra,
208 }
209 }
210
211 pub fn new_logical_with_core(core: &impl GenericPlanNode) -> Self {
212 Self::new_logical(
213 core.ctx(),
214 core.schema(),
215 core.stream_key(),
216 core.functional_dependency(),
217 )
218 }
219}
220
221impl PlanBase<Stream> {
222 pub fn new_stream(
223 ctx: OptimizerContextRef,
224 schema: Schema,
225 stream_key: Option<Vec<usize>>,
226 functional_dependency: FunctionalDependencySet,
227 dist: Distribution,
228 stream_kind: StreamKind,
229 emit_on_window_close: bool,
230 watermark_columns: WatermarkColumns,
231 columns_monotonicity: MonotonicityMap,
232 ) -> Self {
233 let id = ctx.next_plan_node_id();
234 Self {
235 id,
236 ctx,
237 schema,
238 stream_key,
239 functional_dependency,
240 extra: StreamExtra {
241 physical: PhysicalCommonExtra { dist },
242 stream_kind,
243 emit_on_window_close,
244 watermark_columns,
245 columns_monotonicity,
246 },
247 }
248 }
249
250 pub fn new_stream_with_core(
251 core: &impl GenericPlanNode,
252 dist: Distribution,
253 stream_kind: StreamKind,
254 emit_on_window_close: bool,
255 watermark_columns: WatermarkColumns,
256 columns_monotonicity: MonotonicityMap,
257 ) -> Self {
258 Self::new_stream(
259 core.ctx(),
260 core.schema(),
261 core.stream_key(),
262 core.functional_dependency(),
263 dist,
264 stream_kind,
265 emit_on_window_close,
266 watermark_columns,
267 columns_monotonicity,
268 )
269 }
270}
271
272impl PlanBase<Batch> {
273 pub fn new_batch(
274 ctx: OptimizerContextRef,
275 schema: Schema,
276 dist: Distribution,
277 order: Order,
278 ) -> Self {
279 let id = ctx.next_plan_node_id();
280 let functional_dependency = FunctionalDependencySet::new(schema.len());
281 Self {
282 id,
283 ctx,
284 schema,
285 stream_key: None,
286 functional_dependency,
287 extra: BatchExtra {
288 physical: PhysicalCommonExtra { dist },
289 order,
290 },
291 }
292 }
293
294 pub fn new_batch_with_core(
295 core: &impl GenericPlanNode,
296 dist: Distribution,
297 order: Order,
298 ) -> Self {
299 Self::new_batch(core.ctx(), core.schema(), dist, order)
300 }
301}
302
303impl<C: ConventionMarker> PlanBase<C>
304where
305 C::Extra: GetPhysicalCommon,
306{
307 pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self {
311 let mut new = self.clone();
312 new.extra.physical_mut().dist = dist;
313 new
314 }
315}
316
317#[cfg(test)]
319impl<C: ConventionMarker> PlanBase<C> {
320 pub fn functional_dependency_mut(&mut self) -> &mut FunctionalDependencySet {
321 &mut self.functional_dependency
322 }
323}