risingwave_frontend/optimizer/plan_node/
convert.rs1use std::collections::HashMap;
16
17use risingwave_common::catalog::FieldDisplay;
18use risingwave_pb::stream_plan::StreamScanType;
19
20use super::*;
21use crate::optimizer::property::RequiredDist;
22
23pub trait ToStream {
34 fn logical_rewrite_for_stream(
42 &self,
43 ctx: &mut RewriteStreamContext,
44 ) -> Result<(LogicalPlanRef, ColIndexMapping)>;
45
46 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef>;
48
49 fn to_stream_with_dist_required(
51 &self,
52 required_dist: &RequiredDist,
53 ctx: &mut ToStreamContext,
54 ) -> Result<StreamPlanRef> {
55 let ret = self.to_stream(ctx)?;
56 required_dist.streaming_enforce_if_not_satisfies(ret)
57 }
58
59 fn try_better_locality(&self, _columns: &[usize]) -> Option<LogicalPlanRef> {
60 None
61 }
62}
63
64pub fn try_enforce_locality_requirement(plan: LogicalPlanRef, columns: &[usize]) -> LogicalPlanRef {
70 assert!(!columns.is_empty());
71 if let Some(better_plan) = plan.try_better_locality(columns) {
72 better_plan
73 } else if plan.ctx().session_ctx().config().enable_locality_backfill() {
74 LogicalLocalityProvider::new(plan, columns.to_owned()).into()
75 } else {
76 plan
77 }
78}
79
80pub fn stream_enforce_eowc_requirement(
81 ctx: OptimizerContextRef,
82 plan: StreamPlanRef,
83 emit_on_window_close: bool,
84) -> Result<StreamPlanRef> {
85 if emit_on_window_close && !plan.emit_on_window_close() {
86 let watermark_groups = plan.watermark_columns().grouped();
87 let n_watermark_groups = watermark_groups.len();
88 if n_watermark_groups == 0 {
89 Err(ErrorCode::NotSupported(
90 "The query cannot be executed in Emit-On-Window-Close mode.".to_owned(),
91 "Try define a watermark column in the source, or avoid aggregation without GROUP BY".to_owned(),
92 )
93 .into())
94 } else {
95 let first_watermark_group = watermark_groups.values().next().unwrap();
96 let watermark_col_idx = first_watermark_group.indices().next().unwrap();
97 if n_watermark_groups > 1 {
98 ctx.warn_to_user(format!(
99 "There are multiple unrelated watermark columns in the query, the first one `{}` is used.",
100 FieldDisplay(&plan.schema()[watermark_col_idx])
101 ));
102 }
103 Ok(StreamEowcSort::new(plan, watermark_col_idx).into())
104 }
105 } else {
106 Ok(plan)
107 }
108}
109
110#[derive(Debug, Clone, Default)]
111pub struct RewriteStreamContext {
112 share_rewrite_map: HashMap<PlanNodeId, (LogicalPlanRef, ColIndexMapping)>,
113}
114
115impl RewriteStreamContext {
116 pub fn add_rewrite_result(
117 &mut self,
118 plan_node_id: PlanNodeId,
119 plan_ref: LogicalPlanRef,
120 col_change: ColIndexMapping,
121 ) {
122 let prev = self
123 .share_rewrite_map
124 .insert(plan_node_id, (plan_ref, col_change));
125 assert!(prev.is_none());
126 }
127
128 pub fn get_rewrite_result(
129 &self,
130 plan_node_id: PlanNodeId,
131 ) -> Option<&(LogicalPlanRef, ColIndexMapping)> {
132 self.share_rewrite_map.get(&plan_node_id)
133 }
134}
135
136#[derive(Debug, Clone)]
137pub struct ToStreamContext {
138 share_to_stream_map: HashMap<PlanNodeId, StreamPlanRef>,
139 emit_on_window_close: bool,
140 stream_scan_type: StreamScanType,
141}
142
143impl ToStreamContext {
144 pub fn new(emit_on_window_close: bool) -> Self {
145 Self::new_with_stream_scan_type(emit_on_window_close, StreamScanType::Backfill)
146 }
147
148 pub fn new_with_stream_scan_type(
149 emit_on_window_close: bool,
150 stream_scan_type: StreamScanType,
151 ) -> Self {
152 Self {
153 share_to_stream_map: HashMap::new(),
154 emit_on_window_close,
155 stream_scan_type,
156 }
157 }
158
159 pub fn stream_scan_type(&self) -> StreamScanType {
160 self.stream_scan_type
161 }
162
163 pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: StreamPlanRef) {
164 self.share_to_stream_map
165 .try_insert(plan_node_id, plan_ref)
166 .unwrap();
167 }
168
169 pub fn get_to_stream_result(&self, plan_node_id: PlanNodeId) -> Option<&StreamPlanRef> {
170 self.share_to_stream_map.get(&plan_node_id)
171 }
172
173 pub fn emit_on_window_close(&self) -> bool {
174 self.emit_on_window_close
175 }
176}
177
178pub trait ToBatch {
190 fn to_batch(&self) -> Result<BatchPlanRef>;
192 fn to_batch_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
194 let ret = self.to_batch()?;
195 required_order.enforce_if_not_satisfies(ret)
196 }
197}
198
199pub trait ToLocalBatch {
204 fn to_local(&self) -> Result<BatchPlanRef>;
205
206 fn to_local_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
208 let ret = self.to_local()?;
209 required_order.enforce_if_not_satisfies(ret)
210 }
211}
212
213pub trait ToDistributedBatch {
223 fn to_distributed(&self) -> Result<BatchPlanRef>;
226 fn to_distributed_with_required(
228 &self,
229 required_order: &Order,
230 required_dist: &RequiredDist,
231 ) -> Result<BatchPlanRef> {
232 let ret = self.to_distributed()?;
233 let ret = required_order.enforce_if_not_satisfies(ret)?;
234 required_dist.batch_enforce_if_not_satisfies(ret, required_order)
235 }
236}