risingwave_frontend/binder/
query.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::HashMap;
17use std::rc::Rc;
18
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
22use risingwave_sqlparser::ast::{
23    Corresponding, Cte, CteInner, Expr, Fetch, OrderByExpr, Query, SetExpr, SetOperator, Value,
24    With,
25};
26use thiserror_ext::AsReport;
27
28use super::BoundValues;
29use super::bind_context::BindingCteState;
30use super::statement::RewriteExprsRecursive;
31use crate::binder::bind_context::{BindingCte, RecursiveUnion};
32use crate::binder::{Binder, BoundSetExpr};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::expr::{CorrelatedId, Depth, ExprImpl, ExprRewriter};
35
/// A validated sql query, including order and union.
/// An example of its relationship with `BoundSetExpr` and `BoundSelect` can be found here: <https://bit.ly/3GQwgPz>
#[derive(Debug, Clone)]
pub struct BoundQuery {
    /// The bound set expression forming the body of the query. It determines the output schema.
    pub body: BoundSetExpr,
    /// The `ORDER BY` keys, in the order they were written. A column index smaller than the
    /// number of output columns of `body` refers to that output column; a larger index refers
    /// to an entry of `extra_order_exprs`, counted from the end of the output columns.
    pub order: Vec<ColumnOrder>,
    /// The row count from `LIMIT` or `FETCH`, or `None` when neither clause is present.
    /// A limit expression that evaluates to `NULL` is stored as `u64::MAX`.
    pub limit: Option<u64>,
    /// The number of rows to skip from `OFFSET`, or `None` when the clause is absent.
    pub offset: Option<u64>,
    /// The `with_ties` flag of the `FETCH` clause. Always `false` when `FETCH` is not used.
    pub with_ties: bool,
    /// Bound `ORDER BY` expressions that could not be mapped to an output column of `body`.
    pub extra_order_exprs: Vec<ExprImpl>,
}
47
48impl BoundQuery {
49    /// The schema returned by this [`BoundQuery`].
50    pub fn schema(&self) -> std::borrow::Cow<'_, Schema> {
51        self.body.schema()
52    }
53
54    /// The types returned by this [`BoundQuery`].
55    pub fn data_types(&self) -> Vec<DataType> {
56        self.schema().data_types()
57    }
58
59    /// Checks whether this query contains references to outer queries.
60    ///
61    /// Note there are 3 cases:
62    /// ```sql
63    /// select 1 from a having exists ( -- this is self
64    ///   select 1 from b where exists (
65    ///     select b1 from c
66    ///   )
67    /// );
68    ///
69    /// select 1 from a having exists ( -- this is self
70    ///   select 1 from b where exists (
71    ///     select a1 from c
72    ///   )
73    /// );
74    ///
75    /// select 1 from a where exists (
76    ///   select 1 from b having exists ( -- this is self, not the one above
77    ///     select a1 from c
78    ///   )
79    /// );
80    /// ```
81    /// We assume `self` is the subquery after `having`. In other words, the query with `from b` in
82    /// first 2 examples and the query with `from c` in the last example.
83    ///
84    /// * The first example is uncorrelated, because it is self-contained and does not depend on
85    ///   table `a`, although there is correlated input ref (`b1`) in it.
86    /// * The second example is correlated, because it depend on a correlated input ref (`a1`) that
87    ///   goes out.
88    /// * The last example is also correlated. because it cannot be evaluated independently either.
89    pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
90        self.body.is_correlated_by_depth(depth + 1)
91            || self
92                .extra_order_exprs
93                .iter()
94                .any(|e| e.has_correlated_input_ref_by_depth(depth + 1))
95    }
96
97    pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
98        self.body.is_correlated_by_correlated_id(correlated_id)
99            || self
100                .extra_order_exprs
101                .iter()
102                .any(|e| e.has_correlated_input_ref_by_correlated_id(correlated_id))
103    }
104
105    pub fn collect_correlated_indices_by_depth_and_assign_id(
106        &mut self,
107        depth: Depth,
108        correlated_id: CorrelatedId,
109    ) -> Vec<usize> {
110        let mut correlated_indices = vec![];
111
112        correlated_indices.extend(
113            self.body
114                .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
115        );
116
117        correlated_indices.extend(self.extra_order_exprs.iter_mut().flat_map(|expr| {
118            expr.collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id)
119        }));
120        correlated_indices
121    }
122
123    /// Simple `VALUES` without other clauses.
124    pub fn with_values(values: BoundValues) -> Self {
125        BoundQuery {
126            body: BoundSetExpr::Values(values.into()),
127            order: vec![],
128            limit: None,
129            offset: None,
130            with_ties: false,
131            extra_order_exprs: vec![],
132        }
133    }
134}
135
136impl RewriteExprsRecursive for BoundQuery {
137    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl ExprRewriter) {
138        let new_extra_order_exprs = std::mem::take(&mut self.extra_order_exprs)
139            .into_iter()
140            .map(|expr| rewriter.rewrite_expr(expr))
141            .collect::<Vec<_>>();
142        self.extra_order_exprs = new_extra_order_exprs;
143
144        self.body.rewrite_exprs_recursive(rewriter);
145    }
146}
147
148impl Binder {
149    /// Bind a [`Query`].
150    ///
151    /// Before binding the [`Query`], we push the current [`BindContext`](super::BindContext) to the
152    /// stack and create a new context, because it may be a subquery.
153    ///
154    /// After finishing binding, we pop the previous context from the stack.
155    pub fn bind_query(&mut self, query: &Query) -> Result<BoundQuery> {
156        self.push_context();
157        let result = self.bind_query_inner(query);
158        self.pop_context()?;
159        result
160    }
161
162    /// Bind a [`Query`] for view.
163    /// TODO: support `SECURITY INVOKER` for view.
164    pub fn bind_query_for_view(&mut self, query: &Query) -> Result<BoundQuery> {
165        self.push_context();
166        self.context.disable_security_invoker = true;
167        let result = self.bind_query_inner(query);
168        self.pop_context()?;
169        result
170    }
171
172    /// Bind a [`Query`] using the current [`BindContext`](super::BindContext).
173    pub(super) fn bind_query_inner(
174        &mut self,
175        Query {
176            with,
177            body,
178            order_by,
179            limit,
180            offset,
181            fetch,
182        }: &Query,
183    ) -> Result<BoundQuery> {
184        let mut with_ties = false;
185        let limit = match (limit, fetch) {
186            (None, None) => None,
187            (
188                None,
189                Some(Fetch {
190                    with_ties: fetch_with_ties,
191                    quantity,
192                }),
193            ) => {
194                with_ties = *fetch_with_ties;
195                match quantity {
196                    Some(v) => Some(Expr::Value(Value::Number(v.clone()))),
197                    None => Some(Expr::Value(Value::Number("1".to_owned()))),
198                }
199            }
200            (Some(limit), None) => Some(limit.clone()),
201            (Some(_), Some(_)) => unreachable!(), // parse error
202        };
203        let limit_expr = limit.map(|expr| self.bind_expr(&expr)).transpose()?;
204        let limit = if let Some(limit_expr) = limit_expr {
205            // wrong type error is handled here
206            let limit_cast_to_bigint = limit_expr.cast_assign(&DataType::Int64).map_err(|_| {
207                RwError::from(ErrorCode::ExprError(
208                    "expects an integer or expression that can be evaluated to an integer after LIMIT"
209                        .into(),
210                ))
211            })?;
212            let limit = match limit_cast_to_bigint.try_fold_const() {
213                Some(Ok(Some(datum))) => {
214                    let value = datum.as_int64();
215                    if *value < 0 {
216                        return Err(ErrorCode::ExprError(
217                            format!("LIMIT must not be negative, but found: {}", *value).into(),
218                        )
219                            .into());
220                    }
221                    *value as u64
222                }
223                // If evaluated to NULL, we follow PG to treat NULL as no limit
224                Some(Ok(None)) => {
225                    u64::MAX
226                }
227                // not const error
228                None => return Err(ErrorCode::ExprError(
229                    "expects an integer or expression that can be evaluated to an integer after LIMIT, but found non-const expression"
230                        .into(),
231                ).into()),
232                // eval error
233                Some(Err(e)) => {
234                    return Err(ErrorCode::ExprError(
235                        format!("expects an integer or expression that can be evaluated to an integer after LIMIT,\nbut the evaluation of the expression returns error:{}", e.as_report()
236                        ).into(),
237                    ).into())
238                }
239            };
240            Some(limit)
241        } else {
242            None
243        };
244
245        let offset = offset
246            .as_ref()
247            .map(|s| parse_non_negative_i64("OFFSET", s))
248            .transpose()?
249            .map(|v| v as u64);
250
251        if let Some(with) = with {
252            self.bind_with(with)?;
253        }
254        let body = self.bind_set_expr(body)?;
255        let name_to_index =
256            Self::build_name_to_index(body.schema().fields().iter().map(|f| f.name.clone()));
257        let mut extra_order_exprs = vec![];
258        let visible_output_num = body.schema().len();
259        let order = order_by
260            .iter()
261            .map(|order_by_expr| {
262                self.bind_order_by_expr_in_query(
263                    order_by_expr,
264                    &body,
265                    &name_to_index,
266                    &mut extra_order_exprs,
267                    visible_output_num,
268                )
269            })
270            .collect::<Result<_>>()?;
271        Ok(BoundQuery {
272            body,
273            order,
274            limit,
275            offset,
276            with_ties,
277            extra_order_exprs,
278        })
279    }
280
281    pub fn build_name_to_index(names: impl Iterator<Item = String>) -> HashMap<String, usize> {
282        let mut m = HashMap::new();
283        names.enumerate().for_each(|(index, name)| {
284            m.entry(name)
285                // Ambiguous (duplicate) output names are marked with usize::MAX.
286                // This is not necessarily an error as long as not actually referenced.
287                .and_modify(|v| *v = usize::MAX)
288                .or_insert(index);
289        });
290        m
291    }
292
293    /// Bind an `ORDER BY` expression in a [`Query`], which can be either:
294    /// * an output-column name
295    /// * index of an output column
296    /// * an arbitrary expression
297    ///
298    /// Refer to `bind_group_by_expr_in_select` to see their similarities and differences.
299    ///
300    /// # Arguments
301    ///
302    /// * `name_to_index` - visible output column name -> index. Ambiguous (duplicate) output names
303    ///   are marked with `usize::MAX`.
304    /// * `visible_output_num` - the number of all visible output columns, including duplicates.
305    fn bind_order_by_expr_in_query(
306        &mut self,
307        OrderByExpr {
308            expr,
309            asc,
310            nulls_first,
311        }: &OrderByExpr,
312        body: &BoundSetExpr,
313        name_to_index: &HashMap<String, usize>,
314        extra_order_exprs: &mut Vec<ExprImpl>,
315        visible_output_num: usize,
316    ) -> Result<ColumnOrder> {
317        let order_type = OrderType::from_bools(*asc, *nulls_first);
318
319        // If the query body is a simple `SELECT`, we can reuse an existing select item by
320        // expression equality, instead of always appending a new hidden column for ORDER BY.
321        //
322        // This is only safe for pure expressions. For example, `ORDER BY random()` must not be
323        // rewritten to reuse `SELECT random()` because they should be evaluated independently.
324        let select_items_for_match = match body {
325            BoundSetExpr::Select(s) => Some(&s.select_items[..]),
326            _ => None,
327        };
328
329        let column_index = match expr {
330            Expr::Identifier(name) if let Some(index) = name_to_index.get(&name.real_value()) => {
331                match *index != usize::MAX {
332                    true => *index,
333                    false => {
334                        return Err(ErrorCode::BindError(format!(
335                            "ORDER BY \"{}\" is ambiguous",
336                            name.real_value()
337                        ))
338                        .into());
339                    }
340                }
341            }
342            Expr::Value(Value::Number(number)) => match number.parse::<usize>() {
343                Ok(index) if 1 <= index && index <= visible_output_num => index - 1,
344                _ => {
345                    return Err(ErrorCode::InvalidInputSyntax(format!(
346                        "Invalid ordinal number in ORDER BY: {}",
347                        number
348                    ))
349                    .into());
350                }
351            },
352            expr => {
353                let bound_expr = self.bind_expr(expr)?;
354
355                if bound_expr.is_pure()
356                    && let Some(select_items) = select_items_for_match
357                    && let Some(existing_idx) = select_items.iter().position(|e| e == &bound_expr)
358                {
359                    existing_idx
360                } else {
361                    extra_order_exprs.push(bound_expr);
362                    visible_output_num + extra_order_exprs.len() - 1
363                }
364            }
365        };
366        Ok(ColumnOrder::new(column_index, order_type))
367    }
368
369    fn bind_with(&mut self, with: &With) -> Result<()> {
370        for cte_table in &with.cte_tables {
371            // note that the new `share_id` for the rcte is generated here
372            let share_id = self.next_share_id();
373            let Cte { alias, cte_inner } = cte_table;
374            let table_name = alias.name.real_value();
375
376            if with.recursive {
377                if let CteInner::Query(query) = cte_inner {
378                    let (all, corresponding, left, right, with) = Self::validate_rcte(query)?;
379
380                    // validated in `validate_rcte`
381                    assert!(
382                        !corresponding.is_corresponding(),
383                        "`CORRESPONDING` is not supported in recursive CTE"
384                    );
385
386                    let entry = self
387                        .context
388                        .cte_to_relation
389                        .entry(table_name)
390                        .insert_entry(Rc::new(RefCell::new(BindingCte {
391                            share_id,
392                            state: BindingCteState::Init,
393                            alias: alias.clone(),
394                        })))
395                        .get()
396                        .clone();
397
398                    self.bind_rcte(with, entry, left, right, all)?;
399                } else {
400                    return Err(ErrorCode::BindError(
401                        "RECURSIVE CTE only support query".to_owned(),
402                    )
403                    .into());
404                }
405            } else {
406                match cte_inner {
407                    CteInner::Query(query) => {
408                        let bound_query = self.bind_query(query)?;
409                        self.context.cte_to_relation.insert(
410                            table_name,
411                            Rc::new(RefCell::new(BindingCte {
412                                share_id,
413                                state: BindingCteState::Bound {
414                                    query: either::Either::Left(bound_query),
415                                },
416                                alias: alias.clone(),
417                            })),
418                        );
419                    }
420                    CteInner::ChangeLog(from_table_name) => {
421                        self.push_context();
422                        let from_table_relation =
423                            self.bind_relation_by_name(from_table_name, None, None, true)?;
424                        self.pop_context()?;
425                        self.context.cte_to_relation.insert(
426                            table_name,
427                            Rc::new(RefCell::new(BindingCte {
428                                share_id,
429                                state: BindingCteState::ChangeLog {
430                                    table: from_table_relation,
431                                },
432                                alias: alias.clone(),
433                            })),
434                        );
435                    }
436                }
437            }
438        }
439        Ok(())
440    }
441
442    /// syntactically validate the recursive cte ast with the current support features in rw.
443    fn validate_rcte(
444        query: &Query,
445    ) -> Result<(bool, &Corresponding, &SetExpr, &SetExpr, Option<&With>)> {
446        let Query {
447            with,
448            body,
449            order_by,
450            limit,
451            offset,
452            fetch,
453        } = query;
454
455        /// the input clause should not be supported.
456        fn should_be_empty<T>(v: Option<T>, clause: &str) -> Result<()> {
457            if v.is_some() {
458                return Err(ErrorCode::BindError(format!(
459                    "`{clause}` is not supported in recursive CTE"
460                ))
461                .into());
462            }
463            Ok(())
464        }
465
466        should_be_empty(order_by.first(), "ORDER BY")?;
467        should_be_empty(limit.as_ref(), "LIMIT")?;
468        should_be_empty(offset.as_ref(), "OFFSET")?;
469        should_be_empty(fetch.as_ref(), "FETCH")?;
470
471        let SetExpr::SetOperation {
472            op: SetOperator::Union,
473            all,
474            corresponding,
475            left,
476            right,
477        } = body
478        else {
479            return Err(
480                ErrorCode::BindError("`UNION` is required in recursive CTE".to_owned()).into(),
481            );
482        };
483
484        if !all {
485            return Err(ErrorCode::BindError(
486                "only `UNION ALL` is supported in recursive CTE now".to_owned(),
487            )
488            .into());
489        }
490
491        if corresponding.is_corresponding() {
492            return Err(ErrorCode::BindError(
493                "`CORRESPONDING` is not supported in recursive CTE".to_owned(),
494            )
495            .into());
496        }
497
498        Ok((*all, corresponding, left, right, with.as_ref()))
499    }
500
501    fn bind_rcte(
502        &mut self,
503        with: Option<&With>,
504        entry: Rc<RefCell<BindingCte>>,
505        left: &SetExpr,
506        right: &SetExpr,
507        all: bool,
508    ) -> Result<()> {
509        self.push_context();
510        let result = self.bind_rcte_inner(with, entry, left, right, all);
511        self.pop_context()?;
512        result
513    }
514
    /// Bind a recursive CTE using the current context and store the result in `entry`.
    ///
    /// The steps are:
    /// 1. bind the nested `WITH` clause, if any;
    /// 2. bind `left` as the base term and move `entry` to the `BaseResolved` state;
    /// 3. bind `right` as the recursive term in a reset context that only keeps
    ///    `cte_to_relation` and `disable_security_invoker`;
    /// 4. align the schemas of both terms and move `entry` to the `Bound` state, holding the
    ///    resulting [`RecursiveUnion`].
    ///
    /// If binding fails, `entry` is left in whatever state it had reached.
    fn bind_rcte_inner(
        &mut self,
        with: Option<&With>,
        entry: Rc<RefCell<BindingCte>>,
        left: &SetExpr,
        right: &SetExpr,
        all: bool,
    ) -> Result<()> {
        if let Some(with) = with {
            self.bind_with(with)?;
        }

        // We assume `left` is the base term, otherwise the implementation may be very hard.
        // The behavior is the same as PostgreSQL's.
        // reference: <https://www.postgresql.org/docs/16/sql-select.html#:~:text=the%20recursive%20self%2Dreference%20must%20appear%20on%20the%20right%2Dhand%20side%20of%20the%20UNION>
        let mut base = self.bind_set_expr(left)?;

        // Record the base term in the shared entry before the recursive term is bound.
        entry.borrow_mut().state = BindingCteState::BaseResolved { base: base.clone() };

        // Reset context for right side, but keep `cte_to_relation`.
        // NOTE: despite its name, `new_context` holds the context that was in use so far;
        // `self.context` is the one that has just been reset to its default.
        let new_context = std::mem::take(&mut self.context);
        self.context
            .cte_to_relation
            .clone_from(&new_context.cte_to_relation);
        self.context.disable_security_invoker = new_context.disable_security_invoker;
        // bind the rest of the recursive cte
        let mut recursive = self.bind_set_expr(right)?;
        // Reset context for the set operation.
        // Whatever binding the right side added to the context is dropped here; only
        // `cte_to_relation` and `disable_security_invoker` of the saved context are carried over.
        self.context = Default::default();
        self.context.cte_to_relation = new_context.cte_to_relation;
        self.context.disable_security_invoker = new_context.disable_security_invoker;

        Self::align_schema(&mut base, &mut recursive, SetOperator::Union)?;
        // The schema is read from the base term after alignment.
        let schema = base.schema().into_owned();

        let recursive_union = RecursiveUnion {
            all,
            base: Box::new(base),
            recursive: Box::new(recursive),
            schema,
        };

        entry.borrow_mut().state = BindingCteState::Bound {
            query: either::Either::Right(recursive_union),
        };

        Ok(())
    }
563}
564
565// TODO: Make clause a const generic param after <https://github.com/rust-lang/rust/issues/95174>.
566fn parse_non_negative_i64(clause: &str, s: &str) -> Result<i64> {
567    match s.parse::<i64>() {
568        Ok(v) => {
569            if v < 0 {
570                Err(ErrorCode::InvalidInputSyntax(format!("{clause} must not be negative")).into())
571            } else {
572                Ok(v)
573            }
574        }
575        Err(e) => Err(ErrorCode::InvalidInputSyntax(e.to_report_string()).into()),
576    }
577}