risingwave_frontend/planner/
select.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_common::util::sort_util::ColumnOrder;
23use risingwave_expr::ExprError;
24use risingwave_pb::plan_common::JoinType;
25
26use crate::OptimizerContextRef;
27use crate::binder::{BoundDistinct, BoundSelect};
28use crate::error::{ErrorCode, Result};
29use crate::expr::{
30    CorrelatedId, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef, Subquery,
31    SubqueryKind,
32};
33pub use crate::optimizer::plan_node::LogicalFilter;
34use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef, Project, ProjectBuilder};
35use crate::optimizer::plan_node::{
36    LogicalAgg, LogicalApply, LogicalDedup, LogicalJoin, LogicalOverWindow, LogicalProject,
37    LogicalProjectSet, LogicalTopN, LogicalValues, PlanAggCall, PlanRef,
38};
39use crate::optimizer::property::Order;
40use crate::planner::Planner;
41use crate::utils::{Condition, IndexSet};
42
43impl Planner {
44    pub(super) fn plan_select(
45        &mut self,
46        BoundSelect {
47            from,
48            where_clause,
49            mut select_items,
50            group_by,
51            mut having,
52            distinct,
53            ..
54        }: BoundSelect,
55        extra_order_exprs: Vec<ExprImpl>,
56        order: &[ColumnOrder],
57    ) -> Result<PlanRef> {
58        // Append expressions in ORDER BY.
59        if distinct.is_distinct() && !extra_order_exprs.is_empty() {
60            return Err(ErrorCode::InvalidInputSyntax(
61                "for SELECT DISTINCT, ORDER BY expressions must appear in select list".into(),
62            )
63            .into());
64        }
65        select_items.extend(extra_order_exprs);
66        // The DISTINCT ON expression(s) must match the leftmost ORDER BY expression(s).
67        if let BoundDistinct::DistinctOn(exprs) = &distinct {
68            let mut distinct_on_exprs: HashMap<ExprImpl, bool> =
69                exprs.iter().map(|expr| (expr.clone(), false)).collect();
70            let mut uncovered_distinct_on_exprs_cnt = distinct_on_exprs.len();
71            let mut order_iter = order.iter().map(|o| &select_items[o.column_index]);
72            while uncovered_distinct_on_exprs_cnt > 0
73                && let Some(order_expr) = order_iter.next()
74            {
75                match distinct_on_exprs.get_mut(order_expr) {
76                    Some(has_been_covered) => {
77                        if !*has_been_covered {
78                            *has_been_covered = true;
79                            uncovered_distinct_on_exprs_cnt -= 1;
80                        }
81                    }
82                    None => {
83                        return Err(ErrorCode::InvalidInputSyntax(
84                            "the SELECT DISTINCT ON expressions must match the leftmost ORDER BY expressions"
85                                .into(),
86                        )
87                        .into());
88                    }
89                }
90            }
91        }
92
93        // Plan the FROM clause.
94        let mut root = match from {
95            None => self.create_dummy_values(),
96            Some(t) => self.plan_relation(t)?,
97        };
98        // Plan the WHERE clause.
99        if let Some(where_clause) = where_clause {
100            root = self.plan_where(root, where_clause)?;
101        }
102        // Plan the SELECT clause.
103        // TODO: select-agg, group-by, having can also contain subquery exprs.
104        let has_agg_call = select_items.iter().any(|expr| expr.has_agg_call());
105        if !group_by.is_empty() || having.is_some() || has_agg_call {
106            (root, select_items, having) =
107                LogicalAgg::create(select_items, group_by, having, root)?;
108        }
109
110        if let Some(having) = having {
111            root = self.plan_where(root, having)?;
112        }
113
114        if select_items.iter().any(|e| e.has_subquery()) {
115            (root, select_items) =
116                self.substitute_subqueries_in_cross_join_way(root, select_items)?;
117        }
118        if select_items.iter().any(|e| e.has_window_function()) {
119            (root, select_items) = LogicalOverWindow::create(root, select_items)?;
120        }
121
122        let original_select_items_len = select_items.len();
123
124        // variable `distinct_list_index_to_select_items_index` is meaningful iff
125        // `matches!(&distinct, BoundDistinct::DistinctOn(_))`
126        let mut distinct_list_index_to_select_items_index = vec![];
127        if let BoundDistinct::DistinctOn(distinct_list) = &distinct {
128            distinct_list_index_to_select_items_index.reserve(distinct_list.len());
129            let mut builder_index_to_select_items_index =
130                Vec::with_capacity(original_select_items_len);
131            let mut input_proj_builder = ProjectBuilder::default();
132            for (select_item_index, select_item) in select_items.iter().enumerate() {
133                let builder_index = input_proj_builder
134                    .add_expr(select_item)
135                    .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
136                if builder_index >= builder_index_to_select_items_index.len() {
137                    debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
138                    builder_index_to_select_items_index.push(select_item_index);
139                }
140            }
141            for distinct_expr in distinct_list {
142                let builder_index = input_proj_builder
143                    .add_expr(distinct_expr)
144                    .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
145                if builder_index >= builder_index_to_select_items_index.len() {
146                    debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
147                    select_items.push(distinct_expr.clone());
148                    builder_index_to_select_items_index.push(select_items.len() - 1);
149                }
150                distinct_list_index_to_select_items_index
151                    .push(builder_index_to_select_items_index[builder_index]);
152            }
153        }
154
155        let need_restore_select_items = select_items.len() > original_select_items_len;
156
157        root = LogicalProjectSet::create(root, select_items);
158
159        if matches!(&distinct, BoundDistinct::DistinctOn(_)) {
160            root = if order.is_empty() {
161                // We only support deduplicating `DISTINCT ON` columns when there is no `ORDER BY`
162                // clause now.
163                LogicalDedup::new(root, distinct_list_index_to_select_items_index).into()
164            } else {
165                LogicalTopN::new(
166                    root,
167                    1,
168                    0,
169                    false,
170                    Order::new(order.to_vec()),
171                    distinct_list_index_to_select_items_index,
172                )
173                .into()
174            };
175        }
176
177        if need_restore_select_items {
178            root = LogicalProject::with_core(Project::with_out_col_idx(
179                root,
180                0..original_select_items_len,
181            ))
182            .into();
183        }
184
185        if let BoundDistinct::Distinct = distinct {
186            let fields = root.schema().fields();
187            let group_key = if let Some(field) = fields.first()
188                && field.name == "projected_row_id"
189            {
190                // Do not group by projected_row_id hidden column.
191                (1..fields.len()).collect()
192            } else {
193                (0..fields.len()).collect()
194            };
195            root = Agg::new(vec![], group_key, root).into();
196        }
197
198        Ok(root)
199    }
200
201    /// Helper to create a dummy node as child of [`LogicalProject`].
202    /// For example, `select 1+2, 3*4` will be `Project([1+2, 3+4]) - Values([[]])`.
203    fn create_dummy_values(&self) -> PlanRef {
204        LogicalValues::create(vec![vec![]], Schema::default(), self.ctx.clone())
205    }
206
207    /// Helper to create an `EXISTS` boolean operator with the given `input`.
208    /// It is represented by `Project([$0 >= 1]) -> Agg(count(*)) -> input`
209    fn create_exists(&self, input: PlanRef) -> Result<PlanRef> {
210        let count_star = Agg::new(vec![PlanAggCall::count_star()], IndexSet::empty(), input);
211        let ge = FunctionCall::new(
212            ExprType::GreaterThanOrEqual,
213            vec![
214                InputRef::new(0, DataType::Int64).into(),
215                ExprImpl::literal_int(1),
216            ],
217        )
218        .unwrap();
219        Ok(LogicalProject::create(count_star.into(), vec![ge.into()]))
220    }
221
222    /// For `(NOT) EXISTS subquery` or `(NOT) IN subquery`, we can plan it as
223    /// `LeftSemi/LeftAnti` [`LogicalApply`]
224    /// For other subqueries, we plan it as `LeftOuter` [`LogicalApply`] using
225    /// [`Self::substitute_subqueries_in_left_deep_tree_way`].
226    pub(super) fn plan_where(
227        &mut self,
228        mut input: PlanRef,
229        where_clause: ExprImpl,
230    ) -> Result<PlanRef> {
231        if !where_clause.has_subquery() {
232            return Ok(LogicalFilter::create_with_expr(input, where_clause));
233        }
234        let (subquery_conjunctions, not_subquery_conjunctions, others) =
235            Condition::with_expr(where_clause)
236                .group_by::<_, 3>(|expr| match expr {
237                    ExprImpl::Subquery(_) => 0,
238                    ExprImpl::FunctionCall(func_call)
239                        if func_call.func_type() == ExprType::Not
240                            && matches!(func_call.inputs()[0], ExprImpl::Subquery(_)) =>
241                    {
242                        1
243                    }
244                    _ => 2,
245                })
246                .into_iter()
247                .next_tuple()
248                .unwrap();
249
250        // EXISTS and IN in WHERE.
251        for expr in subquery_conjunctions {
252            self.handle_exists_and_in(expr, false, &mut input)?;
253        }
254
255        // NOT EXISTS and NOT IN in WHERE.
256        for expr in not_subquery_conjunctions {
257            let not = expr.into_function_call().unwrap();
258            let (_, expr) = not.decompose_as_unary();
259            self.handle_exists_and_in(expr, true, &mut input)?;
260        }
261
262        if others.always_true() {
263            Ok(input)
264        } else {
265            let (input, others) =
266                self.substitute_subqueries_in_left_deep_tree_way(input, others.conjunctions)?;
267            Ok(LogicalFilter::create(
268                input,
269                Condition {
270                    conjunctions: others,
271                },
272            ))
273        }
274    }
275
276    /// Handle (NOT) EXISTS and (NOT) IN in WHERE clause.
277    ///
278    /// We will use a = b to replace a in (select b from ....) for (NOT) IN thus avoiding adding a
279    /// `LogicalFilter` on `LogicalApply`.
280    fn handle_exists_and_in(
281        &mut self,
282        expr: ExprImpl,
283        negated: bool,
284        input: &mut PlanRef,
285    ) -> Result<()> {
286        let join_type = if negated {
287            JoinType::LeftAnti
288        } else {
289            JoinType::LeftSemi
290        };
291        let correlated_id = self.ctx.next_correlated_id();
292        let mut subquery = expr.into_subquery().unwrap();
293        // we should call `subquery.query.collect_correlated_indices_by_depth_and_assign_id`
294        // instead of `subquery.collect_correlated_indices_by_depth_and_assign_id`.
295        // because current subquery containing struct `kind` expr which should never be correlated with the current subquery.
296        let mut correlated_indices = subquery
297            .query
298            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
299        correlated_indices.sort();
300        correlated_indices.dedup();
301        let output_column_type = subquery.query.data_types()[0].clone();
302        let right_plan = self.plan_query(subquery.query)?.into_unordered_subplan();
303        let on = match subquery.kind {
304            SubqueryKind::Existential => ExprImpl::literal_bool(true),
305            SubqueryKind::In(left_expr) => {
306                let right_expr = InputRef::new(input.schema().len(), output_column_type);
307                FunctionCall::new(ExprType::Equal, vec![left_expr, right_expr.into()])?.into()
308            }
309            kind => bail_not_implemented!(issue = 1343, "Not supported subquery kind: {:?}", kind),
310        };
311        *input = Self::create_apply(
312            correlated_id,
313            correlated_indices,
314            input.clone(),
315            right_plan,
316            on,
317            join_type,
318            false,
319        );
320        Ok(())
321    }
322
323    /// Substitutes all [`Subquery`] in `exprs` in a left deep tree way.
324    ///
325    /// Each time a [`Subquery`] is found, it is replaced by a new [`InputRef`]. And `root` is
326    /// replaced by a new `LeftOuter` [`LogicalApply`] whose left side is `root` and right side is
327    /// the planned subquery.
328    ///
329    /// The [`InputRef`]s' indexes start from `root.schema().len()`,
330    /// which means they are additional columns beyond the original `root`.
331    ///
332    /// The left-deep tree way only meaningful when there are multiple subqueries
333    ///
334    /// ```text
335    ///             Apply
336    ///            /    \
337    ///          Apply  Subquery3
338    ///         /    \
339    ///     Apply    Subquery2
340    ///     /   \
341    ///   left  Subquery1
342    /// ```
343    ///
344    /// Typically, this is used for subqueries in `WHERE` clause, because it is unlikely that users would write subqueries in the same where clause multiple times.
345    /// But our dynamic filter sometimes generates subqueries in the same where clause multiple times (which couldn't work in the cross join way), so we need to support this case.
346    pub(super) fn substitute_subqueries_in_left_deep_tree_way(
347        &mut self,
348        mut root: PlanRef,
349        mut exprs: Vec<ExprImpl>,
350    ) -> Result<(PlanRef, Vec<ExprImpl>)> {
351        struct SubstituteSubQueries {
352            input_col_num: usize,
353            subqueries: Vec<Subquery>,
354            correlated_indices_collection: Vec<Vec<usize>>,
355            correlated_ids: Vec<CorrelatedId>,
356            ctx: OptimizerContextRef,
357        }
358
359        // TODO: consider the multi-subquery case for normal predicate.
360        impl ExprRewriter for SubstituteSubQueries {
361            fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
362                let correlated_id = self.ctx.next_correlated_id();
363                self.correlated_ids.push(correlated_id);
364                let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
365                self.input_col_num += 1;
366                self.correlated_indices_collection.push(
367                    subquery.collect_correlated_indices_by_depth_and_assign_id(0, correlated_id),
368                );
369                self.subqueries.push(subquery);
370                input_ref
371            }
372        }
373
374        let mut rewriter = SubstituteSubQueries {
375            input_col_num: root.schema().len(),
376            subqueries: vec![],
377            correlated_indices_collection: vec![],
378            correlated_ids: vec![],
379            ctx: self.ctx.clone(),
380        };
381        exprs = exprs
382            .into_iter()
383            .map(|e| rewriter.rewrite_expr(e))
384            .collect();
385
386        for ((subquery, correlated_indices), correlated_id) in rewriter
387            .subqueries
388            .into_iter()
389            .zip_eq_fast(rewriter.correlated_indices_collection)
390            .zip_eq_fast(rewriter.correlated_ids)
391        {
392            let return_type = subquery.return_type();
393            let subroot = self.plan_query(subquery.query)?;
394
395            let right = match subquery.kind {
396                SubqueryKind::Scalar => subroot.into_unordered_subplan(),
397                SubqueryKind::UpdateSet => {
398                    let plan = subroot.into_unordered_subplan();
399
400                    // Compose all input columns into a struct with `ROW` function.
401                    let all_input_refs = plan
402                        .schema()
403                        .data_types()
404                        .into_iter()
405                        .enumerate()
406                        .map(|(i, data_type)| InputRef::new(i, data_type).into())
407                        .collect::<Vec<_>>();
408                    let call =
409                        FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
410
411                    LogicalProject::create(plan, vec![call.into()])
412                }
413                SubqueryKind::Existential => {
414                    self.create_exists(subroot.into_unordered_subplan())?
415                }
416                SubqueryKind::Array => subroot.into_array_agg()?,
417                _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
418            };
419
420            root = Self::create_apply(
421                correlated_id,
422                correlated_indices,
423                root,
424                right,
425                ExprImpl::literal_bool(true),
426                JoinType::LeftOuter,
427                true,
428            );
429        }
430        Ok((root, exprs))
431    }
432
433    /// Substitutes all [`Subquery`] in `exprs` in a cross join way.
434    ///
435    /// Each time a [`Subquery`] is found, it is replaced by a new [`InputRef`]. And `root` is
436    /// replaced by a new `LeftOuter` [`LogicalApply`] whose left side is `root` and right side is
437    /// the planned subquery.
438    ///
439    /// The [`InputRef`]s' indexes start from `root.schema().len()`,
440    /// which means they are additional columns beyond the original `root`.
441    ///
442    ///
443    /// The cross join way only meaningful when there are multiple subqueries
444    ///
445    /// ```text
446    ///            Apply
447    ///           /    \
448    ///         left   CrossJoin
449    ///                   /   \
450    ///           Subquery1   CrossJoin
451    ///                         /   \
452    ///                 Subquery2   Subquery3
453    /// ```
454    /// Typically, this is used for scalar subqueries in select clauses, because users might write subqueries in exprs multiple times.
455    /// If we use the left-deep tree way, it will generate a lot of `Apply` nodes, which is not efficient.
456    pub(super) fn substitute_subqueries_in_cross_join_way(
457        &mut self,
458        mut root: PlanRef,
459        mut exprs: Vec<ExprImpl>,
460    ) -> Result<(PlanRef, Vec<ExprImpl>)> {
461        struct SubstituteSubQueries {
462            input_col_num: usize,
463            subqueries: Vec<Subquery>,
464            correlated_id: Option<CorrelatedId>,
465            correlated_indices_collection: Vec<Vec<usize>>,
466            ctx: OptimizerContextRef,
467        }
468
469        impl ExprRewriter for SubstituteSubQueries {
470            fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
471                if self.correlated_id.is_none() {
472                    self.correlated_id = Some(self.ctx.next_correlated_id());
473                }
474                let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
475                self.input_col_num += 1;
476                self.correlated_indices_collection.push(
477                    subquery.collect_correlated_indices_by_depth_and_assign_id(
478                        0,
479                        self.correlated_id.unwrap(),
480                    ),
481                );
482                self.subqueries.push(subquery);
483                input_ref
484            }
485        }
486
487        let mut rewriter = SubstituteSubQueries {
488            input_col_num: root.schema().len(),
489            subqueries: vec![],
490            correlated_id: None,
491            correlated_indices_collection: vec![],
492            ctx: self.ctx.clone(),
493        };
494        exprs = exprs
495            .into_iter()
496            .map(|e| rewriter.rewrite_expr(e))
497            .collect();
498
499        let mut right = None;
500
501        for subquery in rewriter.subqueries {
502            let return_type = subquery.return_type();
503            let subroot = self.plan_query(subquery.query)?;
504
505            let subplan = match subquery.kind {
506                SubqueryKind::Scalar => subroot.into_unordered_subplan(),
507                SubqueryKind::UpdateSet => {
508                    let plan = subroot.into_unordered_subplan();
509
510                    // Compose all input columns into a struct with `ROW` function.
511                    let all_input_refs = plan
512                        .schema()
513                        .data_types()
514                        .into_iter()
515                        .enumerate()
516                        .map(|(i, data_type)| InputRef::new(i, data_type).into())
517                        .collect::<Vec<_>>();
518                    let call =
519                        FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
520
521                    LogicalProject::create(plan, vec![call.into()])
522                }
523                SubqueryKind::Existential => {
524                    self.create_exists(subroot.into_unordered_subplan())?
525                }
526                SubqueryKind::Array => subroot.into_array_agg()?,
527                _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
528            };
529            if right.is_none() {
530                right = Some(subplan);
531            } else {
532                right = Some(LogicalJoin::create(
533                    right.clone().unwrap(),
534                    subplan,
535                    JoinType::FullOuter,
536                    ExprImpl::literal_bool(true),
537                ));
538            }
539        }
540
541        root = if let Some(right) = right {
542            let mut correlated_indices = rewriter
543                .correlated_indices_collection
544                .iter()
545                .flatten()
546                .cloned()
547                .collect::<Vec<_>>();
548            correlated_indices.sort();
549            correlated_indices.dedup();
550
551            Self::create_apply(
552                rewriter.correlated_id.expect("must have a correlated id"),
553                correlated_indices,
554                root,
555                right,
556                ExprImpl::literal_bool(true),
557                JoinType::LeftOuter,
558                true,
559            )
560        } else {
561            root
562        };
563
564        Ok((root, exprs))
565    }
566
567    fn create_apply(
568        correlated_id: CorrelatedId,
569        correlated_indices: Vec<usize>,
570        left: PlanRef,
571        right: PlanRef,
572        on: ExprImpl,
573        join_type: JoinType,
574        max_one_row: bool,
575    ) -> PlanRef {
576        LogicalApply::create(
577            left,
578            right,
579            join_type,
580            Condition::with_expr(on),
581            correlated_id,
582            correlated_indices,
583            max_one_row,
584        )
585    }
586}