risingwave_frontend/optimizer/plan_node/
convert.rs1use std::collections::HashMap;
16
17use risingwave_common::catalog::FieldDisplay;
18use risingwave_pb::stream_plan::StreamScanType;
19
20use super::*;
21use crate::optimizer::property::RequiredDist;
22
23pub trait ToStream {
34 fn logical_rewrite_for_stream(
42 &self,
43 ctx: &mut RewriteStreamContext,
44 ) -> Result<(LogicalPlanRef, ColIndexMapping)>;
45
46 fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef>;
48
49 fn to_stream_with_dist_required(
51 &self,
52 required_dist: &RequiredDist,
53 ctx: &mut ToStreamContext,
54 ) -> Result<StreamPlanRef> {
55 let ret = self.to_stream(ctx)?;
56 required_dist.streaming_enforce_if_not_satisfies(ret)
57 }
58
59 fn try_better_locality(&self, _columns: &[usize]) -> Option<LogicalPlanRef> {
60 None
61 }
62}
63
64pub fn try_enforce_locality_requirement(plan: LogicalPlanRef, columns: &[usize]) -> LogicalPlanRef {
70 assert!(!columns.is_empty());
71 if let Some(better_plan) = plan.try_better_locality(columns) {
72 better_plan
73 } else if plan.ctx().session_ctx().config().enable_locality_backfill() {
74 LogicalLocalityProvider::new(plan, columns.to_owned()).into()
75 } else {
76 plan
77 }
78}
79
80pub fn stream_enforce_eowc_requirement(
81 ctx: OptimizerContextRef,
82 plan: StreamPlanRef,
83 emit_on_window_close: bool,
84) -> Result<StreamPlanRef> {
85 if emit_on_window_close && !plan.emit_on_window_close() {
86 let watermark_groups = plan.watermark_columns().grouped();
87 let n_watermark_groups = watermark_groups.len();
88 if n_watermark_groups == 0 {
89 Err(ErrorCode::NotSupported(
90 "The query cannot be executed in Emit-On-Window-Close mode.".to_owned(),
91 "Try define a watermark column in the source, or avoid aggregation without GROUP BY".to_owned(),
92 )
93 .into())
94 } else {
95 let first_watermark_group = watermark_groups.values().next().unwrap();
96 let watermark_col_idx = first_watermark_group.indices().next().unwrap();
97 if n_watermark_groups > 1 {
98 ctx.warn_to_user(format!(
99 "There are multiple unrelated watermark columns in the query, the first one `{}` is used.",
100 FieldDisplay(&plan.schema()[watermark_col_idx])
101 ));
102 }
103 Ok(StreamEowcSort::new(plan, watermark_col_idx).into())
104 }
105 } else {
106 Ok(plan)
107 }
108}
109
110#[derive(Debug, Clone, Default)]
111pub struct RewriteStreamContext {
112 share_rewrite_map: HashMap<PlanNodeId, (LogicalPlanRef, ColIndexMapping)>,
113}
114
115impl RewriteStreamContext {
116 pub fn add_rewrite_result(
117 &mut self,
118 plan_node_id: PlanNodeId,
119 plan_ref: LogicalPlanRef,
120 col_change: ColIndexMapping,
121 ) {
122 let prev = self
123 .share_rewrite_map
124 .insert(plan_node_id, (plan_ref, col_change));
125 assert!(prev.is_none());
126 }
127
128 pub fn get_rewrite_result(
129 &self,
130 plan_node_id: PlanNodeId,
131 ) -> Option<&(LogicalPlanRef, ColIndexMapping)> {
132 self.share_rewrite_map.get(&plan_node_id)
133 }
134}
135
136#[derive(Debug, Clone, Copy, Eq, PartialEq)]
137pub enum BackfillType {
138 UpstreamOnly,
139 Backfill,
140 ArrangementBackfill,
141 SnapshotBackfill,
142}
143
144impl BackfillType {
145 pub fn to_stream_scan_type(self) -> StreamScanType {
146 match self {
147 BackfillType::UpstreamOnly => StreamScanType::UpstreamOnly,
148 BackfillType::Backfill => StreamScanType::Backfill,
149 BackfillType::ArrangementBackfill => StreamScanType::ArrangementBackfill,
150 BackfillType::SnapshotBackfill => StreamScanType::SnapshotBackfill,
151 }
152 }
153}
154
155#[derive(Debug, Clone)]
156pub struct ToStreamContext {
157 share_to_stream_map: HashMap<PlanNodeId, StreamPlanRef>,
158 emit_on_window_close: bool,
159 backfill_type: BackfillType,
160}
161
162impl ToStreamContext {
163 pub fn new(emit_on_window_close: bool) -> Self {
164 Self::new_with_backfill_type(emit_on_window_close, BackfillType::Backfill)
165 }
166
167 pub fn new_with_backfill_type(emit_on_window_close: bool, backfill_type: BackfillType) -> Self {
168 Self {
169 share_to_stream_map: HashMap::new(),
170 emit_on_window_close,
171 backfill_type,
172 }
173 }
174
175 pub fn backfill_type(&self) -> BackfillType {
176 self.backfill_type
177 }
178
179 pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: StreamPlanRef) {
180 self.share_to_stream_map
181 .try_insert(plan_node_id, plan_ref)
182 .unwrap();
183 }
184
185 pub fn get_to_stream_result(&self, plan_node_id: PlanNodeId) -> Option<&StreamPlanRef> {
186 self.share_to_stream_map.get(&plan_node_id)
187 }
188
189 pub fn emit_on_window_close(&self) -> bool {
190 self.emit_on_window_close
191 }
192}
193
194pub trait ToBatch {
206 fn to_batch(&self) -> Result<BatchPlanRef>;
208 fn to_batch_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
210 let ret = self.to_batch()?;
211 required_order.enforce_if_not_satisfies(ret)
212 }
213}
214
215pub trait ToLocalBatch {
220 fn to_local(&self) -> Result<BatchPlanRef>;
221
222 fn to_local_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
224 let ret = self.to_local()?;
225 required_order.enforce_if_not_satisfies(ret)
226 }
227}
228
229pub trait ToDistributedBatch {
239 fn to_distributed(&self) -> Result<BatchPlanRef>;
242 fn to_distributed_with_required(
244 &self,
245 required_order: &Order,
246 required_dist: &RequiredDist,
247 ) -> Result<BatchPlanRef> {
248 let ret = self.to_distributed()?;
249 let ret = required_order.enforce_if_not_satisfies(ret)?;
250 required_dist.batch_enforce_if_not_satisfies(ret, required_order)
251 }
252}