risingwave_frontend/optimizer/plan_node/
convert.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use paste::paste;
18use risingwave_common::catalog::FieldDisplay;
19use risingwave_pb::stream_plan::StreamScanType;
20
21use super::*;
22use crate::optimizer::property::RequiredDist;
23use crate::{for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes};
24
25/// `ToStream` converts a logical plan node to streaming physical node
26/// with an optional required distribution.
27///
28/// when implement this trait you can choose the two ways
29/// - Implement `to_stream` and use the default implementation of `to_stream_with_dist_required`
30/// - Or, if the required distribution is given, there will be a better plan. For example a hash
31///   join with hash-key(a,b) and the plan is required hash-distributed by (a,b,c). you can
32///   implement `to_stream_with_dist_required`, and implement `to_stream` with
33///   `to_stream_with_dist_required(RequiredDist::Any)`. you can see [`LogicalProject`] as an
34///   example.
35pub trait ToStream {
36    /// `logical_rewrite_for_stream` will rewrite the logical node, and return (`new_plan_node`,
37    /// `col_mapping`), the `col_mapping` is for original columns have been changed into some other
38    /// position.
39    ///
40    /// Now it is used to:
41    /// 1. ensure every plan node's output having pk column
42    /// 2. add `row_count`() in every Agg
43    fn logical_rewrite_for_stream(
44        &self,
45        ctx: &mut RewriteStreamContext,
46    ) -> Result<(PlanRef, ColIndexMapping)>;
47
48    /// `to_stream` is equivalent to `to_stream_with_dist_required(RequiredDist::Any)`
49    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef>;
50
51    /// convert the plan to streaming physical plan and satisfy the required distribution
52    fn to_stream_with_dist_required(
53        &self,
54        required_dist: &RequiredDist,
55        ctx: &mut ToStreamContext,
56    ) -> Result<PlanRef> {
57        let ret = self.to_stream(ctx)?;
58        required_dist.enforce_if_not_satisfies(ret, &Order::any())
59    }
60}
61
62pub fn stream_enforce_eowc_requirement(
63    ctx: OptimizerContextRef,
64    plan: PlanRef,
65    emit_on_window_close: bool,
66) -> Result<PlanRef> {
67    if emit_on_window_close && !plan.emit_on_window_close() {
68        let watermark_groups = plan.watermark_columns().grouped();
69        let n_watermark_groups = watermark_groups.len();
70        if n_watermark_groups == 0 {
71            Err(ErrorCode::NotSupported(
72                "The query cannot be executed in Emit-On-Window-Close mode.".to_owned(),
73                "Try define a watermark column in the source, or avoid aggregation without GROUP BY".to_owned(),
74            )
75            .into())
76        } else {
77            let first_watermark_group = watermark_groups.values().next().unwrap();
78            let watermark_col_idx = first_watermark_group.indices().next().unwrap();
79            if n_watermark_groups > 1 {
80                ctx.warn_to_user(format!(
81                    "There are multiple unrelated watermark columns in the query, the first one `{}` is used.",
82                    FieldDisplay(&plan.schema()[watermark_col_idx])
83                ));
84            }
85            Ok(StreamEowcSort::new(plan, watermark_col_idx).into())
86        }
87    } else {
88        Ok(plan)
89    }
90}
91
/// Context threaded through [`ToStream::logical_rewrite_for_stream`], memoizing
/// rewrite results of shared plan nodes so each shared subtree is rewritten once.
#[derive(Debug, Clone, Default)]
pub struct RewriteStreamContext {
    // Maps a shared node's id to its rewritten plan and the column index
    // mapping produced by that rewrite.
    share_rewrite_map: HashMap<PlanNodeId, (PlanRef, ColIndexMapping)>,
}
96
97impl RewriteStreamContext {
98    pub fn add_rewrite_result(
99        &mut self,
100        plan_node_id: PlanNodeId,
101        plan_ref: PlanRef,
102        col_change: ColIndexMapping,
103    ) {
104        let prev = self
105            .share_rewrite_map
106            .insert(plan_node_id, (plan_ref, col_change));
107        assert!(prev.is_none());
108    }
109
110    pub fn get_rewrite_result(
111        &self,
112        plan_node_id: PlanNodeId,
113    ) -> Option<&(PlanRef, ColIndexMapping)> {
114        self.share_rewrite_map.get(&plan_node_id)
115    }
116}
117
/// Context threaded through [`ToStream::to_stream`] conversion.
#[derive(Debug, Clone)]
pub struct ToStreamContext {
    // Memoized `to_stream` results for shared plan nodes, keyed by plan node id.
    share_to_stream_map: HashMap<PlanNodeId, PlanRef>,
    // Whether the plan is being converted in Emit-On-Window-Close mode.
    emit_on_window_close: bool,
    // Stream scan variant to use during conversion; defaults to
    // `StreamScanType::Backfill` (see `new`).
    stream_scan_type: StreamScanType,
}
124
125impl ToStreamContext {
126    pub fn new(emit_on_window_close: bool) -> Self {
127        Self::new_with_stream_scan_type(emit_on_window_close, StreamScanType::Backfill)
128    }
129
130    pub fn new_with_stream_scan_type(
131        emit_on_window_close: bool,
132        stream_scan_type: StreamScanType,
133    ) -> Self {
134        Self {
135            share_to_stream_map: HashMap::new(),
136            emit_on_window_close,
137            stream_scan_type,
138        }
139    }
140
141    pub fn stream_scan_type(&self) -> StreamScanType {
142        self.stream_scan_type
143    }
144
145    pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: PlanRef) {
146        self.share_to_stream_map
147            .try_insert(plan_node_id, plan_ref)
148            .unwrap();
149    }
150
151    pub fn get_to_stream_result(&self, plan_node_id: PlanNodeId) -> Option<&PlanRef> {
152        self.share_to_stream_map.get(&plan_node_id)
153    }
154
155    pub fn emit_on_window_close(&self) -> bool {
156        self.emit_on_window_close
157    }
158}
159
160/// `ToBatch` allows to convert a logical plan node to batch physical node
161/// with an optional required order.
162///
163/// The generated plan has single distribution and doesn't have any exchange nodes inserted.
164/// Use either [`ToLocalBatch`] or [`ToDistributedBatch`] after `ToBatch` to get a distributed plan.
165///
166/// To implement this trait you can choose one of the two ways:
167/// - Implement `to_batch` and use the default implementation of `to_batch_with_order_required`
168/// - Or, if a better plan can be generated when a required order is given, you can implement
169///   `to_batch_with_order_required`, and implement `to_batch` with
170///   `to_batch_with_order_required(&Order::any())`.
171pub trait ToBatch {
172    /// `to_batch` is equivalent to `to_batch_with_order_required(&Order::any())`
173    fn to_batch(&self) -> Result<PlanRef>;
174    /// convert the plan to batch physical plan and satisfy the required Order
175    fn to_batch_with_order_required(&self, required_order: &Order) -> Result<PlanRef> {
176        let ret = self.to_batch()?;
177        required_order.enforce_if_not_satisfies(ret)
178    }
179}
180
181/// Converts a batch physical plan to local plan for local execution.
182///
183/// This is quite similar to `ToBatch`, but different in several ways. For example it converts
184/// scan to exchange + scan.
185pub trait ToLocalBatch {
186    fn to_local(&self) -> Result<PlanRef>;
187
188    /// Convert the plan to batch local physical plan and satisfy the required Order
189    fn to_local_with_order_required(&self, required_order: &Order) -> Result<PlanRef> {
190        let ret = self.to_local()?;
191        required_order.enforce_if_not_satisfies(ret)
192    }
193}
194
195/// `ToDistributedBatch` allows to convert a batch physical plan to distributed batch plan, by
196/// insert exchange node, with an optional required order and distributed.
197///
198/// To implement this trait you can choose one of the two ways:
199/// - Implement `to_distributed` and use the default implementation of
200///   `to_distributed_with_required`
201/// - Or, if a better plan can be generated when a required order is given, you can implement
202///   `to_distributed_with_required`, and implement `to_distributed` with
203///   `to_distributed_with_required(&Order::any(), &RequiredDist::Any)`
204pub trait ToDistributedBatch {
205    /// `to_distributed` is equivalent to `to_distributed_with_required(&Order::any(),
206    /// &RequiredDist::Any)`
207    fn to_distributed(&self) -> Result<PlanRef>;
208    /// insert the exchange in batch physical plan to satisfy the required Distribution and Order.
209    fn to_distributed_with_required(
210        &self,
211        required_order: &Order,
212        required_dist: &RequiredDist,
213    ) -> Result<PlanRef> {
214        let ret = self.to_distributed()?;
215        let ret = required_order.enforce_if_not_satisfies(ret)?;
216        required_dist.enforce_if_not_satisfies(ret, required_order)
217    }
218}
219
/// Bans [`ToBatch`] on batch and streaming nodes: the generated impls panic,
/// since calling `to_batch` on a non-logical plan node is a programming error.
macro_rules! ban_to_batch {
    ($( { $convention:ident, $name:ident }),*) => {
        paste!{
            $(impl ToBatch for [<$convention $name>] {
                fn to_batch(&self) -> Result<PlanRef> {
                    panic!("converting into batch is only allowed on logical plan")
                }
            })*
        }
    }
}
for_batch_plan_nodes! { ban_to_batch }
for_stream_plan_nodes! { ban_to_batch }
234
/// Bans [`ToStream`] on batch and streaming nodes: the generated impls panic,
/// since calling `to_stream` on a non-logical plan node is a programming error.
macro_rules! ban_to_stream {
    ($( { $convention:ident, $name:ident }),*) => {
        paste!{
            $(impl ToStream for [<$convention $name>] {
                fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef>{
                    panic!("converting to stream is only allowed on logical plan")
                }
                fn logical_rewrite_for_stream(&self, _ctx: &mut RewriteStreamContext) -> Result<(PlanRef, ColIndexMapping)>{
                    panic!("logical rewrite is only allowed on logical plan")
                }
            })*
        }
    }
}
for_batch_plan_nodes! { ban_to_stream }
for_stream_plan_nodes! { ban_to_stream }
252
/// Bans `ToDistributedBatch` on logical and streaming nodes: the generated
/// impls panic, since only batch plan nodes may be converted to distributed.
macro_rules! ban_to_distributed {
    ($( { $convention:ident, $name:ident }),*) => {
        paste!{
            $(impl ToDistributedBatch for [<$convention $name>] {
                fn to_distributed(&self) -> Result<PlanRef> {
                    panic!("converting to distributed is only allowed on batch plan")
                }
            })*
        }
    }
}
for_logical_plan_nodes! { ban_to_distributed }
for_stream_plan_nodes! { ban_to_distributed }
267
268/// impl `ToLocalBatch`  for logical and streaming node.
269macro_rules! ban_to_local {
270    ($( { $convention:ident, $name:ident }),*) => {
271        paste!{
272            $(impl ToLocalBatch for [<$convention $name>] {
273                fn to_local(&self) -> Result<PlanRef> {
274                    panic!("converting to distributed is only allowed on batch plan")
275                }
276            })*
277        }
278    }
279}
280for_logical_plan_nodes! { ban_to_local }
281for_stream_plan_nodes! { ban_to_local }