risingwave_frontend/optimizer/plan_node/convert.rs
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use risingwave_common::catalog::FieldDisplay;
use risingwave_pb::stream_plan::StreamScanType;

use super::*;
use crate::optimizer::property::RequiredDist;

/// `ToStream` converts a logical plan node to a streaming physical node,
/// optionally satisfying a required distribution.
///
/// To implement this trait you can choose one of two ways:
/// - Implement `to_stream` and use the default implementation of `to_stream_with_dist_required`.
/// - Or, if a better plan can be generated when the required distribution is known (for example,
///   a hash join with hash key `(a, b)` whose output is required to be hash-distributed by
///   `(a, b, c)`), implement `to_stream_with_dist_required`, and implement `to_stream` by
///   delegating to `to_stream_with_dist_required(&RequiredDist::Any, ctx)`, as sketched below.
///   See [`LogicalProject`] as an example.
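///
/// A minimal sketch of the second approach, using a hypothetical node type `MyJoin` and a
/// hypothetical helper `build_stream_join_plan` purely for illustration:
///
/// ```ignore
/// impl ToStream for MyJoin {
///     fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef> {
///         // With no external requirement, delegate to the distribution-aware variant.
///         self.to_stream_with_dist_required(&RequiredDist::Any, ctx)
///     }
///
///     fn to_stream_with_dist_required(
///         &self,
///         required_dist: &RequiredDist,
///         ctx: &mut ToStreamContext,
///     ) -> Result<StreamPlanRef> {
///         // Use `required_dist` to pick the plan's distribution directly (hypothetical
///         // helper), then enforce the requirement only if it is not already satisfied.
///         let plan = build_stream_join_plan(self, required_dist, ctx)?;
///         required_dist.streaming_enforce_if_not_satisfies(plan)
///     }
/// }
/// ```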
pub trait ToStream {
    /// `logical_rewrite_for_stream` rewrites the logical node and returns (`new_plan_node`,
    /// `col_mapping`), where `col_mapping` records how the original columns are mapped to their
    /// new positions.
    ///
    /// Currently it is used to:
    /// 1. ensure every plan node's output contains the pk columns
    /// 2. add `row_count()` to every `Agg`
    fn logical_rewrite_for_stream(
        &self,
        ctx: &mut RewriteStreamContext,
    ) -> Result<(LogicalPlanRef, ColIndexMapping)>;

    /// `to_stream` is equivalent to `to_stream_with_dist_required(RequiredDist::Any)`.
    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef>;

    /// Convert the plan to a streaming physical plan and satisfy the required distribution.
    fn to_stream_with_dist_required(
        &self,
        required_dist: &RequiredDist,
        ctx: &mut ToStreamContext,
    ) -> Result<StreamPlanRef> {
        let ret = self.to_stream(ctx)?;
        required_dist.streaming_enforce_if_not_satisfies(ret)
    }
}

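/// Enforce the `EMIT ON WINDOW CLOSE` requirement on a stream plan.
///
/// If `emit_on_window_close` is requested but the plan does not already emit on window close,
/// a [`StreamEowcSort`] sorting by the first watermark column is added on top of the plan.
/// If the plan has no watermark column at all, an error is returned. When there are multiple
/// unrelated watermark columns, the first one is used and a warning is reported to the user.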
pub fn stream_enforce_eowc_requirement(
    ctx: OptimizerContextRef,
    plan: StreamPlanRef,
    emit_on_window_close: bool,
) -> Result<StreamPlanRef> {
    if emit_on_window_close && !plan.emit_on_window_close() {
        let watermark_groups = plan.watermark_columns().grouped();
        let n_watermark_groups = watermark_groups.len();
        if n_watermark_groups == 0 {
            Err(ErrorCode::NotSupported(
                "The query cannot be executed in Emit-On-Window-Close mode.".to_owned(),
                "Try defining a watermark column in the source, or avoiding aggregation without GROUP BY".to_owned(),
            )
            .into())
        } else {
            let first_watermark_group = watermark_groups.values().next().unwrap();
            let watermark_col_idx = first_watermark_group.indices().next().unwrap();
            if n_watermark_groups > 1 {
                ctx.warn_to_user(format!(
                    "There are multiple unrelated watermark columns in the query; the first one `{}` is used.",
                    FieldDisplay(&plan.schema()[watermark_col_idx])
                ));
            }
            Ok(StreamEowcSort::new(plan, watermark_col_idx).into())
        }
    } else {
        Ok(plan)
    }
}

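/// Context shared across `logical_rewrite_for_stream` calls.
///
/// It memoizes the rewrite result of shared plan nodes (keyed by [`PlanNodeId`]) so that a
/// subtree referenced multiple times is rewritten only once.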
#[derive(Debug, Clone, Default)]
pub struct RewriteStreamContext {
    share_rewrite_map: HashMap<PlanNodeId, (LogicalPlanRef, ColIndexMapping)>,
}

impl RewriteStreamContext {
    pub fn add_rewrite_result(
        &mut self,
        plan_node_id: PlanNodeId,
        plan_ref: LogicalPlanRef,
        col_change: ColIndexMapping,
    ) {
        let prev = self
            .share_rewrite_map
            .insert(plan_node_id, (plan_ref, col_change));
        assert!(prev.is_none());
    }

    pub fn get_rewrite_result(
        &self,
        plan_node_id: PlanNodeId,
    ) -> Option<&(LogicalPlanRef, ColIndexMapping)> {
        self.share_rewrite_map.get(&plan_node_id)
    }
}

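/// Context shared across `to_stream` calls.
///
/// Besides memoizing the streaming plan of shared nodes, it carries the conversion options:
/// whether the plan should emit on window close, and which [`StreamScanType`] to use for table
/// scans (defaults to [`StreamScanType::Backfill`]).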
#[derive(Debug, Clone)]
pub struct ToStreamContext {
    share_to_stream_map: HashMap<PlanNodeId, StreamPlanRef>,
    emit_on_window_close: bool,
    stream_scan_type: StreamScanType,
}

impl ToStreamContext {
    pub fn new(emit_on_window_close: bool) -> Self {
        Self::new_with_stream_scan_type(emit_on_window_close, StreamScanType::Backfill)
    }

    pub fn new_with_stream_scan_type(
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Self {
        Self {
            share_to_stream_map: HashMap::new(),
            emit_on_window_close,
            stream_scan_type,
        }
    }

    pub fn stream_scan_type(&self) -> StreamScanType {
        self.stream_scan_type
    }

    pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: StreamPlanRef) {
        self.share_to_stream_map
            .try_insert(plan_node_id, plan_ref)
            .unwrap();
    }

    pub fn get_to_stream_result(&self, plan_node_id: PlanNodeId) -> Option<&StreamPlanRef> {
        self.share_to_stream_map.get(&plan_node_id)
    }

    pub fn emit_on_window_close(&self) -> bool {
        self.emit_on_window_close
    }
}

/// `ToBatch` converts a logical plan node to a batch physical node,
/// optionally satisfying a required order.
///
/// The generated plan has single distribution and no exchange nodes inserted.
/// Use [`ToLocalBatch`] or [`ToDistributedBatch`] after `ToBatch` to get a local or distributed
/// plan, respectively.
///
/// To implement this trait you can choose one of two ways:
/// - Implement `to_batch` and use the default implementation of `to_batch_with_order_required`.
/// - Or, if a better plan can be generated when a required order is given, implement
///   `to_batch_with_order_required`, and implement `to_batch` as
///   `to_batch_with_order_required(&Order::any())`.
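///
/// A rough sketch of how the batch conversion traits compose; the plan variables here are
/// illustrative, not actual code in this crate:
///
/// ```ignore
/// // Logical plan -> batch plan with single distribution and no exchange nodes.
/// let batch_plan = logical_plan.to_batch_with_order_required(&required_order)?;
///
/// // Then pick one of the two execution modes:
/// // a) distributed execution: insert exchanges to satisfy order and distribution;
/// let distributed = batch_plan.to_distributed_with_required(&required_order, &required_dist)?;
/// // b) local execution: e.g. scans become exchange + scan.
/// let local = batch_plan.to_local_with_order_required(&required_order)?;
/// ```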
pub trait ToBatch {
    /// `to_batch` is equivalent to `to_batch_with_order_required(&Order::any())`.
    fn to_batch(&self) -> Result<BatchPlanRef>;

    /// Convert the plan to a batch physical plan and satisfy the required order.
    fn to_batch_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
        let ret = self.to_batch()?;
        required_order.enforce_if_not_satisfies(ret)
    }
}

/// Converts a batch physical plan to a local plan for local execution.
///
/// This is quite similar to `ToBatch`, but differs in several ways. For example, it converts
/// scan to exchange + scan.
pub trait ToLocalBatch {
    fn to_local(&self) -> Result<BatchPlanRef>;

    /// Convert the plan to a local batch physical plan and satisfy the required order.
    fn to_local_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
        let ret = self.to_local()?;
        required_order.enforce_if_not_satisfies(ret)
    }
}

/// `ToDistributedBatch` converts a batch physical plan to a distributed batch plan by inserting
/// exchange nodes, optionally satisfying a required order and distribution.
///
/// To implement this trait you can choose one of two ways:
/// - Implement `to_distributed` and use the default implementation of
///   `to_distributed_with_required`.
/// - Or, if a better plan can be generated when a required order and distribution are given,
///   implement `to_distributed_with_required`, and implement `to_distributed` as
///   `to_distributed_with_required(&Order::any(), &RequiredDist::Any)`.
pub trait ToDistributedBatch {
    /// `to_distributed` is equivalent to `to_distributed_with_required(&Order::any(),
    /// &RequiredDist::Any)`.
    fn to_distributed(&self) -> Result<BatchPlanRef>;

    /// Insert exchange nodes into the batch physical plan to satisfy the required distribution
    /// and order.
    fn to_distributed_with_required(
        &self,
        required_order: &Order,
        required_dist: &RequiredDist,
    ) -> Result<BatchPlanRef> {
        let ret = self.to_distributed()?;
        let ret = required_order.enforce_if_not_satisfies(ret)?;
        required_dist.batch_enforce_if_not_satisfies(ret, required_order)
    }
}