risingwave_frontend/planner/
select.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_common::util::sort_util::ColumnOrder;
23use risingwave_expr::ExprError;
24use risingwave_pb::plan_common::JoinType;
25
26use crate::OptimizerContextRef;
27use crate::binder::{BoundDistinct, BoundSelect};
28use crate::error::{ErrorCode, Result};
29use crate::expr::{
30    CorrelatedId, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef, Subquery,
31    SubqueryKind,
32};
33pub use crate::optimizer::plan_node::LogicalFilter;
34use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef, Project, ProjectBuilder};
35use crate::optimizer::plan_node::{
36    LogicalAgg, LogicalApply, LogicalDedup, LogicalJoin, LogicalOverWindow,
37    LogicalPlanRef as PlanRef, LogicalProject, LogicalProjectSet, LogicalTopN, LogicalValues,
38    PlanAggCall,
39};
40use crate::optimizer::property::Order;
41use crate::planner::Planner;
42use crate::utils::{Condition, IndexSet};
43
44impl Planner {
45    pub(super) fn plan_select(
46        &mut self,
47        BoundSelect {
48            from,
49            where_clause,
50            mut select_items,
51            group_by,
52            mut having,
53            distinct,
54            ..
55        }: BoundSelect,
56        extra_order_exprs: Vec<ExprImpl>,
57        order: &[ColumnOrder],
58    ) -> Result<PlanRef> {
59        // Append expressions in ORDER BY.
60        if distinct.is_distinct() && !extra_order_exprs.is_empty() {
61            return Err(ErrorCode::InvalidInputSyntax(
62                "for SELECT DISTINCT, ORDER BY expressions must appear in select list".into(),
63            )
64            .into());
65        }
66        select_items.extend(extra_order_exprs);
67        // The DISTINCT ON expression(s) must match the leftmost ORDER BY expression(s).
68        if let BoundDistinct::DistinctOn(exprs) = &distinct {
69            let mut distinct_on_exprs: HashMap<ExprImpl, bool> =
70                exprs.iter().map(|expr| (expr.clone(), false)).collect();
71            let mut uncovered_distinct_on_exprs_cnt = distinct_on_exprs.len();
72            let mut order_iter = order.iter().map(|o| &select_items[o.column_index]);
73            while uncovered_distinct_on_exprs_cnt > 0
74                && let Some(order_expr) = order_iter.next()
75            {
76                match distinct_on_exprs.get_mut(order_expr) {
77                    Some(has_been_covered) => {
78                        if !*has_been_covered {
79                            *has_been_covered = true;
80                            uncovered_distinct_on_exprs_cnt -= 1;
81                        }
82                    }
83                    None => {
84                        return Err(ErrorCode::InvalidInputSyntax(
85                            "the SELECT DISTINCT ON expressions must match the leftmost ORDER BY expressions"
86                                .into(),
87                        )
88                        .into());
89                    }
90                }
91            }
92        }
93
94        // Plan the FROM clause.
95        let mut root = match from {
96            None => self.create_dummy_values(),
97            Some(t) => self.plan_relation(t)?,
98        };
99        // Plan the WHERE clause.
100        if let Some(where_clause) = where_clause {
101            root = self.plan_where(root, where_clause)?;
102        }
103        // Plan the SELECT clause.
104        // TODO: select-agg, group-by, having can also contain subquery exprs.
105        let has_agg_call = select_items.iter().any(|expr| expr.has_agg_call());
106        if !group_by.is_empty() || having.is_some() || has_agg_call {
107            (root, select_items, having) =
108                LogicalAgg::create(select_items, group_by, having, root)?;
109        }
110
111        if let Some(having) = having {
112            root = self.plan_where(root, having)?;
113        }
114
115        if select_items.iter().any(|e| e.has_subquery()) {
116            (root, select_items) =
117                self.substitute_subqueries_in_cross_join_way(root, select_items)?;
118        }
119        if select_items.iter().any(|e| e.has_window_function()) {
120            (root, select_items) = LogicalOverWindow::create(root, select_items)?;
121        }
122
123        let original_select_items_len = select_items.len();
124
125        // variable `distinct_list_index_to_select_items_index` is meaningful iff
126        // `matches!(&distinct, BoundDistinct::DistinctOn(_))`
127        let mut distinct_list_index_to_select_items_index = vec![];
128        if let BoundDistinct::DistinctOn(distinct_list) = &distinct {
129            distinct_list_index_to_select_items_index.reserve(distinct_list.len());
130            let mut builder_index_to_select_items_index =
131                Vec::with_capacity(original_select_items_len);
132            let mut input_proj_builder = ProjectBuilder::default();
133            for (select_item_index, select_item) in select_items.iter().enumerate() {
134                let builder_index = input_proj_builder
135                    .add_expr(select_item)
136                    .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
137                if builder_index >= builder_index_to_select_items_index.len() {
138                    debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
139                    builder_index_to_select_items_index.push(select_item_index);
140                }
141            }
142            for distinct_expr in distinct_list {
143                let builder_index = input_proj_builder
144                    .add_expr(distinct_expr)
145                    .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
146                if builder_index >= builder_index_to_select_items_index.len() {
147                    debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
148                    select_items.push(distinct_expr.clone());
149                    builder_index_to_select_items_index.push(select_items.len() - 1);
150                }
151                distinct_list_index_to_select_items_index
152                    .push(builder_index_to_select_items_index[builder_index]);
153            }
154        }
155
156        let need_restore_select_items = select_items.len() > original_select_items_len;
157
158        root = LogicalProjectSet::create(root, select_items);
159
160        if matches!(&distinct, BoundDistinct::DistinctOn(_)) {
161            root = if order.is_empty() {
162                // We only support deduplicating `DISTINCT ON` columns when there is no `ORDER BY`
163                // clause now.
164                LogicalDedup::new(root, distinct_list_index_to_select_items_index).into()
165            } else {
166                LogicalTopN::new(
167                    root,
168                    1,
169                    0,
170                    false,
171                    Order::new(order.to_vec()),
172                    distinct_list_index_to_select_items_index,
173                )
174                .into()
175            };
176        }
177
178        if need_restore_select_items {
179            root = LogicalProject::with_core(Project::with_out_col_idx(
180                root,
181                0..original_select_items_len,
182            ))
183            .into();
184        }
185
186        if let BoundDistinct::Distinct = distinct {
187            let fields = root.schema().fields();
188            let group_key = if let Some(field) = fields.first()
189                && field.name == "projected_row_id"
190            {
191                // Do not group by projected_row_id hidden column.
192                (1..fields.len()).collect()
193            } else {
194                (0..fields.len()).collect()
195            };
196            root = Agg::new(vec![], group_key, root).into();
197        }
198
199        Ok(root)
200    }
201
202    /// Helper to create a dummy node as child of [`LogicalProject`].
203    /// For example, `select 1+2, 3*4` will be `Project([1+2, 3+4]) - Values([[]])`.
204    fn create_dummy_values(&self) -> PlanRef {
205        LogicalValues::create(vec![vec![]], Schema::default(), self.ctx.clone())
206    }
207
208    /// Helper to create an `EXISTS` boolean operator with the given `input`.
209    /// It is represented by `Project([$0 >= 1]) -> Agg(count(*)) -> input`
210    fn create_exists(&self, input: PlanRef) -> Result<PlanRef> {
211        let count_star = Agg::new(vec![PlanAggCall::count_star()], IndexSet::empty(), input);
212        let ge = FunctionCall::new(
213            ExprType::GreaterThanOrEqual,
214            vec![
215                InputRef::new(0, DataType::Int64).into(),
216                ExprImpl::literal_int(1),
217            ],
218        )
219        .unwrap();
220        Ok(LogicalProject::create(count_star.into(), vec![ge.into()]))
221    }
222
223    /// For `(NOT) EXISTS subquery` or `(NOT) IN subquery`, we can plan it as
224    /// `LeftSemi/LeftAnti` [`LogicalApply`]
225    /// For other subqueries, we plan it as `LeftOuter` [`LogicalApply`] using
226    /// [`Self::substitute_subqueries_in_left_deep_tree_way`].
227    pub(super) fn plan_where(
228        &mut self,
229        mut input: PlanRef,
230        where_clause: ExprImpl,
231    ) -> Result<PlanRef> {
232        if !where_clause.has_subquery() {
233            return Ok(LogicalFilter::create_with_expr(input, where_clause));
234        }
235        let (subquery_conjunctions, not_subquery_conjunctions, others) =
236            Condition::with_expr(where_clause)
237                .group_by::<_, 3>(|expr| match expr {
238                    ExprImpl::Subquery(_) => 0,
239                    ExprImpl::FunctionCall(func_call)
240                        if func_call.func_type() == ExprType::Not
241                            && matches!(func_call.inputs()[0], ExprImpl::Subquery(_)) =>
242                    {
243                        1
244                    }
245                    _ => 2,
246                })
247                .into_iter()
248                .next_tuple()
249                .unwrap();
250
251        // EXISTS and IN in WHERE.
252        for expr in subquery_conjunctions {
253            self.handle_exists_and_in(expr, false, &mut input)?;
254        }
255
256        // NOT EXISTS and NOT IN in WHERE.
257        for expr in not_subquery_conjunctions {
258            let not = expr.into_function_call().unwrap();
259            let (_, expr) = not.decompose_as_unary();
260            self.handle_exists_and_in(expr, true, &mut input)?;
261        }
262
263        if others.always_true() {
264            Ok(input)
265        } else {
266            let (input, others) =
267                self.substitute_subqueries_in_left_deep_tree_way(input, others.conjunctions)?;
268            Ok(LogicalFilter::create(
269                input,
270                Condition {
271                    conjunctions: others,
272                },
273            ))
274        }
275    }
276
277    /// Handle (NOT) EXISTS and (NOT) IN in WHERE clause.
278    ///
279    /// We will use a = b to replace a in (select b from ....) for (NOT) IN thus avoiding adding a
280    /// `LogicalFilter` on `LogicalApply`.
281    fn handle_exists_and_in(
282        &mut self,
283        expr: ExprImpl,
284        negated: bool,
285        input: &mut PlanRef,
286    ) -> Result<()> {
287        let join_type = if negated {
288            JoinType::LeftAnti
289        } else {
290            JoinType::LeftSemi
291        };
292        let correlated_id = self.ctx.next_correlated_id();
293        let mut subquery = expr.into_subquery().unwrap();
294        // we should call `subquery.query.collect_correlated_indices_by_depth_and_assign_id`
295        // instead of `subquery.collect_correlated_indices_by_depth_and_assign_id`.
296        // because current subquery containing struct `kind` expr which should never be correlated with the current subquery.
297        let mut correlated_indices = subquery
298            .query
299            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
300        correlated_indices.sort();
301        correlated_indices.dedup();
302        let output_column_type = subquery.query.data_types()[0].clone();
303        let right_plan = self.plan_query(subquery.query)?.into_unordered_subplan();
304        let on = match subquery.kind {
305            SubqueryKind::Existential => ExprImpl::literal_bool(true),
306            SubqueryKind::In(left_expr) => {
307                let right_expr = InputRef::new(input.schema().len(), output_column_type);
308                FunctionCall::new(ExprType::Equal, vec![left_expr, right_expr.into()])?.into()
309            }
310            kind => bail_not_implemented!(issue = 1343, "Not supported subquery kind: {:?}", kind),
311        };
312        *input = Self::create_apply(
313            correlated_id,
314            correlated_indices,
315            input.clone(),
316            right_plan,
317            on,
318            join_type,
319            false,
320        );
321        Ok(())
322    }
323
324    /// Substitutes all [`Subquery`] in `exprs` in a left deep tree way.
325    ///
326    /// Each time a [`Subquery`] is found, it is replaced by a new [`InputRef`]. And `root` is
327    /// replaced by a new `LeftOuter` [`LogicalApply`] whose left side is `root` and right side is
328    /// the planned subquery.
329    ///
330    /// The [`InputRef`]s' indexes start from `root.schema().len()`,
331    /// which means they are additional columns beyond the original `root`.
332    ///
333    /// The left-deep tree way only meaningful when there are multiple subqueries
334    ///
335    /// ```text
336    ///             Apply
337    ///            /    \
338    ///          Apply  Subquery3
339    ///         /    \
340    ///     Apply    Subquery2
341    ///     /   \
342    ///   left  Subquery1
343    /// ```
344    ///
345    /// Typically, this is used for subqueries in `WHERE` clause, because it is unlikely that users would write subqueries in the same where clause multiple times.
346    /// But our dynamic filter sometimes generates subqueries in the same where clause multiple times (which couldn't work in the cross join way), so we need to support this case.
347    pub(super) fn substitute_subqueries_in_left_deep_tree_way(
348        &mut self,
349        mut root: PlanRef,
350        mut exprs: Vec<ExprImpl>,
351    ) -> Result<(PlanRef, Vec<ExprImpl>)> {
352        struct SubstituteSubQueries {
353            input_col_num: usize,
354            subqueries: Vec<Subquery>,
355            correlated_indices_collection: Vec<Vec<usize>>,
356            correlated_ids: Vec<CorrelatedId>,
357            ctx: OptimizerContextRef,
358        }
359
360        // TODO: consider the multi-subquery case for normal predicate.
361        impl ExprRewriter for SubstituteSubQueries {
362            fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
363                let correlated_id = self.ctx.next_correlated_id();
364                self.correlated_ids.push(correlated_id);
365                let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
366                self.input_col_num += 1;
367                self.correlated_indices_collection.push(
368                    subquery.collect_correlated_indices_by_depth_and_assign_id(0, correlated_id),
369                );
370                self.subqueries.push(subquery);
371                input_ref
372            }
373        }
374
375        let mut rewriter = SubstituteSubQueries {
376            input_col_num: root.schema().len(),
377            subqueries: vec![],
378            correlated_indices_collection: vec![],
379            correlated_ids: vec![],
380            ctx: self.ctx.clone(),
381        };
382        exprs = exprs
383            .into_iter()
384            .map(|e| rewriter.rewrite_expr(e))
385            .collect();
386
387        for ((subquery, correlated_indices), correlated_id) in rewriter
388            .subqueries
389            .into_iter()
390            .zip_eq_fast(rewriter.correlated_indices_collection)
391            .zip_eq_fast(rewriter.correlated_ids)
392        {
393            let return_type = subquery.return_type();
394            let subroot = self.plan_query(subquery.query)?;
395
396            let right = match subquery.kind {
397                SubqueryKind::Scalar => subroot.into_unordered_subplan(),
398                SubqueryKind::UpdateSet => {
399                    let plan = subroot.into_unordered_subplan();
400
401                    // Compose all input columns into a struct with `ROW` function.
402                    let all_input_refs = plan
403                        .schema()
404                        .data_types()
405                        .into_iter()
406                        .enumerate()
407                        .map(|(i, data_type)| InputRef::new(i, data_type).into())
408                        .collect::<Vec<_>>();
409                    let call =
410                        FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
411
412                    LogicalProject::create(plan, vec![call.into()])
413                }
414                SubqueryKind::Existential => {
415                    self.create_exists(subroot.into_unordered_subplan())?
416                }
417                SubqueryKind::Array => subroot.into_array_agg()?,
418                _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
419            };
420
421            root = Self::create_apply(
422                correlated_id,
423                correlated_indices,
424                root,
425                right,
426                ExprImpl::literal_bool(true),
427                JoinType::LeftOuter,
428                true,
429            );
430        }
431        Ok((root, exprs))
432    }
433
434    /// Substitutes all [`Subquery`] in `exprs` in a cross join way.
435    ///
436    /// Each time a [`Subquery`] is found, it is replaced by a new [`InputRef`]. And `root` is
437    /// replaced by a new `LeftOuter` [`LogicalApply`] whose left side is `root` and right side is
438    /// the planned subquery.
439    ///
440    /// The [`InputRef`]s' indexes start from `root.schema().len()`,
441    /// which means they are additional columns beyond the original `root`.
442    ///
443    ///
444    /// The cross join way only meaningful when there are multiple subqueries
445    ///
446    /// ```text
447    ///            Apply
448    ///           /    \
449    ///         left   CrossJoin
450    ///                   /   \
451    ///           Subquery1   CrossJoin
452    ///                         /   \
453    ///                 Subquery2   Subquery3
454    /// ```
455    /// Typically, this is used for scalar subqueries in select clauses, because users might write subqueries in exprs multiple times.
456    /// If we use the left-deep tree way, it will generate a lot of `Apply` nodes, which is not efficient.
457    pub(super) fn substitute_subqueries_in_cross_join_way(
458        &mut self,
459        mut root: PlanRef,
460        mut exprs: Vec<ExprImpl>,
461    ) -> Result<(PlanRef, Vec<ExprImpl>)> {
462        struct SubstituteSubQueries {
463            input_col_num: usize,
464            subqueries: Vec<Subquery>,
465            correlated_id: Option<CorrelatedId>,
466            correlated_indices_collection: Vec<Vec<usize>>,
467            ctx: OptimizerContextRef,
468        }
469
470        impl ExprRewriter for SubstituteSubQueries {
471            fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
472                if self.correlated_id.is_none() {
473                    self.correlated_id = Some(self.ctx.next_correlated_id());
474                }
475                let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
476                self.input_col_num += 1;
477                self.correlated_indices_collection.push(
478                    subquery.collect_correlated_indices_by_depth_and_assign_id(
479                        0,
480                        self.correlated_id.unwrap(),
481                    ),
482                );
483                self.subqueries.push(subquery);
484                input_ref
485            }
486        }
487
488        let mut rewriter = SubstituteSubQueries {
489            input_col_num: root.schema().len(),
490            subqueries: vec![],
491            correlated_id: None,
492            correlated_indices_collection: vec![],
493            ctx: self.ctx.clone(),
494        };
495        exprs = exprs
496            .into_iter()
497            .map(|e| rewriter.rewrite_expr(e))
498            .collect();
499
500        let mut right = None;
501
502        for subquery in rewriter.subqueries {
503            let return_type = subquery.return_type();
504            let subroot = self.plan_query(subquery.query)?;
505
506            let subplan = match subquery.kind {
507                SubqueryKind::Scalar => subroot.into_unordered_subplan(),
508                SubqueryKind::UpdateSet => {
509                    let plan = subroot.into_unordered_subplan();
510
511                    // Compose all input columns into a struct with `ROW` function.
512                    let all_input_refs = plan
513                        .schema()
514                        .data_types()
515                        .into_iter()
516                        .enumerate()
517                        .map(|(i, data_type)| InputRef::new(i, data_type).into())
518                        .collect::<Vec<_>>();
519                    let call =
520                        FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
521
522                    LogicalProject::create(plan, vec![call.into()])
523                }
524                SubqueryKind::Existential => {
525                    self.create_exists(subroot.into_unordered_subplan())?
526                }
527                SubqueryKind::Array => subroot.into_array_agg()?,
528                _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
529            };
530            if right.is_none() {
531                right = Some(subplan);
532            } else {
533                right = Some(LogicalJoin::create(
534                    right.clone().unwrap(),
535                    subplan,
536                    JoinType::FullOuter,
537                    ExprImpl::literal_bool(true),
538                ));
539            }
540        }
541
542        root = if let Some(right) = right {
543            let mut correlated_indices = rewriter
544                .correlated_indices_collection
545                .iter()
546                .flatten()
547                .cloned()
548                .collect::<Vec<_>>();
549            correlated_indices.sort();
550            correlated_indices.dedup();
551
552            Self::create_apply(
553                rewriter.correlated_id.expect("must have a correlated id"),
554                correlated_indices,
555                root,
556                right,
557                ExprImpl::literal_bool(true),
558                JoinType::LeftOuter,
559                true,
560            )
561        } else {
562            root
563        };
564
565        Ok((root, exprs))
566    }
567
568    fn create_apply(
569        correlated_id: CorrelatedId,
570        correlated_indices: Vec<usize>,
571        left: PlanRef,
572        right: PlanRef,
573        on: ExprImpl,
574        join_type: JoinType,
575        max_one_row: bool,
576    ) -> PlanRef {
577        LogicalApply::create(
578            left,
579            right,
580            join_type,
581            Condition::with_expr(on),
582            correlated_id,
583            correlated_indices,
584            max_one_row,
585        )
586    }
587}