risingwave_frontend/optimizer/plan_node/
convert.rs1use std::collections::HashMap;
16
17use risingwave_common::catalog::FieldDisplay;
18use risingwave_pb::stream_plan::StreamScanType;
19
20use super::*;
21use crate::optimizer::property::RequiredDist;
22
23pub trait ToStream {
34 fn logical_rewrite_for_stream(
42 &self,
43 ctx: &mut RewriteStreamContext,
44 ) -> Result<(LogicalPlanRef, ColIndexMapping)>;
45
46 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef>;
48
49 fn to_stream_with_dist_required(
51 &self,
52 required_dist: &RequiredDist,
53 ctx: &mut ToStreamContext,
54 ) -> Result<StreamPlanRef> {
55 let ret = self.to_stream(ctx)?;
56 required_dist.streaming_enforce_if_not_satisfies(ret)
57 }
58
59 fn try_better_locality(&self, _columns: &[usize]) -> Option<LogicalPlanRef> {
60 None
61 }
62}
63
64pub fn try_enforce_locality_requirement(plan: LogicalPlanRef, columns: &[usize]) -> LogicalPlanRef {
70 assert!(!columns.is_empty());
71 if let Some(better_plan) = plan.try_better_locality(columns) {
72 better_plan
73 } else if plan.ctx().session_ctx().config().enable_locality_backfill() {
74 LogicalLocalityProvider::new(plan, columns.to_owned()).into()
75 } else {
76 plan.ctx().inc_missed_locality_providers();
78 plan
79 }
80}
81
82pub fn stream_enforce_eowc_requirement(
83 ctx: OptimizerContextRef,
84 plan: StreamPlanRef,
85 emit_on_window_close: bool,
86) -> Result<StreamPlanRef> {
87 if emit_on_window_close && !plan.emit_on_window_close() {
88 let watermark_groups = plan.watermark_columns().grouped();
89 let n_watermark_groups = watermark_groups.len();
90 if n_watermark_groups == 0 {
91 Err(ErrorCode::NotSupported(
92 "The query cannot be executed in Emit-On-Window-Close mode.".to_owned(),
93 "Try define a watermark column in the source, or avoid aggregation without GROUP BY".to_owned(),
94 )
95 .into())
96 } else {
97 let first_watermark_group = watermark_groups.values().next().unwrap();
98 let watermark_col_idx = first_watermark_group.indices().next().unwrap();
99 if n_watermark_groups > 1 {
100 ctx.warn_to_user(format!(
101 "There are multiple unrelated watermark columns in the query, the first one `{}` is used.",
102 FieldDisplay(&plan.schema()[watermark_col_idx])
103 ));
104 }
105 Ok(StreamEowcSort::new(plan, watermark_col_idx).into())
106 }
107 } else {
108 Ok(plan)
109 }
110}
111
112#[derive(Debug, Clone, Default)]
113pub struct RewriteStreamContext {
114 share_rewrite_map: HashMap<PlanNodeId, (LogicalPlanRef, ColIndexMapping)>,
115}
116
117impl RewriteStreamContext {
118 pub fn add_rewrite_result(
119 &mut self,
120 plan_node_id: PlanNodeId,
121 plan_ref: LogicalPlanRef,
122 col_change: ColIndexMapping,
123 ) {
124 let prev = self
125 .share_rewrite_map
126 .insert(plan_node_id, (plan_ref, col_change));
127 assert!(prev.is_none());
128 }
129
130 pub fn get_rewrite_result(
131 &self,
132 plan_node_id: PlanNodeId,
133 ) -> Option<&(LogicalPlanRef, ColIndexMapping)> {
134 self.share_rewrite_map.get(&plan_node_id)
135 }
136}
137
138#[derive(Debug, Clone, Copy, Eq, PartialEq)]
139pub enum BackfillType {
140 UpstreamOnly,
141 Backfill,
142 ArrangementBackfill,
143 SnapshotBackfill,
144}
145
146impl BackfillType {
147 pub fn to_stream_scan_type(self) -> StreamScanType {
148 match self {
149 BackfillType::UpstreamOnly => StreamScanType::UpstreamOnly,
150 BackfillType::Backfill => StreamScanType::Backfill,
151 BackfillType::ArrangementBackfill => StreamScanType::ArrangementBackfill,
152 BackfillType::SnapshotBackfill => StreamScanType::SnapshotBackfill,
153 }
154 }
155}
156
157#[derive(Debug, Clone)]
158pub struct ToStreamContext {
159 share_to_stream_map: HashMap<PlanNodeId, StreamPlanRef>,
160 emit_on_window_close: bool,
161 backfill_type: BackfillType,
162}
163
164impl ToStreamContext {
165 pub fn new(emit_on_window_close: bool) -> Self {
166 Self::new_with_backfill_type(emit_on_window_close, BackfillType::Backfill)
167 }
168
169 pub fn new_with_backfill_type(emit_on_window_close: bool, backfill_type: BackfillType) -> Self {
170 Self {
171 share_to_stream_map: HashMap::new(),
172 emit_on_window_close,
173 backfill_type,
174 }
175 }
176
177 pub fn backfill_type(&self) -> BackfillType {
178 self.backfill_type
179 }
180
181 pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: StreamPlanRef) {
182 self.share_to_stream_map
183 .try_insert(plan_node_id, plan_ref)
184 .unwrap();
185 }
186
187 pub fn get_to_stream_result(&self, plan_node_id: PlanNodeId) -> Option<&StreamPlanRef> {
188 self.share_to_stream_map.get(&plan_node_id)
189 }
190
191 pub fn emit_on_window_close(&self) -> bool {
192 self.emit_on_window_close
193 }
194}
195
196pub trait ToBatch {
208 fn to_batch(&self) -> Result<BatchPlanRef>;
210 fn to_batch_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
212 let ret = self.to_batch()?;
213 required_order.enforce_if_not_satisfies(ret)
214 }
215}
216
217pub trait ToLocalBatch {
222 fn to_local(&self) -> Result<BatchPlanRef>;
223
224 fn to_local_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
226 let ret = self.to_local()?;
227 required_order.enforce_if_not_satisfies(ret)
228 }
229}
230
231pub trait ToDistributedBatch {
241 fn to_distributed(&self) -> Result<BatchPlanRef>;
244 fn to_distributed_with_required(
246 &self,
247 required_order: &Order,
248 required_dist: &RequiredDist,
249 ) -> Result<BatchPlanRef> {
250 let ret = self.to_distributed()?;
251 let ret = required_order.enforce_if_not_satisfies(ret)?;
252 required_dist.batch_enforce_if_not_satisfies(ret, required_order)
253 }
254}