risingwave_frontend/optimizer/plan_node/
plan_base.rs1use educe::Educe;
16
17use super::generic::GenericPlanNode;
18use super::*;
19use crate::optimizer::property::{Distribution, StreamKind, WatermarkColumns};
20
21#[derive(Clone, Debug, PartialEq, Eq, Hash)]
23pub struct NoExtra;
24
25mod physical_common {
28 use super::*;
29
30 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
32 pub struct PhysicalCommonExtra {
33 pub dist: Distribution,
36 }
37
38 pub trait GetPhysicalCommon {
41 fn physical(&self) -> &PhysicalCommonExtra;
42 fn physical_mut(&mut self) -> &mut PhysicalCommonExtra;
43 }
44}
45
46use physical_common::*;
47
48#[derive(Clone, Debug, PartialEq, Eq, Hash)]
50pub struct StreamExtra {
51 physical: PhysicalCommonExtra,
53
54 stream_kind: StreamKind,
56
57 emit_on_window_close: bool,
59 watermark_columns: WatermarkColumns,
62 columns_monotonicity: MonotonicityMap,
64}
65
66impl GetPhysicalCommon for StreamExtra {
67 fn physical(&self) -> &PhysicalCommonExtra {
68 &self.physical
69 }
70
71 fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
72 &mut self.physical
73 }
74}
75
76#[derive(Clone, Debug, PartialEq, Eq, Hash)]
78pub struct BatchExtra {
79 physical: PhysicalCommonExtra,
81
82 orders: Vec<Order>,
96}
97
98impl GetPhysicalCommon for BatchExtra {
99 fn physical(&self) -> &PhysicalCommonExtra {
100 &self.physical
101 }
102
103 fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
104 &mut self.physical
105 }
106}
107
108#[derive(Educe)]
121#[educe(PartialEq, Eq, Hash, Clone, Debug)]
122pub struct PlanBase<C: ConventionMarker> {
123 #[educe(PartialEq(ignore), Hash(ignore))]
125 id: PlanNodeId,
126 #[educe(PartialEq(ignore), Hash(ignore))]
127 ctx: OptimizerContextRef,
128
129 schema: Schema,
130 stream_key: Option<Vec<usize>>,
135 functional_dependency: FunctionalDependencySet,
136
137 extra: C::Extra,
139}
140
141impl<C: ConventionMarker> generic::GenericPlanRef for PlanBase<C> {
142 fn id(&self) -> PlanNodeId {
143 self.id
144 }
145
146 fn schema(&self) -> &Schema {
147 &self.schema
148 }
149
150 fn stream_key(&self) -> Option<&[usize]> {
151 self.stream_key.as_deref()
152 }
153
154 fn ctx(&self) -> OptimizerContextRef {
155 self.ctx.clone()
156 }
157
158 fn functional_dependency(&self) -> &FunctionalDependencySet {
159 &self.functional_dependency
160 }
161}
162
163impl<C: ConventionMarker> generic::PhysicalPlanRef for PlanBase<C>
164where
165 C::Extra: GetPhysicalCommon,
166{
167 fn distribution(&self) -> &Distribution {
168 &self.extra.physical().dist
169 }
170}
171
172impl stream::StreamPlanNodeMetadata for PlanBase<Stream> {
173 fn stream_kind(&self) -> StreamKind {
174 self.extra.stream_kind
175 }
176
177 fn emit_on_window_close(&self) -> bool {
178 self.extra.emit_on_window_close
179 }
180
181 fn watermark_columns(&self) -> &WatermarkColumns {
182 &self.extra.watermark_columns
183 }
184
185 fn columns_monotonicity(&self) -> &MonotonicityMap {
186 &self.extra.columns_monotonicity
187 }
188}
189
190impl batch::BatchPlanNodeMetadata for PlanBase<Batch> {
191 fn order(&self) -> &Order {
192 self.extra
193 .orders
194 .first()
195 .expect("batch plan node should always have at least one order")
196 }
197
198 fn orders(&self) -> Vec<Order> {
199 self.extra.orders.clone()
200 }
201}
202
203impl<C: ConventionMarker> PlanBase<C> {
204 pub fn clone_with_new_plan_id(&self) -> Self {
205 let mut new = self.clone();
206 new.id = self.ctx().next_plan_node_id();
207 new
208 }
209}
210
211impl PlanBase<Logical> {
212 pub fn new_logical(
213 ctx: OptimizerContextRef,
214 schema: Schema,
215 stream_key: Option<Vec<usize>>,
216 functional_dependency: FunctionalDependencySet,
217 ) -> Self {
218 let id = ctx.next_plan_node_id();
219 Self {
220 id,
221 ctx,
222 schema,
223 stream_key,
224 functional_dependency,
225 extra: NoExtra,
226 }
227 }
228
229 pub fn new_logical_with_core(core: &impl GenericPlanNode) -> Self {
230 Self::new_logical(
231 core.ctx(),
232 core.schema(),
233 core.stream_key(),
234 core.functional_dependency(),
235 )
236 }
237}
238
239impl PlanBase<Stream> {
240 pub fn new_stream(
241 ctx: OptimizerContextRef,
242 schema: Schema,
243 stream_key: Option<Vec<usize>>,
244 functional_dependency: FunctionalDependencySet,
245 dist: Distribution,
246 stream_kind: StreamKind,
247 emit_on_window_close: bool,
248 watermark_columns: WatermarkColumns,
249 columns_monotonicity: MonotonicityMap,
250 ) -> Self {
251 let id = ctx.next_plan_node_id();
252 Self {
253 id,
254 ctx,
255 schema,
256 stream_key,
257 functional_dependency,
258 extra: StreamExtra {
259 physical: PhysicalCommonExtra { dist },
260 stream_kind,
261 emit_on_window_close,
262 watermark_columns,
263 columns_monotonicity,
264 },
265 }
266 }
267
268 pub fn new_stream_with_core(
269 core: &impl GenericPlanNode,
270 dist: Distribution,
271 stream_kind: StreamKind,
272 emit_on_window_close: bool,
273 watermark_columns: WatermarkColumns,
274 columns_monotonicity: MonotonicityMap,
275 ) -> Self {
276 Self::new_stream(
277 core.ctx(),
278 core.schema(),
279 core.stream_key(),
280 core.functional_dependency(),
281 dist,
282 stream_kind,
283 emit_on_window_close,
284 watermark_columns,
285 columns_monotonicity,
286 )
287 }
288}
289
290impl PlanBase<Batch> {
291 pub fn new_batch(
292 ctx: OptimizerContextRef,
293 schema: Schema,
294 dist: Distribution,
295 order: Order,
296 ) -> Self {
297 Self::new_batch_with_orders(ctx, schema, dist, vec![order])
298 }
299
300 pub fn new_batch_with_orders(
301 ctx: OptimizerContextRef,
302 schema: Schema,
303 dist: Distribution,
304 orders: Vec<Order>,
305 ) -> Self {
306 assert!(
307 !orders.is_empty(),
308 "batch plan node should always have at least one order"
309 );
310 let id = ctx.next_plan_node_id();
311 let functional_dependency = FunctionalDependencySet::new(schema.len());
312 Self {
313 id,
314 ctx,
315 schema,
316 stream_key: None,
317 functional_dependency,
318 extra: BatchExtra {
319 physical: PhysicalCommonExtra { dist },
320 orders,
321 },
322 }
323 }
324
325 pub fn new_batch_with_core(
326 core: &impl GenericPlanNode,
327 dist: Distribution,
328 order: Order,
329 ) -> Self {
330 Self::new_batch(core.ctx(), core.schema(), dist, order)
331 }
332
333 pub fn new_batch_with_core_and_orders(
334 core: &impl GenericPlanNode,
335 dist: Distribution,
336 orders: Vec<Order>,
337 ) -> Self {
338 Self::new_batch_with_orders(core.ctx(), core.schema(), dist, orders)
339 }
340}
341
342impl<C: ConventionMarker> PlanBase<C>
343where
344 C::Extra: GetPhysicalCommon,
345{
346 pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self {
350 let mut new = self.clone();
351 new.extra.physical_mut().dist = dist;
352 new
353 }
354}
355
356#[cfg(test)]
358impl<C: ConventionMarker> PlanBase<C> {
359 pub fn functional_dependency_mut(&mut self) -> &mut FunctionalDependencySet {
360 &mut self.functional_dependency
361 }
362}