risingwave_frontend/optimizer/plan_node/
convert.rs1use std::collections::HashMap;
16
17use risingwave_common::catalog::FieldDisplay;
18use risingwave_pb::stream_plan::StreamScanType;
19
20use super::*;
21use crate::optimizer::property::RequiredDist;
22
23pub trait ToStream {
34 fn logical_rewrite_for_stream(
42 &self,
43 ctx: &mut RewriteStreamContext,
44 ) -> Result<(LogicalPlanRef, ColIndexMapping)>;
45
46 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef>;
48
49 fn to_stream_with_dist_required(
51 &self,
52 required_dist: &RequiredDist,
53 ctx: &mut ToStreamContext,
54 ) -> Result<StreamPlanRef> {
55 let ret = self.to_stream(ctx)?;
56 required_dist.streaming_enforce_if_not_satisfies(ret)
57 }
58
59 fn try_better_locality(&self, _columns: &[usize]) -> Option<LogicalPlanRef> {
60 None
61 }
62}
63
64pub fn stream_enforce_eowc_requirement(
65 ctx: OptimizerContextRef,
66 plan: StreamPlanRef,
67 emit_on_window_close: bool,
68) -> Result<StreamPlanRef> {
69 if emit_on_window_close && !plan.emit_on_window_close() {
70 let watermark_groups = plan.watermark_columns().grouped();
71 let n_watermark_groups = watermark_groups.len();
72 if n_watermark_groups == 0 {
73 Err(ErrorCode::NotSupported(
74 "The query cannot be executed in Emit-On-Window-Close mode.".to_owned(),
75 "Try define a watermark column in the source, or avoid aggregation without GROUP BY".to_owned(),
76 )
77 .into())
78 } else {
79 let first_watermark_group = watermark_groups.values().next().unwrap();
80 let watermark_col_idx = first_watermark_group.indices().next().unwrap();
81 if n_watermark_groups > 1 {
82 ctx.warn_to_user(format!(
83 "There are multiple unrelated watermark columns in the query, the first one `{}` is used.",
84 FieldDisplay(&plan.schema()[watermark_col_idx])
85 ));
86 }
87 Ok(StreamEowcSort::new(plan, watermark_col_idx).into())
88 }
89 } else {
90 Ok(plan)
91 }
92}
93
94#[derive(Debug, Clone, Default)]
95pub struct RewriteStreamContext {
96 share_rewrite_map: HashMap<PlanNodeId, (LogicalPlanRef, ColIndexMapping)>,
97}
98
99impl RewriteStreamContext {
100 pub fn add_rewrite_result(
101 &mut self,
102 plan_node_id: PlanNodeId,
103 plan_ref: LogicalPlanRef,
104 col_change: ColIndexMapping,
105 ) {
106 let prev = self
107 .share_rewrite_map
108 .insert(plan_node_id, (plan_ref, col_change));
109 assert!(prev.is_none());
110 }
111
112 pub fn get_rewrite_result(
113 &self,
114 plan_node_id: PlanNodeId,
115 ) -> Option<&(LogicalPlanRef, ColIndexMapping)> {
116 self.share_rewrite_map.get(&plan_node_id)
117 }
118}
119
120#[derive(Debug, Clone)]
121pub struct ToStreamContext {
122 share_to_stream_map: HashMap<PlanNodeId, StreamPlanRef>,
123 emit_on_window_close: bool,
124 stream_scan_type: StreamScanType,
125}
126
127impl ToStreamContext {
128 pub fn new(emit_on_window_close: bool) -> Self {
129 Self::new_with_stream_scan_type(emit_on_window_close, StreamScanType::Backfill)
130 }
131
132 pub fn new_with_stream_scan_type(
133 emit_on_window_close: bool,
134 stream_scan_type: StreamScanType,
135 ) -> Self {
136 Self {
137 share_to_stream_map: HashMap::new(),
138 emit_on_window_close,
139 stream_scan_type,
140 }
141 }
142
143 pub fn stream_scan_type(&self) -> StreamScanType {
144 self.stream_scan_type
145 }
146
147 pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: StreamPlanRef) {
148 self.share_to_stream_map
149 .try_insert(plan_node_id, plan_ref)
150 .unwrap();
151 }
152
153 pub fn get_to_stream_result(&self, plan_node_id: PlanNodeId) -> Option<&StreamPlanRef> {
154 self.share_to_stream_map.get(&plan_node_id)
155 }
156
157 pub fn emit_on_window_close(&self) -> bool {
158 self.emit_on_window_close
159 }
160}
161
162pub trait ToBatch {
174 fn to_batch(&self) -> Result<BatchPlanRef>;
176 fn to_batch_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
178 let ret = self.to_batch()?;
179 required_order.enforce_if_not_satisfies(ret)
180 }
181}
182
183pub trait ToLocalBatch {
188 fn to_local(&self) -> Result<BatchPlanRef>;
189
190 fn to_local_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
192 let ret = self.to_local()?;
193 required_order.enforce_if_not_satisfies(ret)
194 }
195}
196
197pub trait ToDistributedBatch {
207 fn to_distributed(&self) -> Result<BatchPlanRef>;
210 fn to_distributed_with_required(
212 &self,
213 required_order: &Order,
214 required_dist: &RequiredDist,
215 ) -> Result<BatchPlanRef> {
216 let ret = self.to_distributed()?;
217 let ret = required_order.enforce_if_not_satisfies(ret)?;
218 required_dist.batch_enforce_if_not_satisfies(ret, required_order)
219 }
220}