risingwave_frontend/optimizer/plan_node/
convert.rs1use std::collections::HashMap;
16
17use risingwave_common::catalog::FieldDisplay;
18use risingwave_pb::stream_plan::StreamScanType;
19
20use super::*;
21use crate::optimizer::property::RequiredDist;
22
23pub trait ToStream {
34 fn logical_rewrite_for_stream(
42 &self,
43 ctx: &mut RewriteStreamContext,
44 ) -> Result<(LogicalPlanRef, ColIndexMapping)>;
45
46 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef>;
48
49 fn to_stream_with_dist_required(
51 &self,
52 required_dist: &RequiredDist,
53 ctx: &mut ToStreamContext,
54 ) -> Result<StreamPlanRef> {
55 let ret = self.to_stream(ctx)?;
56 required_dist.streaming_enforce_if_not_satisfies(ret)
57 }
58
59 fn try_better_locality(&self, _columns: &[usize]) -> Option<LogicalPlanRef> {
60 None
61 }
62}
63
64pub fn try_enforce_locality_requirement(plan: LogicalPlanRef, columns: &[usize]) -> LogicalPlanRef {
70 assert!(!columns.is_empty());
71 if let Some(better_plan) = plan.try_better_locality(columns) {
72 better_plan
73 } else if plan.ctx().session_ctx().config().enable_locality_backfill() {
74 LogicalLocalityProvider::new(plan, columns.to_owned()).into()
75 } else {
76 plan.ctx().inc_missed_locality_providers();
78 plan
79 }
80}
81
82pub fn stream_enforce_eowc_requirement(
83 ctx: OptimizerContextRef,
84 plan: StreamPlanRef,
85 emit_on_window_close: bool,
86) -> Result<StreamPlanRef> {
87 if emit_on_window_close && !plan.emit_on_window_close() {
88 let watermark_groups = plan.watermark_columns().grouped();
89 let n_watermark_groups = watermark_groups.len();
90 if n_watermark_groups == 0 {
91 Err(ErrorCode::NotSupported(
92 "The query cannot be executed in Emit-On-Window-Close mode.".to_owned(),
93 "Try define a watermark column in the source, or avoid aggregation without GROUP BY".to_owned(),
94 )
95 .into())
96 } else {
97 let first_watermark_group = watermark_groups.values().next().unwrap();
98 let watermark_col_idx = first_watermark_group.indices().next().unwrap();
99 if n_watermark_groups > 1 {
100 ctx.warn_to_user(format!(
101 "There are multiple unrelated watermark columns in the query, the first one `{}` is used.",
102 FieldDisplay(&plan.schema()[watermark_col_idx])
103 ));
104 }
105 Ok(StreamEowcSort::new(plan, watermark_col_idx).into())
106 }
107 } else {
108 Ok(plan)
109 }
110}
111
112#[derive(Debug, Clone)]
113pub struct RewriteStreamContext {
114 share_rewrite_map: HashMap<PlanNodeId, (LogicalPlanRef, ColIndexMapping)>,
115 backfill_type: BackfillType,
119}
120
121impl RewriteStreamContext {
122 pub fn new_with_backfill_type(backfill_type: BackfillType) -> Self {
123 Self {
124 share_rewrite_map: HashMap::new(),
125 backfill_type,
126 }
127 }
128
129 pub fn backfill_type(&self) -> BackfillType {
130 self.backfill_type
131 }
132
133 pub fn add_rewrite_result(
134 &mut self,
135 plan_node_id: PlanNodeId,
136 plan_ref: LogicalPlanRef,
137 col_change: ColIndexMapping,
138 ) {
139 let prev = self
140 .share_rewrite_map
141 .insert(plan_node_id, (plan_ref, col_change));
142 assert!(prev.is_none());
143 }
144
145 pub fn get_rewrite_result(
146 &self,
147 plan_node_id: PlanNodeId,
148 ) -> Option<&(LogicalPlanRef, ColIndexMapping)> {
149 self.share_rewrite_map.get(&plan_node_id)
150 }
151}
152
153#[derive(Debug, Clone, Copy, Eq, PartialEq)]
154pub enum BackfillType {
155 UpstreamOnly,
156 Backfill,
157 ArrangementBackfill,
158 SnapshotBackfill,
159}
160
161impl BackfillType {
162 pub fn to_stream_scan_type(self) -> StreamScanType {
163 match self {
164 BackfillType::UpstreamOnly => StreamScanType::UpstreamOnly,
165 BackfillType::Backfill => StreamScanType::Backfill,
166 BackfillType::ArrangementBackfill => StreamScanType::ArrangementBackfill,
167 BackfillType::SnapshotBackfill => StreamScanType::SnapshotBackfill,
168 }
169 }
170}
171
172#[derive(Debug, Clone)]
173pub struct ToStreamContext {
174 share_to_stream_map: HashMap<PlanNodeId, StreamPlanRef>,
175 emit_on_window_close: bool,
176 backfill_type: BackfillType,
177}
178
179impl ToStreamContext {
180 pub fn new(emit_on_window_close: bool) -> Self {
181 Self::new_with_backfill_type(emit_on_window_close, BackfillType::Backfill)
182 }
183
184 pub fn new_with_backfill_type(emit_on_window_close: bool, backfill_type: BackfillType) -> Self {
185 Self {
186 share_to_stream_map: HashMap::new(),
187 emit_on_window_close,
188 backfill_type,
189 }
190 }
191
192 pub fn backfill_type(&self) -> BackfillType {
193 self.backfill_type
194 }
195
196 pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: StreamPlanRef) {
197 self.share_to_stream_map
198 .try_insert(plan_node_id, plan_ref)
199 .unwrap();
200 }
201
202 pub fn get_to_stream_result(&self, plan_node_id: PlanNodeId) -> Option<&StreamPlanRef> {
203 self.share_to_stream_map.get(&plan_node_id)
204 }
205
206 pub fn emit_on_window_close(&self) -> bool {
207 self.emit_on_window_close
208 }
209}
210
211pub trait ToBatch {
223 fn to_batch(&self) -> Result<BatchPlanRef>;
225 fn to_batch_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
227 let ret = self.to_batch()?;
228 required_order.enforce_if_not_satisfies(ret)
229 }
230}
231
232pub trait ToLocalBatch {
237 fn to_local(&self) -> Result<BatchPlanRef>;
238
239 fn to_local_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
241 let ret = self.to_local()?;
242 required_order.enforce_if_not_satisfies(ret)
243 }
244}
245
246pub trait ToDistributedBatch {
256 fn to_distributed(&self) -> Result<BatchPlanRef>;
259 fn to_distributed_with_required(
261 &self,
262 required_order: &Order,
263 required_dist: &RequiredDist,
264 ) -> Result<BatchPlanRef> {
265 let ret = self.to_distributed()?;
266 let ret = required_order.enforce_if_not_satisfies(ret)?;
267 required_dist.batch_enforce_if_not_satisfies(ret, required_order)
268 }
269}