risingwave_frontend/optimizer/plan_node/convert.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use risingwave_common::catalog::FieldDisplay;
use risingwave_pb::stream_plan::StreamScanType;

use super::*;
use crate::optimizer::property::RequiredDist;

/// `ToStream` converts a logical plan node to a streaming physical node
/// with an optional required distribution.
///
/// To implement this trait you can choose one of the two ways:
/// - Implement `to_stream` and use the default implementation of `to_stream_with_dist_required`
/// - Or, if a better plan can be generated when the required distribution is given (for example,
///   a hash join with hash key `(a, b)` whose plan is required to be hash-distributed by
///   `(a, b, c)`), you can implement `to_stream_with_dist_required`, and implement `to_stream`
///   with `to_stream_with_dist_required(&RequiredDist::Any)`. See [`LogicalProject`] as an
///   example, or the sketch below.
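///
/// A minimal sketch of the second way, for a hypothetical node `MyJoin` (the node and its
/// `to_stream_preferring` helper are illustrative, not part of this crate):
///
/// ```ignore
/// impl ToStream for MyJoin {
///     fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef> {
///         // Delegate to the distribution-aware conversion with no requirement.
///         self.to_stream_with_dist_required(&RequiredDist::Any, ctx)
///     }
///
///     fn to_stream_with_dist_required(
///         &self,
///         required_dist: &RequiredDist,
///         ctx: &mut ToStreamContext,
///     ) -> Result<StreamPlanRef> {
///         // Hypothetical: build a plan whose output distribution already satisfies
///         // `required_dist` when possible, then enforce it as a fallback.
///         let plan = self.to_stream_preferring(required_dist, ctx)?;
///         required_dist.streaming_enforce_if_not_satisfies(plan)
///     }
///
///     // ... `logical_rewrite_for_stream` omitted ...
/// }
/// ```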
pub trait ToStream {
    /// `logical_rewrite_for_stream` will rewrite the logical node and return (`new_plan_node`,
    /// `col_mapping`), where `col_mapping` records how the original columns have been moved to
    /// other positions.
    ///
    /// Now it is used to:
    /// 1. ensure every plan node's output has the pk columns
    /// 2. add `row_count()` in every Agg
    fn logical_rewrite_for_stream(
        &self,
        ctx: &mut RewriteStreamContext,
    ) -> Result<(LogicalPlanRef, ColIndexMapping)>;

    /// `to_stream` is equivalent to `to_stream_with_dist_required(&RequiredDist::Any)`
    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef>;

    /// Converts the plan to a streaming physical plan and satisfies the required distribution.
    fn to_stream_with_dist_required(
        &self,
        required_dist: &RequiredDist,
        ctx: &mut ToStreamContext,
    ) -> Result<StreamPlanRef> {
        let ret = self.to_stream(ctx)?;
        required_dist.streaming_enforce_if_not_satisfies(ret)
    }

    /// Tries to produce an alternative logical plan that provides better locality on the given
    /// columns. Returns `None` (the default) if no such plan is available.
    fn try_better_locality(&self, _columns: &[usize]) -> Option<LogicalPlanRef> {
        None
    }
}

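/// Enforces the Emit-On-Window-Close requirement on a streaming plan, if requested.
///
/// When `emit_on_window_close` is set but the plan does not already emit on window close, this
/// appends a [`StreamEowcSort`] on the first watermark column. If the plan has no watermark
/// column at all, an error is returned; if there are multiple unrelated watermark groups, the
/// first one is used and a warning is reported to the user.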
pub fn stream_enforce_eowc_requirement(
    ctx: OptimizerContextRef,
    plan: StreamPlanRef,
    emit_on_window_close: bool,
) -> Result<StreamPlanRef> {
    if emit_on_window_close && !plan.emit_on_window_close() {
        let watermark_groups = plan.watermark_columns().grouped();
        let n_watermark_groups = watermark_groups.len();
        if n_watermark_groups == 0 {
            Err(ErrorCode::NotSupported(
                "The query cannot be executed in Emit-On-Window-Close mode.".to_owned(),
                "Try defining a watermark column in the source, or avoid aggregation without GROUP BY".to_owned(),
            )
            .into())
        } else {
            let first_watermark_group = watermark_groups.values().next().unwrap();
            let watermark_col_idx = first_watermark_group.indices().next().unwrap();
            if n_watermark_groups > 1 {
                ctx.warn_to_user(format!(
                    "There are multiple unrelated watermark columns in the query, the first one `{}` is used.",
                    FieldDisplay(&plan.schema()[watermark_col_idx])
                ));
            }
            Ok(StreamEowcSort::new(plan, watermark_col_idx).into())
        }
    } else {
        Ok(plan)
    }
}

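/// Context for [`ToStream::logical_rewrite_for_stream`].
///
/// It memoizes the rewrite results of shared plan nodes (keyed by [`PlanNodeId`]) so that each
/// shared subtree is rewritten only once.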
#[derive(Debug, Clone, Default)]
pub struct RewriteStreamContext {
    share_rewrite_map: HashMap<PlanNodeId, (LogicalPlanRef, ColIndexMapping)>,
}

impl RewriteStreamContext {
    /// Records the rewrite result of a shared plan node. Panics if a result for the same node
    /// has already been recorded.
    pub fn add_rewrite_result(
        &mut self,
        plan_node_id: PlanNodeId,
        plan_ref: LogicalPlanRef,
        col_change: ColIndexMapping,
    ) {
        let prev = self
            .share_rewrite_map
            .insert(plan_node_id, (plan_ref, col_change));
        assert!(prev.is_none());
    }

    /// Looks up the memoized rewrite result of a shared plan node, if any.
    pub fn get_rewrite_result(
        &self,
        plan_node_id: PlanNodeId,
    ) -> Option<&(LogicalPlanRef, ColIndexMapping)> {
        self.share_rewrite_map.get(&plan_node_id)
    }
}

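/// Context for [`ToStream::to_stream`].
///
/// It carries the Emit-On-Window-Close flag and the [`StreamScanType`] to use, and memoizes the
/// conversion results of shared plan nodes.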
#[derive(Debug, Clone)]
pub struct ToStreamContext {
    share_to_stream_map: HashMap<PlanNodeId, StreamPlanRef>,
    emit_on_window_close: bool,
    stream_scan_type: StreamScanType,
}

impl ToStreamContext {
    /// Creates a new context with the default stream scan type, `StreamScanType::Backfill`.
    pub fn new(emit_on_window_close: bool) -> Self {
        Self::new_with_stream_scan_type(emit_on_window_close, StreamScanType::Backfill)
    }

    /// Creates a new context with the given stream scan type.
    pub fn new_with_stream_scan_type(
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Self {
        Self {
            share_to_stream_map: HashMap::new(),
            emit_on_window_close,
            stream_scan_type,
        }
    }

    pub fn stream_scan_type(&self) -> StreamScanType {
        self.stream_scan_type
    }

    /// Records the conversion result of a shared plan node. Panics if a result for the same
    /// node has already been recorded.
    pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: StreamPlanRef) {
        self.share_to_stream_map
            .try_insert(plan_node_id, plan_ref)
            .unwrap();
    }

    /// Looks up the memoized conversion result of a shared plan node, if any.
    pub fn get_to_stream_result(&self, plan_node_id: PlanNodeId) -> Option<&StreamPlanRef> {
        self.share_to_stream_map.get(&plan_node_id)
    }

    pub fn emit_on_window_close(&self) -> bool {
        self.emit_on_window_close
    }
}

/// `ToBatch` converts a logical plan node to a batch physical node
/// with an optional required order.
///
/// The generated plan has single distribution and doesn't have any exchange nodes inserted.
/// Use either [`ToLocalBatch`] or [`ToDistributedBatch`] after `ToBatch` to get a distributed plan.
///
/// To implement this trait you can choose one of the two ways:
/// - Implement `to_batch` and use the default implementation of `to_batch_with_order_required`
/// - Or, if a better plan can be generated when a required order is given, you can implement
///   `to_batch_with_order_required`, and implement `to_batch` with
///   `to_batch_with_order_required(&Order::any())`, as sketched below.
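///
/// A minimal sketch of the second way, for a hypothetical node `MyScan` (the node and its
/// `to_batch_preferring` helper are illustrative, not part of this crate):
///
/// ```ignore
/// impl ToBatch for MyScan {
///     fn to_batch(&self) -> Result<BatchPlanRef> {
///         // Delegate to the order-aware conversion with no requirement.
///         self.to_batch_with_order_required(&Order::any())
///     }
///
///     fn to_batch_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
///         // Hypothetical: produce a plan that already satisfies `required_order`
///         // (e.g. an ordered index scan) instead of sorting afterwards.
///         let plan = self.to_batch_preferring(required_order)?;
///         required_order.enforce_if_not_satisfies(plan)
///     }
/// }
/// ```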
pub trait ToBatch {
    /// `to_batch` is equivalent to `to_batch_with_order_required(&Order::any())`
    fn to_batch(&self) -> Result<BatchPlanRef>;

    /// Converts the plan to a batch physical plan and satisfies the required order.
    fn to_batch_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
        let ret = self.to_batch()?;
        required_order.enforce_if_not_satisfies(ret)
    }
}

/// Converts a batch physical plan to a local plan for local execution.
///
/// This is quite similar to `ToBatch`, but differs in several ways. For example, it converts
/// a scan to exchange + scan.
pub trait ToLocalBatch {
    fn to_local(&self) -> Result<BatchPlanRef>;

    /// Converts the plan to a local batch physical plan and satisfies the required order.
    fn to_local_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
        let ret = self.to_local()?;
        required_order.enforce_if_not_satisfies(ret)
    }
}

/// `ToDistributedBatch` converts a batch physical plan to a distributed batch plan by inserting
/// exchange nodes, with an optional required order and distribution.
///
/// To implement this trait you can choose one of the two ways (the same pattern as sketched
/// under [`ToBatch`]):
/// - Implement `to_distributed` and use the default implementation of
///   `to_distributed_with_required`
/// - Or, if a better plan can be generated when a required order is given, you can implement
///   `to_distributed_with_required`, and implement `to_distributed` with
///   `to_distributed_with_required(&Order::any(), &RequiredDist::Any)`
pub trait ToDistributedBatch {
    /// `to_distributed` is equivalent to `to_distributed_with_required(&Order::any(),
    /// &RequiredDist::Any)`
    fn to_distributed(&self) -> Result<BatchPlanRef>;

    /// Inserts exchange nodes into the batch physical plan to satisfy the required
    /// distribution and order.
    fn to_distributed_with_required(
        &self,
        required_order: &Order,
        required_dist: &RequiredDist,
    ) -> Result<BatchPlanRef> {
        let ret = self.to_distributed()?;
        let ret = required_order.enforce_if_not_satisfies(ret)?;
        required_dist.batch_enforce_if_not_satisfies(ret, required_order)
    }
}