risingwave_frontend/planner/
select.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_common::util::sort_util::ColumnOrder;
23use risingwave_expr::ExprError;
24use risingwave_pb::plan_common::JoinType;
25
26use crate::OptimizerContextRef;
27use crate::binder::{BoundDistinct, BoundSelect};
28use crate::error::{ErrorCode, Result};
29use crate::expr::{
30    CorrelatedId, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef, Subquery,
31    SubqueryKind,
32};
33pub use crate::optimizer::plan_node::LogicalFilter;
34use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef, Project, ProjectBuilder};
35use crate::optimizer::plan_node::{
36    LogicalAgg, LogicalApply, LogicalDedup, LogicalOverWindow, LogicalProject, LogicalProjectSet,
37    LogicalTopN, LogicalValues, PlanAggCall, PlanRef,
38};
39use crate::optimizer::property::Order;
40use crate::planner::Planner;
41use crate::utils::{Condition, IndexSet};
42
43impl Planner {
    /// Plans a bound `SELECT` into a logical plan.
    ///
    /// The stages are applied in this order, each one wrapping the `root` built so far:
    /// `FROM` -> `WHERE` -> aggregation (`GROUP BY` / aggregate calls) -> `HAVING` ->
    /// subqueries in the select list -> window functions -> projection ->
    /// `DISTINCT ON` -> plain `DISTINCT`.
    ///
    /// # Parameters
    /// - the destructured [`BoundSelect`]: only `from`, `where_clause`, `select_items`,
    ///   `group_by`, `having` and `distinct` are used here; other fields are ignored.
    /// - `extra_order_exprs`: expressions appended to the end of the select list before
    ///   planning. They are rejected when `distinct.is_distinct()` holds.
    /// - `order`: column orders whose `column_index` indexes into the select list *after*
    ///   `extra_order_exprs` have been appended.
    ///
    /// # Errors
    /// - `InvalidInputSyntax` if `distinct.is_distinct()` and `extra_order_exprs` is non-empty.
    /// - `InvalidInputSyntax` if, for `DISTINCT ON`, an `ORDER BY` expression that is not one
    ///   of the `DISTINCT ON` expressions is met before all `DISTINCT ON` expressions have
    ///   been seen.
    /// - Any error propagated from the sub-planning steps called below.
    ///
    /// NOTE(review): the appended `extra_order_exprs` are still part of the output of the
    /// returned plan; presumably the caller prunes them — confirm against the call site.
    pub(super) fn plan_select(
        &mut self,
        BoundSelect {
            from,
            where_clause,
            mut select_items,
            group_by,
            mut having,
            distinct,
            ..
        }: BoundSelect,
        extra_order_exprs: Vec<ExprImpl>,
        order: &[ColumnOrder],
    ) -> Result<PlanRef> {
        // Extra ORDER BY expressions would widen the select list, which is not allowed for
        // `SELECT DISTINCT`; reject that before appending them.
        if distinct.is_distinct() && !extra_order_exprs.is_empty() {
            return Err(ErrorCode::InvalidInputSyntax(
                "for SELECT DISTINCT, ORDER BY expressions must appear in select list".into(),
            )
            .into());
        }
        // Append expressions in ORDER BY.
        select_items.extend(extra_order_exprs);
        // The DISTINCT ON expression(s) must match the leftmost ORDER BY expression(s).
        if let BoundDistinct::DistinctOn(exprs) = &distinct {
            // Maps each DISTINCT ON expression to "already seen in ORDER BY".
            let mut distinct_on_exprs: HashMap<ExprImpl, bool> =
                exprs.iter().map(|expr| (expr.clone(), false)).collect();
            // Number of distinct DISTINCT ON expressions not yet seen (duplicates in `exprs`
            // collapse into one map entry).
            let mut uncovered_distinct_on_exprs_cnt = distinct_on_exprs.len();
            let mut order_iter = order.iter().map(|o| &select_items[o.column_index]);
            // Walk ORDER BY from the left until every DISTINCT ON expression is covered.
            // If ORDER BY is exhausted first, the loop simply ends and the check passes.
            while uncovered_distinct_on_exprs_cnt > 0
                && let Some(order_expr) = order_iter.next()
            {
                match distinct_on_exprs.get_mut(order_expr) {
                    Some(has_been_covered) => {
                        // Repeated ORDER BY expressions are tolerated and counted only once.
                        if !*has_been_covered {
                            *has_been_covered = true;
                            uncovered_distinct_on_exprs_cnt -= 1;
                        }
                    }
                    None => {
                        // A non-DISTINCT-ON expression appears before all DISTINCT ON
                        // expressions were covered.
                        return Err(ErrorCode::InvalidInputSyntax(
                            "the SELECT DISTINCT ON expressions must match the leftmost ORDER BY expressions"
                                .into(),
                        )
                        .into());
                    }
                }
            }
        }

        // Plan the FROM clause.
        let mut root = match from {
            // No FROM clause: use a dummy `Values` input.
            None => self.create_dummy_values(),
            Some(t) => self.plan_relation(t)?,
        };
        // Plan the WHERE clause.
        if let Some(where_clause) = where_clause {
            root = self.plan_where(root, where_clause)?;
        }
        // Plan the SELECT clause.
        // TODO: select-agg, group-by, having can also contain subquery exprs.
        let has_agg_call = select_items.iter().any(|expr| expr.has_agg_call());
        if !group_by.is_empty() || having.is_some() || has_agg_call {
            // `LogicalAgg::create` hands back rewritten `select_items` and `having` that are
            // to be evaluated on top of the new aggregation `root`.
            (root, select_items, having) =
                LogicalAgg::create(select_items, group_by, having, root)?;
        }

        // HAVING is planned through the same path as WHERE, on top of whatever `root` is now.
        if let Some(having) = having {
            root = self.plan_where(root, having)?;
        }

        if select_items.iter().any(|e| e.has_subquery()) {
            (root, select_items) = self.substitute_subqueries(root, select_items)?;
        }
        if select_items.iter().any(|e| e.has_window_function()) {
            (root, select_items) = LogicalOverWindow::create(root, select_items)?;
        }

        // Remember the width of the select list before DISTINCT ON possibly appends to it.
        let original_select_items_len = select_items.len();

        // variable `distinct_list_index_to_select_items_index` is meaningful iff
        // `matches!(&distinct, BoundDistinct::DistinctOn(_))`
        let mut distinct_list_index_to_select_items_index = vec![];
        if let BoundDistinct::DistinctOn(distinct_list) = &distinct {
            distinct_list_index_to_select_items_index.reserve(distinct_list.len());
            // For each index handed out by the builder, the position in `select_items` of the
            // first expression that received that index.
            let mut builder_index_to_select_items_index =
                Vec::with_capacity(original_select_items_len);
            // The builder is used only for its indices; its resulting projection is never built.
            // `add_expr` appears to return the existing index for an expression it has already
            // seen (the `>= len` checks below rely on that) — confirm in `ProjectBuilder`.
            let mut input_proj_builder = ProjectBuilder::default();
            for (select_item_index, select_item) in select_items.iter().enumerate() {
                let builder_index = input_proj_builder
                    .add_expr(select_item)
                    .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
                // A fresh builder index means this select item was not seen before.
                if builder_index >= builder_index_to_select_items_index.len() {
                    debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
                    builder_index_to_select_items_index.push(select_item_index);
                }
            }
            for distinct_expr in distinct_list {
                let builder_index = input_proj_builder
                    .add_expr(distinct_expr)
                    .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
                // The DISTINCT ON expression is not in the select list yet: append it as a
                // temporary extra column, projected away again further down.
                if builder_index >= builder_index_to_select_items_index.len() {
                    debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
                    select_items.push(distinct_expr.clone());
                    builder_index_to_select_items_index.push(select_items.len() - 1);
                }
                distinct_list_index_to_select_items_index
                    .push(builder_index_to_select_items_index[builder_index]);
            }
        }

        // True iff DISTINCT ON appended temporary columns above.
        let need_restore_select_items = select_items.len() > original_select_items_len;

        root = LogicalProjectSet::create(root, select_items);

        // NOTE(review): the plain-DISTINCT branch at the end accounts for a hidden leading
        // `projected_row_id` column in `root`'s schema. If such a column can also be present
        // here, the select-list positions used as the dedup/group key and in
        // `0..original_select_items_len` below would be shifted by one — confirm.
        if matches!(&distinct, BoundDistinct::DistinctOn(_)) {
            root = if order.is_empty() {
                // We only support deduplicating `DISTINCT ON` columns when there is no `ORDER BY`
                // clause now.
                LogicalDedup::new(root, distinct_list_index_to_select_items_index).into()
            } else {
                // With an ORDER BY, keep one row per DISTINCT ON key according to `order`.
                // Presumably the positional arguments are limit = 1, offset = 0 and
                // with_ties = false — confirm against `LogicalTopN::new`.
                LogicalTopN::new(
                    root,
                    1,
                    0,
                    false,
                    Order::new(order.to_vec()),
                    distinct_list_index_to_select_items_index,
                )
                .into()
            };
        }

        // Drop the temporary DISTINCT ON columns by keeping only the leading original ones.
        if need_restore_select_items {
            root = LogicalProject::with_core(Project::with_out_col_idx(
                root,
                0..original_select_items_len,
            ))
            .into();
        }

        // Plain DISTINCT is planned as an aggregation with no aggregate calls that groups by
        // the output columns.
        if let BoundDistinct::Distinct = distinct {
            let fields = root.schema().fields();
            let group_key = if let Some(field) = fields.first()
                && field.name == "projected_row_id"
            {
                // Do not group by projected_row_id hidden column.
                (1..fields.len()).collect()
            } else {
                (0..fields.len()).collect()
            };
            root = Agg::new(vec![], group_key, root).into();
        }

        Ok(root)
    }
199
200    /// Helper to create a dummy node as child of [`LogicalProject`].
201    /// For example, `select 1+2, 3*4` will be `Project([1+2, 3+4]) - Values([[]])`.
202    fn create_dummy_values(&self) -> PlanRef {
203        LogicalValues::create(vec![vec![]], Schema::default(), self.ctx.clone())
204    }
205
206    /// Helper to create an `EXISTS` boolean operator with the given `input`.
207    /// It is represented by `Project([$0 >= 1]) -> Agg(count(*)) -> input`
208    fn create_exists(&self, input: PlanRef) -> Result<PlanRef> {
209        let count_star = Agg::new(vec![PlanAggCall::count_star()], IndexSet::empty(), input);
210        let ge = FunctionCall::new(
211            ExprType::GreaterThanOrEqual,
212            vec![
213                InputRef::new(0, DataType::Int64).into(),
214                ExprImpl::literal_int(1),
215            ],
216        )
217        .unwrap();
218        Ok(LogicalProject::create(count_star.into(), vec![ge.into()]))
219    }
220
221    /// For `(NOT) EXISTS subquery` or `(NOT) IN subquery`, we can plan it as
222    /// `LeftSemi/LeftAnti` [`LogicalApply`]
223    /// For other subqueries, we plan it as `LeftOuter` [`LogicalApply`] using
224    /// [`Self::substitute_subqueries`].
225    pub(super) fn plan_where(
226        &mut self,
227        mut input: PlanRef,
228        where_clause: ExprImpl,
229    ) -> Result<PlanRef> {
230        if !where_clause.has_subquery() {
231            return Ok(LogicalFilter::create_with_expr(input, where_clause));
232        }
233        let (subquery_conjunctions, not_subquery_conjunctions, others) =
234            Condition::with_expr(where_clause)
235                .group_by::<_, 3>(|expr| match expr {
236                    ExprImpl::Subquery(_) => 0,
237                    ExprImpl::FunctionCall(func_call)
238                        if func_call.func_type() == ExprType::Not
239                            && matches!(func_call.inputs()[0], ExprImpl::Subquery(_)) =>
240                    {
241                        1
242                    }
243                    _ => 2,
244                })
245                .into_iter()
246                .next_tuple()
247                .unwrap();
248
249        // EXISTS and IN in WHERE.
250        for expr in subquery_conjunctions {
251            self.handle_exists_and_in(expr, false, &mut input)?;
252        }
253
254        // NOT EXISTS and NOT IN in WHERE.
255        for expr in not_subquery_conjunctions {
256            let not = expr.into_function_call().unwrap();
257            let (_, expr) = not.decompose_as_unary();
258            self.handle_exists_and_in(expr, true, &mut input)?;
259        }
260
261        if others.always_true() {
262            Ok(input)
263        } else {
264            let (input, others) = self.substitute_subqueries(input, others.conjunctions)?;
265            Ok(LogicalFilter::create(
266                input,
267                Condition {
268                    conjunctions: others,
269                },
270            ))
271        }
272    }
273
274    /// Handle (NOT) EXISTS and (NOT) IN in WHERE clause.
275    ///
276    /// We will use a = b to replace a in (select b from ....) for (NOT) IN thus avoiding adding a
277    /// `LogicalFilter` on `LogicalApply`.
278    fn handle_exists_and_in(
279        &mut self,
280        expr: ExprImpl,
281        negated: bool,
282        input: &mut PlanRef,
283    ) -> Result<()> {
284        let join_type = if negated {
285            JoinType::LeftAnti
286        } else {
287            JoinType::LeftSemi
288        };
289        let correlated_id = self.ctx.next_correlated_id();
290        let mut subquery = expr.into_subquery().unwrap();
291        let correlated_indices =
292            subquery.collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
293        let output_column_type = subquery.query.data_types()[0].clone();
294        let right_plan = self.plan_query(subquery.query)?.into_unordered_subplan();
295        let on = match subquery.kind {
296            SubqueryKind::Existential => ExprImpl::literal_bool(true),
297            SubqueryKind::In(left_expr) => {
298                let right_expr = InputRef::new(input.schema().len(), output_column_type);
299                FunctionCall::new(ExprType::Equal, vec![left_expr, right_expr.into()])?.into()
300            }
301            kind => bail_not_implemented!(issue = 1343, "Not supported subquery kind: {:?}", kind),
302        };
303        *input = Self::create_apply(
304            correlated_id,
305            correlated_indices,
306            input.clone(),
307            right_plan,
308            on,
309            join_type,
310            false,
311        );
312        Ok(())
313    }
314
315    /// Substitutes all [`Subquery`] in `exprs`.
316    ///
317    /// Each time a [`Subquery`] is found, it is replaced by a new [`InputRef`]. And `root` is
318    /// replaced by a new `LeftOuter` [`LogicalApply`] whose left side is `root` and right side is
319    /// the planned subquery.
320    ///
321    /// The [`InputRef`]s' indexes start from `root.schema().len()`,
322    /// which means they are additional columns beyond the original `root`.
323    pub(super) fn substitute_subqueries(
324        &mut self,
325        mut root: PlanRef,
326        mut exprs: Vec<ExprImpl>,
327    ) -> Result<(PlanRef, Vec<ExprImpl>)> {
328        struct SubstituteSubQueries {
329            input_col_num: usize,
330            subqueries: Vec<Subquery>,
331            correlated_indices_collection: Vec<Vec<usize>>,
332            correlated_ids: Vec<CorrelatedId>,
333            ctx: OptimizerContextRef,
334        }
335
336        // TODO: consider the multi-subquery case for normal predicate.
337        impl ExprRewriter for SubstituteSubQueries {
338            fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
339                let correlated_id = self.ctx.next_correlated_id();
340                self.correlated_ids.push(correlated_id);
341                let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
342                self.input_col_num += 1;
343                self.correlated_indices_collection.push(
344                    subquery.collect_correlated_indices_by_depth_and_assign_id(0, correlated_id),
345                );
346                self.subqueries.push(subquery);
347                input_ref
348            }
349        }
350
351        let mut rewriter = SubstituteSubQueries {
352            input_col_num: root.schema().len(),
353            subqueries: vec![],
354            correlated_indices_collection: vec![],
355            correlated_ids: vec![],
356            ctx: self.ctx.clone(),
357        };
358        exprs = exprs
359            .into_iter()
360            .map(|e| rewriter.rewrite_expr(e))
361            .collect();
362
363        for ((subquery, correlated_indices), correlated_id) in rewriter
364            .subqueries
365            .into_iter()
366            .zip_eq_fast(rewriter.correlated_indices_collection)
367            .zip_eq_fast(rewriter.correlated_ids)
368        {
369            let return_type = subquery.return_type();
370            let subroot = self.plan_query(subquery.query)?;
371
372            let right = match subquery.kind {
373                SubqueryKind::Scalar => subroot.into_unordered_subplan(),
374                SubqueryKind::UpdateSet => {
375                    let plan = subroot.into_unordered_subplan();
376
377                    // Compose all input columns into a struct with `ROW` function.
378                    let all_input_refs = plan
379                        .schema()
380                        .data_types()
381                        .into_iter()
382                        .enumerate()
383                        .map(|(i, data_type)| InputRef::new(i, data_type).into())
384                        .collect::<Vec<_>>();
385                    let call =
386                        FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
387
388                    LogicalProject::create(plan, vec![call.into()])
389                }
390                SubqueryKind::Existential => {
391                    self.create_exists(subroot.into_unordered_subplan())?
392                }
393                SubqueryKind::Array => subroot.into_array_agg()?,
394                _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
395            };
396
397            root = Self::create_apply(
398                correlated_id,
399                correlated_indices,
400                root,
401                right,
402                ExprImpl::literal_bool(true),
403                JoinType::LeftOuter,
404                true,
405            );
406        }
407        Ok((root, exprs))
408    }
409
410    fn create_apply(
411        correlated_id: CorrelatedId,
412        correlated_indices: Vec<usize>,
413        left: PlanRef,
414        right: PlanRef,
415        on: ExprImpl,
416        join_type: JoinType,
417        max_one_row: bool,
418    ) -> PlanRef {
419        LogicalApply::create(
420            left,
421            right,
422            join_type,
423            Condition::with_expr(on),
424            correlated_id,
425            correlated_indices,
426            max_one_row,
427        )
428    }
429}