risingwave_frontend/binder/
query.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::HashMap;
17use std::rc::Rc;
18
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
22use risingwave_sqlparser::ast::{Cte, CteInner, Expr, Fetch, OrderByExpr, Query, Value, With};
23use thiserror_ext::AsReport;
24
25use super::BoundValues;
26use super::bind_context::BindingCteState;
27use super::statement::RewriteExprsRecursive;
28use crate::binder::bind_context::BindingCte;
29use crate::binder::{Binder, BoundSetExpr};
30use crate::error::{ErrorCode, Result, RwError};
31use crate::expr::{CorrelatedId, Depth, ExprImpl, ExprRewriter};
32
/// A validated sql query, including order and union.
/// An example of its relationship with `BoundSetExpr` and `BoundSelect` can be found here: <https://bit.ly/3GQwgPz>
#[derive(Debug, Clone)]
pub struct BoundQuery {
    /// The bound body of the query (the result of binding the query's set expression).
    pub body: BoundSetExpr,
    /// One entry per `ORDER BY` item. A column index below the number of visible output columns
    /// of `body` refers to that output column; an index at or beyond it refers to the
    /// corresponding entry of `extra_order_exprs`.
    pub order: Vec<ColumnOrder>,
    /// Row limit taken from `LIMIT` or `FETCH`, if either was given. A `LIMIT` expression that
    /// evaluates to NULL is stored as `u64::MAX` (treated as "no limit").
    pub limit: Option<u64>,
    /// Number of rows to skip, taken from `OFFSET`, if given.
    pub offset: Option<u64>,
    /// Whether the `FETCH` clause specified `WITH TIES`. Always `false` when there is no `FETCH`.
    pub with_ties: bool,
    /// `ORDER BY` expressions that could not be mapped onto an output column of `body`, in the
    /// order they were encountered.
    pub extra_order_exprs: Vec<ExprImpl>,
}
44
45impl BoundQuery {
46    /// The schema returned by this [`BoundQuery`].
47    pub fn schema(&self) -> std::borrow::Cow<'_, Schema> {
48        self.body.schema()
49    }
50
51    /// The types returned by this [`BoundQuery`].
52    pub fn data_types(&self) -> Vec<DataType> {
53        self.schema().data_types()
54    }
55
56    /// Checks whether this query contains references to outer queries.
57    ///
58    /// Note there are 3 cases:
59    /// ```sql
60    /// select 1 from a having exists ( -- this is self
61    ///   select 1 from b where exists (
62    ///     select b1 from c
63    ///   )
64    /// );
65    ///
66    /// select 1 from a having exists ( -- this is self
67    ///   select 1 from b where exists (
68    ///     select a1 from c
69    ///   )
70    /// );
71    ///
72    /// select 1 from a where exists (
73    ///   select 1 from b having exists ( -- this is self, not the one above
74    ///     select a1 from c
75    ///   )
76    /// );
77    /// ```
78    /// We assume `self` is the subquery after `having`. In other words, the query with `from b` in
79    /// first 2 examples and the query with `from c` in the last example.
80    ///
81    /// * The first example is uncorrelated, because it is self-contained and does not depend on
82    ///   table `a`, although there is correlated input ref (`b1`) in it.
83    /// * The second example is correlated, because it depend on a correlated input ref (`a1`) that
84    ///   goes out.
85    /// * The last example is also correlated. because it cannot be evaluated independently either.
86    pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
87        self.body.is_correlated_by_depth(depth + 1)
88            || self
89                .extra_order_exprs
90                .iter()
91                .any(|e| e.has_correlated_input_ref_by_depth(depth + 1))
92    }
93
94    pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
95        self.body.is_correlated_by_correlated_id(correlated_id)
96            || self
97                .extra_order_exprs
98                .iter()
99                .any(|e| e.has_correlated_input_ref_by_correlated_id(correlated_id))
100    }
101
102    pub fn collect_correlated_indices_by_depth_and_assign_id(
103        &mut self,
104        depth: Depth,
105        correlated_id: CorrelatedId,
106    ) -> Vec<usize> {
107        let mut correlated_indices = vec![];
108
109        correlated_indices.extend(
110            self.body
111                .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
112        );
113
114        correlated_indices.extend(self.extra_order_exprs.iter_mut().flat_map(|expr| {
115            expr.collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id)
116        }));
117        correlated_indices
118    }
119
120    /// Simple `VALUES` without other clauses.
121    pub fn with_values(values: BoundValues) -> Self {
122        BoundQuery {
123            body: BoundSetExpr::Values(values.into()),
124            order: vec![],
125            limit: None,
126            offset: None,
127            with_ties: false,
128            extra_order_exprs: vec![],
129        }
130    }
131}
132
133impl RewriteExprsRecursive for BoundQuery {
134    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl ExprRewriter) {
135        let new_extra_order_exprs = std::mem::take(&mut self.extra_order_exprs)
136            .into_iter()
137            .map(|expr| rewriter.rewrite_expr(expr))
138            .collect::<Vec<_>>();
139        self.extra_order_exprs = new_extra_order_exprs;
140
141        self.body.rewrite_exprs_recursive(rewriter);
142    }
143}
144
145impl Binder {
146    /// Bind a [`Query`].
147    ///
148    /// Before binding the [`Query`], we push the current [`BindContext`](super::BindContext) to the
149    /// stack and create a new context, because it may be a subquery.
150    ///
151    /// After finishing binding, we pop the previous context from the stack.
152    pub fn bind_query(&mut self, query: &Query) -> Result<BoundQuery> {
153        self.push_context();
154        let result = self.bind_query_inner(query);
155        self.pop_context()?;
156        result
157    }
158
159    /// Bind a [`Query`] for view.
160    /// TODO: support `SECURITY INVOKER` for view.
161    pub fn bind_query_for_view(&mut self, query: &Query) -> Result<BoundQuery> {
162        self.push_context();
163        self.context.disable_security_invoker = true;
164        let result = self.bind_query_inner(query);
165        self.pop_context()?;
166        result
167    }
168
    /// Bind a [`Query`] using the current [`BindContext`](super::BindContext).
    ///
    /// Steps, in order:
    /// 1. Resolve the row limit from either `LIMIT` or `FETCH` (a `FETCH` without a quantity
    ///    counts as 1 row) and fold it to a constant `u64`.
    /// 2. Parse `OFFSET` as a non-negative integer.
    /// 3. Bind the `WITH` clause, if any, then the query body.
    /// 4. Bind each `ORDER BY` item against the body's output columns.
    ///
    /// # Errors
    ///
    /// Returns an error if the limit expression cannot be cast to a 64-bit integer, is not a
    /// constant, fails to evaluate, or is negative; if `OFFSET` is not a non-negative integer; or
    /// if binding the `WITH` clause, the body, or any `ORDER BY` item fails.
    ///
    /// # Panics
    ///
    /// Panics if both `limit` and `fetch` are present; per the inline note, this combination is
    /// expected to be rejected at parse time.
    pub(super) fn bind_query_inner(
        &mut self,
        Query {
            with,
            body,
            order_by,
            limit,
            offset,
            fetch,
        }: &Query,
    ) -> Result<BoundQuery> {
        // `WITH TIES` can only come from a `FETCH` clause, so it stays `false` otherwise.
        let mut with_ties = false;
        // Normalize `LIMIT` / `FETCH` into a single optional limit expression.
        let limit = match (limit, fetch) {
            (None, None) => None,
            (
                None,
                Some(Fetch {
                    with_ties: fetch_with_ties,
                    quantity,
                }),
            ) => {
                with_ties = *fetch_with_ties;
                match quantity {
                    Some(v) => Some(Expr::Value(Value::Number(v.clone()))),
                    // `FETCH` without an explicit quantity fetches a single row.
                    None => Some(Expr::Value(Value::Number("1".to_owned()))),
                }
            }
            (Some(limit), None) => Some(limit.clone()),
            (Some(_), Some(_)) => unreachable!(), // parse error
        };
        let limit_expr = limit.map(|expr| self.bind_expr(&expr)).transpose()?;
        let limit = if let Some(limit_expr) = limit_expr {
            // wrong type error is handled here
            let limit_cast_to_bigint = limit_expr.cast_assign(&DataType::Int64).map_err(|_| {
                RwError::from(ErrorCode::ExprError(
                    "expects an integer or expression that can be evaluated to an integer after LIMIT"
                        .into(),
                ))
            })?;
            // The limit must be known while binding, so it is folded to a constant here.
            let limit = match limit_cast_to_bigint.try_fold_const() {
                Some(Ok(Some(datum))) => {
                    let value = datum.as_int64();
                    if *value < 0 {
                        return Err(ErrorCode::ExprError(
                            format!("LIMIT must not be negative, but found: {}", *value).into(),
                        )
                            .into());
                    }
                    // Non-negative was checked above, so the conversion to `u64` is lossless.
                    *value as u64
                }
                // If evaluated to NULL, we follow PG to treat NULL as no limit
                Some(Ok(None)) => {
                    u64::MAX
                }
                // not const error
                None => return Err(ErrorCode::ExprError(
                    "expects an integer or expression that can be evaluated to an integer after LIMIT, but found non-const expression"
                        .into(),
                ).into()),
                // eval error
                Some(Err(e)) => {
                    return Err(ErrorCode::ExprError(
                        format!("expects an integer or expression that can be evaluated to an integer after LIMIT,\nbut the evaluation of the expression returns error:{}", e.as_report()
                        ).into(),
                    ).into())
                }
            };
            Some(limit)
        } else {
            None
        };

        // `parse_non_negative_i64` rejects negative values, so the conversion to `u64` is lossless.
        let offset = offset
            .as_ref()
            .map(|s| parse_non_negative_i64("OFFSET", s))
            .transpose()?
            .map(|v| v as u64);

        // CTEs are bound before the body is bound.
        if let Some(with) = with {
            self.bind_with(with)?;
        }
        let body = self.bind_set_expr(body)?;
        // Map output column names to their positions; duplicated names map to `usize::MAX`.
        let name_to_index =
            Self::build_name_to_index(body.schema().fields().iter().map(|f| f.name.clone()));
        // Collects `ORDER BY` expressions that do not correspond to an output column.
        let mut extra_order_exprs = vec![];
        let visible_output_num = body.schema().len();
        let order = order_by
            .iter()
            .map(|order_by_expr| {
                self.bind_order_by_expr_in_query(
                    order_by_expr,
                    &body,
                    &name_to_index,
                    &mut extra_order_exprs,
                    visible_output_num,
                )
            })
            .collect::<Result<_>>()?;
        Ok(BoundQuery {
            body,
            order,
            limit,
            offset,
            with_ties,
            extra_order_exprs,
        })
    }
277
278    pub fn build_name_to_index(names: impl Iterator<Item = String>) -> HashMap<String, usize> {
279        let mut m = HashMap::new();
280        names.enumerate().for_each(|(index, name)| {
281            m.entry(name)
282                // Ambiguous (duplicate) output names are marked with usize::MAX.
283                // This is not necessarily an error as long as not actually referenced.
284                .and_modify(|v| *v = usize::MAX)
285                .or_insert(index);
286        });
287        m
288    }
289
290    /// Bind an `ORDER BY` expression in a [`Query`], which can be either:
291    /// * an output-column name
292    /// * index of an output column
293    /// * an arbitrary expression
294    ///
295    /// Refer to `bind_group_by_expr_in_select` to see their similarities and differences.
296    ///
297    /// # Arguments
298    ///
299    /// * `name_to_index` - visible output column name -> index. Ambiguous (duplicate) output names
300    ///   are marked with `usize::MAX`.
301    /// * `visible_output_num` - the number of all visible output columns, including duplicates.
302    fn bind_order_by_expr_in_query(
303        &mut self,
304        OrderByExpr {
305            expr,
306            asc,
307            nulls_first,
308        }: &OrderByExpr,
309        body: &BoundSetExpr,
310        name_to_index: &HashMap<String, usize>,
311        extra_order_exprs: &mut Vec<ExprImpl>,
312        visible_output_num: usize,
313    ) -> Result<ColumnOrder> {
314        let order_type = OrderType::from_bools(*asc, *nulls_first);
315
316        // If the query body is a simple `SELECT`, we can reuse an existing select item by
317        // expression equality, instead of always appending a new hidden column for ORDER BY.
318        //
319        // This is only safe for pure expressions. For example, `ORDER BY random()` must not be
320        // rewritten to reuse `SELECT random()` because they should be evaluated independently.
321        let select_items_for_match = match body {
322            BoundSetExpr::Select(s) => Some(&s.select_items[..]),
323            _ => None,
324        };
325
326        let column_index = match expr {
327            Expr::Identifier(name) if let Some(index) = name_to_index.get(&name.real_value()) => {
328                match *index != usize::MAX {
329                    true => *index,
330                    false => {
331                        return Err(ErrorCode::BindError(format!(
332                            "ORDER BY \"{}\" is ambiguous",
333                            name.real_value()
334                        ))
335                        .into());
336                    }
337                }
338            }
339            Expr::Value(Value::Number(number)) => match number.parse::<usize>() {
340                Ok(index) if 1 <= index && index <= visible_output_num => index - 1,
341                _ => {
342                    return Err(ErrorCode::InvalidInputSyntax(format!(
343                        "Invalid ordinal number in ORDER BY: {}",
344                        number
345                    ))
346                    .into());
347                }
348            },
349            expr => {
350                let bound_expr = self.bind_expr(expr)?;
351
352                if bound_expr.is_pure()
353                    && let Some(select_items) = select_items_for_match
354                    && let Some(existing_idx) = select_items.iter().position(|e| e == &bound_expr)
355                {
356                    existing_idx
357                } else {
358                    extra_order_exprs.push(bound_expr);
359                    visible_output_num + extra_order_exprs.len() - 1
360                }
361            }
362        };
363        Ok(ColumnOrder::new(column_index, order_type))
364    }
365
366    fn bind_with(&mut self, with: &With) -> Result<()> {
367        if with.recursive {
368            return Err(ErrorCode::BindError("RECURSIVE CTE is not supported".to_owned()).into());
369        }
370
371        for cte_table in &with.cte_tables {
372            let share_id = self.next_share_id();
373            let Cte { alias, cte_inner } = cte_table;
374            let table_name = alias.name.real_value();
375
376            match cte_inner {
377                CteInner::Query(query) => {
378                    let bound_query = self.bind_query(query)?;
379                    self.context.cte_to_relation.insert(
380                        table_name,
381                        Rc::new(RefCell::new(BindingCte {
382                            share_id,
383                            state: BindingCteState::Bound { query: bound_query },
384                            alias: alias.clone(),
385                        })),
386                    );
387                }
388                CteInner::ChangeLog(from_table_name) => {
389                    self.push_context();
390                    let from_table_relation =
391                        self.bind_relation_by_name(from_table_name, None, None, true)?;
392                    self.pop_context()?;
393                    self.context.cte_to_relation.insert(
394                        table_name,
395                        Rc::new(RefCell::new(BindingCte {
396                            share_id,
397                            state: BindingCteState::ChangeLog {
398                                table: from_table_relation,
399                            },
400                            alias: alias.clone(),
401                        })),
402                    );
403                }
404            }
405        }
406        Ok(())
407    }
408}
409
410// TODO: Make clause a const generic param after <https://github.com/rust-lang/rust/issues/95174>.
411fn parse_non_negative_i64(clause: &str, s: &str) -> Result<i64> {
412    match s.parse::<i64>() {
413        Ok(v) => {
414            if v < 0 {
415                Err(ErrorCode::InvalidInputSyntax(format!("{clause} must not be negative")).into())
416            } else {
417                Ok(v)
418            }
419        }
420        Err(e) => Err(ErrorCode::InvalidInputSyntax(e.to_report_string()).into()),
421    }
422}