risingwave_frontend/optimizer/plan_node/
convert.rs1use std::collections::HashMap;
16
17use paste::paste;
18use risingwave_common::catalog::FieldDisplay;
19use risingwave_pb::stream_plan::StreamScanType;
20
21use super::*;
22use crate::optimizer::property::RequiredDist;
23use crate::{for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes};
24
25pub trait ToStream {
36 fn logical_rewrite_for_stream(
44 &self,
45 ctx: &mut RewriteStreamContext,
46 ) -> Result<(PlanRef, ColIndexMapping)>;
47
48 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef>;
50
51 fn to_stream_with_dist_required(
53 &self,
54 required_dist: &RequiredDist,
55 ctx: &mut ToStreamContext,
56 ) -> Result<PlanRef> {
57 let ret = self.to_stream(ctx)?;
58 required_dist.enforce_if_not_satisfies(ret, &Order::any())
59 }
60}
61
62pub fn stream_enforce_eowc_requirement(
63 ctx: OptimizerContextRef,
64 plan: PlanRef,
65 emit_on_window_close: bool,
66) -> Result<PlanRef> {
67 if emit_on_window_close && !plan.emit_on_window_close() {
68 let watermark_groups = plan.watermark_columns().grouped();
69 let n_watermark_groups = watermark_groups.len();
70 if n_watermark_groups == 0 {
71 Err(ErrorCode::NotSupported(
72 "The query cannot be executed in Emit-On-Window-Close mode.".to_owned(),
73 "Try define a watermark column in the source, or avoid aggregation without GROUP BY".to_owned(),
74 )
75 .into())
76 } else {
77 let first_watermark_group = watermark_groups.values().next().unwrap();
78 let watermark_col_idx = first_watermark_group.indices().next().unwrap();
79 if n_watermark_groups > 1 {
80 ctx.warn_to_user(format!(
81 "There are multiple unrelated watermark columns in the query, the first one `{}` is used.",
82 FieldDisplay(&plan.schema()[watermark_col_idx])
83 ));
84 }
85 Ok(StreamEowcSort::new(plan, watermark_col_idx).into())
86 }
87 } else {
88 Ok(plan)
89 }
90}
91
92#[derive(Debug, Clone, Default)]
93pub struct RewriteStreamContext {
94 share_rewrite_map: HashMap<PlanNodeId, (PlanRef, ColIndexMapping)>,
95}
96
97impl RewriteStreamContext {
98 pub fn add_rewrite_result(
99 &mut self,
100 plan_node_id: PlanNodeId,
101 plan_ref: PlanRef,
102 col_change: ColIndexMapping,
103 ) {
104 let prev = self
105 .share_rewrite_map
106 .insert(plan_node_id, (plan_ref, col_change));
107 assert!(prev.is_none());
108 }
109
110 pub fn get_rewrite_result(
111 &self,
112 plan_node_id: PlanNodeId,
113 ) -> Option<&(PlanRef, ColIndexMapping)> {
114 self.share_rewrite_map.get(&plan_node_id)
115 }
116}
117
118#[derive(Debug, Clone)]
119pub struct ToStreamContext {
120 share_to_stream_map: HashMap<PlanNodeId, PlanRef>,
121 emit_on_window_close: bool,
122 stream_scan_type: StreamScanType,
123}
124
125impl ToStreamContext {
126 pub fn new(emit_on_window_close: bool) -> Self {
127 Self::new_with_stream_scan_type(emit_on_window_close, StreamScanType::Backfill)
128 }
129
130 pub fn new_with_stream_scan_type(
131 emit_on_window_close: bool,
132 stream_scan_type: StreamScanType,
133 ) -> Self {
134 Self {
135 share_to_stream_map: HashMap::new(),
136 emit_on_window_close,
137 stream_scan_type,
138 }
139 }
140
141 pub fn stream_scan_type(&self) -> StreamScanType {
142 self.stream_scan_type
143 }
144
145 pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: PlanRef) {
146 self.share_to_stream_map
147 .try_insert(plan_node_id, plan_ref)
148 .unwrap();
149 }
150
151 pub fn get_to_stream_result(&self, plan_node_id: PlanNodeId) -> Option<&PlanRef> {
152 self.share_to_stream_map.get(&plan_node_id)
153 }
154
155 pub fn emit_on_window_close(&self) -> bool {
156 self.emit_on_window_close
157 }
158}
159
160pub trait ToBatch {
172 fn to_batch(&self) -> Result<PlanRef>;
174 fn to_batch_with_order_required(&self, required_order: &Order) -> Result<PlanRef> {
176 let ret = self.to_batch()?;
177 required_order.enforce_if_not_satisfies(ret)
178 }
179}
180
181pub trait ToLocalBatch {
186 fn to_local(&self) -> Result<PlanRef>;
187
188 fn to_local_with_order_required(&self, required_order: &Order) -> Result<PlanRef> {
190 let ret = self.to_local()?;
191 required_order.enforce_if_not_satisfies(ret)
192 }
193}
194
195pub trait ToDistributedBatch {
205 fn to_distributed(&self) -> Result<PlanRef>;
208 fn to_distributed_with_required(
210 &self,
211 required_order: &Order,
212 required_dist: &RequiredDist,
213 ) -> Result<PlanRef> {
214 let ret = self.to_distributed()?;
215 let ret = required_order.enforce_if_not_satisfies(ret)?;
216 required_dist.enforce_if_not_satisfies(ret, required_order)
217 }
218}
219
220macro_rules! ban_to_batch {
222 ($( { $convention:ident, $name:ident }),*) => {
223 paste!{
224 $(impl ToBatch for [<$convention $name>] {
225 fn to_batch(&self) -> Result<PlanRef> {
226 panic!("converting into batch is only allowed on logical plan")
227 }
228 })*
229 }
230 }
231}
232for_batch_plan_nodes! { ban_to_batch }
233for_stream_plan_nodes! { ban_to_batch }
234
235macro_rules! ban_to_stream {
237 ($( { $convention:ident, $name:ident }),*) => {
238 paste!{
239 $(impl ToStream for [<$convention $name>] {
240 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef>{
241 panic!("converting to stream is only allowed on logical plan")
242 }
243 fn logical_rewrite_for_stream(&self, _ctx: &mut RewriteStreamContext) -> Result<(PlanRef, ColIndexMapping)>{
244 panic!("logical rewrite is only allowed on logical plan")
245 }
246 })*
247 }
248 }
249}
250for_batch_plan_nodes! { ban_to_stream }
251for_stream_plan_nodes! { ban_to_stream }
252
253macro_rules! ban_to_distributed {
255 ($( { $convention:ident, $name:ident }),*) => {
256 paste!{
257 $(impl ToDistributedBatch for [<$convention $name>] {
258 fn to_distributed(&self) -> Result<PlanRef> {
259 panic!("converting to distributed is only allowed on batch plan")
260 }
261 })*
262 }
263 }
264}
265for_logical_plan_nodes! { ban_to_distributed }
266for_stream_plan_nodes! { ban_to_distributed }
267
268macro_rules! ban_to_local {
270 ($( { $convention:ident, $name:ident }),*) => {
271 paste!{
272 $(impl ToLocalBatch for [<$convention $name>] {
273 fn to_local(&self) -> Result<PlanRef> {
274 panic!("converting to distributed is only allowed on batch plan")
275 }
276 })*
277 }
278 }
279}
280for_logical_plan_nodes! { ban_to_local }
281for_stream_plan_nodes! { ban_to_local }