Skip to main content

risingwave_frontend/planner/
select.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::catalog::PROJECTED_ROW_ID_COLUMN_NAME;
20use risingwave_common::types::DataType;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_common::util::sort_util::ColumnOrder;
23use risingwave_expr::ExprError;
24use risingwave_pb::plan_common::JoinType;
25
26use crate::OptimizerContextRef;
27use crate::binder::{BoundDistinct, BoundSelect};
28use crate::error::{ErrorCode, Result};
29use crate::expr::{
30    CorrelatedId, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef, Subquery,
31    SubqueryKind,
32};
33pub use crate::optimizer::plan_node::LogicalFilter;
34use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef, Project, ProjectBuilder};
35use crate::optimizer::plan_node::{
36    LogicalAgg, LogicalApply, LogicalDedup, LogicalJoin, LogicalOverWindow,
37    LogicalPlanRef as PlanRef, LogicalProject, LogicalProjectSet, LogicalTopN, LogicalValues,
38    PlanAggCall,
39};
40use crate::optimizer::property::Order;
41use crate::planner::Planner;
42use crate::utils::{Condition, IndexSet};
43
44impl Planner {
45    pub(super) fn plan_select(
46        &mut self,
47        BoundSelect {
48            from,
49            where_clause,
50            mut select_items,
51            group_by,
52            mut having,
53            mut distinct,
54            ..
55        }: BoundSelect,
56        extra_order_exprs: Vec<ExprImpl>,
57        order: &[ColumnOrder],
58    ) -> Result<PlanRef> {
59        // Append expressions in ORDER BY.
60        if distinct.is_distinct() && !extra_order_exprs.is_empty() {
61            return Err(ErrorCode::InvalidInputSyntax(
62                "for SELECT DISTINCT, ORDER BY expressions must appear in select list".into(),
63            )
64            .into());
65        }
66        select_items.extend(extra_order_exprs);
67        // The DISTINCT ON expression(s) must match the leftmost ORDER BY expression(s).
68        if let BoundDistinct::DistinctOn(exprs) = &distinct {
69            let mut distinct_on_exprs: HashMap<ExprImpl, bool> =
70                exprs.iter().map(|expr| (expr.clone(), false)).collect();
71            let mut uncovered_distinct_on_exprs_cnt = distinct_on_exprs.len();
72            let mut order_iter = order.iter().map(|o| &select_items[o.column_index]);
73            while uncovered_distinct_on_exprs_cnt > 0
74                && let Some(order_expr) = order_iter.next()
75            {
76                match distinct_on_exprs.get_mut(order_expr) {
77                    Some(has_been_covered) => {
78                        if !*has_been_covered {
79                            *has_been_covered = true;
80                            uncovered_distinct_on_exprs_cnt -= 1;
81                        }
82                    }
83                    None => {
84                        return Err(ErrorCode::InvalidInputSyntax(
85                            "the SELECT DISTINCT ON expressions must match the leftmost ORDER BY expressions"
86                                .into(),
87                        )
88                        .into());
89                    }
90                }
91            }
92        }
93
94        // Plan the FROM clause.
95        let mut root = match from {
96            None => self.create_dummy_values(),
97            Some(t) => self.plan_relation(t)?,
98        };
99        // Plan the WHERE clause.
100        if let Some(where_clause) = where_clause {
101            root = self.plan_where(root, where_clause)?;
102        }
103        // Plan the SELECT clause.
104        // TODO: select-agg, group-by, having can also contain subquery exprs.
105        let has_agg_call = select_items.iter().any(|expr| expr.has_agg_call());
106        if !group_by.is_empty() || having.is_some() || has_agg_call {
107            // The `DISTINCT ON` expressions must also be rewritten to reference the
108            // aggregated results, just like the SELECT and ORDER BY expressions. By
109            // appending them to `select_items` they go through the same agg rewriting,
110            // which maps group-key references and rejects any column that neither
111            // appears in GROUP BY nor inside an aggregate (matching PostgreSQL).
112            let n_distinct_on = if let BoundDistinct::DistinctOn(exprs) = &distinct {
113                select_items.extend(exprs.iter().cloned());
114                exprs.len()
115            } else {
116                0
117            };
118
119            (root, select_items, having) =
120                LogicalAgg::create(select_items, group_by, having, root)?;
121
122            if n_distinct_on > 0 {
123                let rewritten = select_items.split_off(select_items.len() - n_distinct_on);
124                distinct = BoundDistinct::DistinctOn(rewritten);
125            }
126        }
127
128        if let Some(having) = having {
129            root = self.plan_where(root, having)?;
130        }
131
132        if select_items.iter().any(|e| e.has_subquery()) {
133            (root, select_items) =
134                self.substitute_subqueries_in_cross_join_way(root, select_items)?;
135        }
136        if select_items.iter().any(|e| e.has_window_function()) {
137            (root, select_items) = LogicalOverWindow::create(root, select_items)?;
138        }
139
140        let original_select_items_len = select_items.len();
141
142        // variable `distinct_list_index_to_select_items_index` is meaningful iff
143        // `matches!(&distinct, BoundDistinct::DistinctOn(_))`
144        let mut distinct_list_index_to_select_items_index = vec![];
145        if let BoundDistinct::DistinctOn(distinct_list) = &distinct {
146            distinct_list_index_to_select_items_index.reserve(distinct_list.len());
147            let mut builder_index_to_select_items_index =
148                Vec::with_capacity(original_select_items_len);
149            let mut input_proj_builder = ProjectBuilder::default();
150            for (select_item_index, select_item) in select_items.iter().enumerate() {
151                let builder_index = input_proj_builder
152                    .add_expr(select_item)
153                    .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
154                if builder_index >= builder_index_to_select_items_index.len() {
155                    debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
156                    builder_index_to_select_items_index.push(select_item_index);
157                }
158            }
159            for distinct_expr in distinct_list {
160                let builder_index = input_proj_builder
161                    .add_expr(distinct_expr)
162                    .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
163                if builder_index >= builder_index_to_select_items_index.len() {
164                    debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
165                    select_items.push(distinct_expr.clone());
166                    builder_index_to_select_items_index.push(select_items.len() - 1);
167                }
168                distinct_list_index_to_select_items_index
169                    .push(builder_index_to_select_items_index[builder_index]);
170            }
171        }
172
173        let need_restore_select_items = select_items.len() > original_select_items_len;
174
175        root = LogicalProjectSet::create(root, select_items);
176
177        if matches!(&distinct, BoundDistinct::DistinctOn(_)) {
178            root = if order.is_empty() {
179                // We only support deduplicating `DISTINCT ON` columns when there is no `ORDER BY`
180                // clause now.
181                LogicalDedup::new(root, distinct_list_index_to_select_items_index).into()
182            } else {
183                LogicalTopN::new(
184                    root,
185                    1,
186                    0,
187                    false,
188                    Order::new(order.to_vec()),
189                    distinct_list_index_to_select_items_index,
190                )
191                .into()
192            };
193        }
194
195        if need_restore_select_items {
196            root = LogicalProject::with_core(Project::with_out_col_idx(
197                root,
198                0..original_select_items_len,
199            ))
200            .into();
201        }
202
203        if let BoundDistinct::Distinct = distinct {
204            let fields = root.schema().fields();
205            let group_key = if let Some(field) = fields.first()
206                && field.name == PROJECTED_ROW_ID_COLUMN_NAME
207            {
208                // Do not group by projected_row_id hidden column.
209                (1..fields.len()).collect()
210            } else {
211                (0..fields.len()).collect()
212            };
213            root = Agg::new(vec![], group_key, root).into();
214        }
215
216        Ok(root)
217    }
218
219    /// Helper to create a dummy node as child of [`LogicalProject`].
220    /// For example, `select 1+2, 3*4` will be `Project([1+2, 3+4]) - Values([[]])`.
221    fn create_dummy_values(&self) -> PlanRef {
222        LogicalValues::create_empty_scalar(self.ctx.clone())
223    }
224
225    /// Helper to create an `EXISTS` boolean operator with the given `input`.
226    /// It is represented by `Project([$0 >= 1]) -> Agg(count(*)) -> input`
227    fn create_exists(&self, input: PlanRef) -> Result<PlanRef> {
228        let count_star = Agg::new(vec![PlanAggCall::count_star()], IndexSet::empty(), input);
229        let ge = FunctionCall::new(
230            ExprType::GreaterThanOrEqual,
231            vec![
232                InputRef::new(0, DataType::Int64).into(),
233                ExprImpl::literal_int(1),
234            ],
235        )
236        .unwrap();
237        Ok(LogicalProject::create(count_star.into(), vec![ge.into()]))
238    }
239
240    /// For `(NOT) EXISTS subquery` or `(NOT) IN subquery`, we can plan it as
241    /// `LeftSemi/LeftAnti` [`LogicalApply`]
242    /// For other subqueries, we plan it as `LeftOuter` [`LogicalApply`] using
243    /// [`Self::substitute_subqueries_in_left_deep_tree_way`].
244    pub(super) fn plan_where(
245        &mut self,
246        mut input: PlanRef,
247        where_clause: ExprImpl,
248    ) -> Result<PlanRef> {
249        if !where_clause.has_subquery() {
250            return Ok(LogicalFilter::create_with_expr(input, where_clause));
251        }
252        let (subquery_conjunctions, not_subquery_conjunctions, others) =
253            Condition::with_expr(where_clause)
254                .group_by::<_, 3>(|expr| match expr {
255                    ExprImpl::Subquery(_) => 0,
256                    ExprImpl::FunctionCall(func_call)
257                        if func_call.func_type() == ExprType::Not
258                            && matches!(func_call.inputs()[0], ExprImpl::Subquery(_)) =>
259                    {
260                        1
261                    }
262                    _ => 2,
263                })
264                .into_iter()
265                .next_tuple()
266                .unwrap();
267
268        // EXISTS and IN in WHERE.
269        for expr in subquery_conjunctions {
270            self.handle_exists_and_in(expr, false, &mut input)?;
271        }
272
273        // NOT EXISTS and NOT IN in WHERE.
274        for expr in not_subquery_conjunctions {
275            let not = expr.into_function_call().unwrap();
276            let (_, expr) = not.decompose_as_unary();
277            self.handle_exists_and_in(expr, true, &mut input)?;
278        }
279
280        if others.always_true() {
281            Ok(input)
282        } else {
283            let (input, others) =
284                self.substitute_subqueries_in_left_deep_tree_way(input, others.conjunctions)?;
285            Ok(LogicalFilter::create(
286                input,
287                Condition {
288                    conjunctions: others,
289                },
290            ))
291        }
292    }
293
294    /// Handle (NOT) EXISTS and (NOT) IN in WHERE clause.
295    ///
296    /// We will use a = b to replace a in (select b from ....) for (NOT) IN thus avoiding adding a
297    /// `LogicalFilter` on `LogicalApply`.
298    fn handle_exists_and_in(
299        &mut self,
300        expr: ExprImpl,
301        negated: bool,
302        input: &mut PlanRef,
303    ) -> Result<()> {
304        let join_type = if negated {
305            JoinType::LeftAnti
306        } else {
307            JoinType::LeftSemi
308        };
309        let correlated_id = self.ctx.next_correlated_id();
310        let mut subquery = expr.into_subquery().unwrap();
311        // we should call `subquery.query.collect_correlated_indices_by_depth_and_assign_id`
312        // instead of `subquery.collect_correlated_indices_by_depth_and_assign_id`.
313        // because current subquery containing struct `kind` expr which should never be correlated with the current subquery.
314        let mut correlated_indices = subquery
315            .query
316            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
317        correlated_indices.sort();
318        correlated_indices.dedup();
319        let output_column_type = subquery.query.data_types()[0].clone();
320        let right_plan = self.plan_query(subquery.query)?.into_unordered_subplan();
321        let on = match subquery.kind {
322            SubqueryKind::Existential => ExprImpl::literal_bool(true),
323            SubqueryKind::In(left_expr) => {
324                let right_expr = InputRef::new(input.schema().len(), output_column_type);
325                FunctionCall::new(ExprType::Equal, vec![left_expr, right_expr.into()])?.into()
326            }
327            kind => bail_not_implemented!(issue = 1343, "Not supported subquery kind: {:?}", kind),
328        };
329        *input = Self::create_apply(
330            correlated_id,
331            correlated_indices,
332            input.clone(),
333            right_plan,
334            on,
335            join_type,
336            false,
337        );
338        Ok(())
339    }
340
341    /// Substitutes all [`Subquery`] in `exprs` in a left deep tree way.
342    ///
343    /// Each time a [`Subquery`] is found, it is replaced by a new [`InputRef`]. And `root` is
344    /// replaced by a new `LeftOuter` [`LogicalApply`] whose left side is `root` and right side is
345    /// the planned subquery.
346    ///
347    /// The [`InputRef`]s' indexes start from `root.schema().len()`,
348    /// which means they are additional columns beyond the original `root`.
349    ///
350    /// The left-deep tree way only meaningful when there are multiple subqueries
351    ///
352    /// ```text
353    ///             Apply
354    ///            /    \
355    ///          Apply  Subquery3
356    ///         /    \
357    ///     Apply    Subquery2
358    ///     /   \
359    ///   left  Subquery1
360    /// ```
361    ///
362    /// Typically, this is used for subqueries in `WHERE` clause, because it is unlikely that users would write subqueries in the same where clause multiple times.
363    /// But our dynamic filter sometimes generates subqueries in the same where clause multiple times (which couldn't work in the cross join way), so we need to support this case.
364    pub(super) fn substitute_subqueries_in_left_deep_tree_way(
365        &mut self,
366        mut root: PlanRef,
367        mut exprs: Vec<ExprImpl>,
368    ) -> Result<(PlanRef, Vec<ExprImpl>)> {
369        struct SubstituteSubQueries {
370            input_col_num: usize,
371            subqueries: Vec<Subquery>,
372            correlated_indices_collection: Vec<Vec<usize>>,
373            correlated_ids: Vec<CorrelatedId>,
374            ctx: OptimizerContextRef,
375        }
376
377        // TODO: consider the multi-subquery case for normal predicate.
378        impl ExprRewriter for SubstituteSubQueries {
379            fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
380                let correlated_id = self.ctx.next_correlated_id();
381                self.correlated_ids.push(correlated_id);
382                let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
383                self.input_col_num += 1;
384                self.correlated_indices_collection.push(
385                    subquery.collect_correlated_indices_by_depth_and_assign_id(0, correlated_id),
386                );
387                self.subqueries.push(subquery);
388                input_ref
389            }
390        }
391
392        let mut rewriter = SubstituteSubQueries {
393            input_col_num: root.schema().len(),
394            subqueries: vec![],
395            correlated_indices_collection: vec![],
396            correlated_ids: vec![],
397            ctx: self.ctx.clone(),
398        };
399        exprs = exprs
400            .into_iter()
401            .map(|e| rewriter.rewrite_expr(e))
402            .collect();
403
404        for ((subquery, correlated_indices), correlated_id) in rewriter
405            .subqueries
406            .into_iter()
407            .zip_eq_fast(rewriter.correlated_indices_collection)
408            .zip_eq_fast(rewriter.correlated_ids)
409        {
410            let return_type = subquery.return_type();
411            let subroot = self.plan_query(subquery.query)?;
412
413            let right = match subquery.kind {
414                SubqueryKind::Scalar => subroot.into_unordered_subplan(),
415                SubqueryKind::UpdateSet => {
416                    let plan = subroot.into_unordered_subplan();
417
418                    // Compose all input columns into a struct with `ROW` function.
419                    let all_input_refs = plan
420                        .schema()
421                        .data_types()
422                        .into_iter()
423                        .enumerate()
424                        .map(|(i, data_type)| InputRef::new(i, data_type).into())
425                        .collect::<Vec<_>>();
426                    let call =
427                        FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
428
429                    LogicalProject::create(plan, vec![call.into()])
430                }
431                SubqueryKind::Existential => {
432                    self.create_exists(subroot.into_unordered_subplan())?
433                }
434                SubqueryKind::Array => subroot.into_array_agg()?,
435                _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
436            };
437
438            root = Self::create_apply(
439                correlated_id,
440                correlated_indices,
441                root,
442                right,
443                ExprImpl::literal_bool(true),
444                JoinType::LeftOuter,
445                true,
446            );
447        }
448        Ok((root, exprs))
449    }
450
451    /// Substitutes all [`Subquery`] in `exprs` in a cross join way.
452    ///
453    /// Each time a [`Subquery`] is found, it is replaced by a new [`InputRef`]. And `root` is
454    /// replaced by a new `LeftOuter` [`LogicalApply`] whose left side is `root` and right side is
455    /// the planned subquery.
456    ///
457    /// The [`InputRef`]s' indexes start from `root.schema().len()`,
458    /// which means they are additional columns beyond the original `root`.
459    ///
460    ///
461    /// The cross join way only meaningful when there are multiple subqueries
462    ///
463    /// ```text
464    ///            Apply
465    ///           /    \
466    ///         left   CrossJoin
467    ///                   /   \
468    ///           Subquery1   CrossJoin
469    ///                         /   \
470    ///                 Subquery2   Subquery3
471    /// ```
472    /// Typically, this is used for scalar subqueries in select clauses, because users might write subqueries in exprs multiple times.
473    /// If we use the left-deep tree way, it will generate a lot of `Apply` nodes, which is not efficient.
474    pub(super) fn substitute_subqueries_in_cross_join_way(
475        &mut self,
476        mut root: PlanRef,
477        mut exprs: Vec<ExprImpl>,
478    ) -> Result<(PlanRef, Vec<ExprImpl>)> {
479        struct SubstituteSubQueries {
480            input_col_num: usize,
481            subqueries: Vec<Subquery>,
482            correlated_id: Option<CorrelatedId>,
483            correlated_indices_collection: Vec<Vec<usize>>,
484            ctx: OptimizerContextRef,
485        }
486
487        impl ExprRewriter for SubstituteSubQueries {
488            fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
489                if self.correlated_id.is_none() {
490                    self.correlated_id = Some(self.ctx.next_correlated_id());
491                }
492                let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
493                self.input_col_num += 1;
494                self.correlated_indices_collection.push(
495                    subquery.collect_correlated_indices_by_depth_and_assign_id(
496                        0,
497                        self.correlated_id.unwrap(),
498                    ),
499                );
500                self.subqueries.push(subquery);
501                input_ref
502            }
503        }
504
505        let mut rewriter = SubstituteSubQueries {
506            input_col_num: root.schema().len(),
507            subqueries: vec![],
508            correlated_id: None,
509            correlated_indices_collection: vec![],
510            ctx: self.ctx.clone(),
511        };
512        exprs = exprs
513            .into_iter()
514            .map(|e| rewriter.rewrite_expr(e))
515            .collect();
516
517        let mut right = None;
518
519        for subquery in rewriter.subqueries {
520            let return_type = subquery.return_type();
521            let subroot = self.plan_query(subquery.query)?;
522
523            let subplan = match subquery.kind {
524                SubqueryKind::Scalar => subroot.into_unordered_subplan(),
525                SubqueryKind::UpdateSet => {
526                    let plan = subroot.into_unordered_subplan();
527
528                    // Compose all input columns into a struct with `ROW` function.
529                    let all_input_refs = plan
530                        .schema()
531                        .data_types()
532                        .into_iter()
533                        .enumerate()
534                        .map(|(i, data_type)| InputRef::new(i, data_type).into())
535                        .collect::<Vec<_>>();
536                    let call =
537                        FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
538
539                    LogicalProject::create(plan, vec![call.into()])
540                }
541                SubqueryKind::Existential => {
542                    self.create_exists(subroot.into_unordered_subplan())?
543                }
544                SubqueryKind::Array => subroot.into_array_agg()?,
545                _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
546            };
547            if right.is_none() {
548                right = Some(subplan);
549            } else {
550                right = Some(LogicalJoin::create(
551                    right.clone().unwrap(),
552                    subplan,
553                    JoinType::FullOuter,
554                    ExprImpl::literal_bool(true),
555                ));
556            }
557        }
558
559        root = if let Some(right) = right {
560            let mut correlated_indices = rewriter
561                .correlated_indices_collection
562                .iter()
563                .flatten()
564                .cloned()
565                .collect::<Vec<_>>();
566            correlated_indices.sort();
567            correlated_indices.dedup();
568
569            Self::create_apply(
570                rewriter.correlated_id.expect("must have a correlated id"),
571                correlated_indices,
572                root,
573                right,
574                ExprImpl::literal_bool(true),
575                JoinType::LeftOuter,
576                true,
577            )
578        } else {
579            root
580        };
581
582        Ok((root, exprs))
583    }
584
585    fn create_apply(
586        correlated_id: CorrelatedId,
587        correlated_indices: Vec<usize>,
588        left: PlanRef,
589        right: PlanRef,
590        on: ExprImpl,
591        join_type: JoinType,
592        max_one_row: bool,
593    ) -> PlanRef {
594        LogicalApply::create(
595            left,
596            right,
597            join_type,
598            Condition::with_expr(on),
599            correlated_id,
600            correlated_indices,
601            max_one_row,
602        )
603    }
604}