risingwave_frontend/binder/
query.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::HashMap;
17use std::rc::Rc;
18
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
22use risingwave_sqlparser::ast::{
23    Cte, CteInner, Expr, Fetch, OrderByExpr, Query, SetExpr, SetOperator, Value, With,
24};
25use thiserror_ext::AsReport;
26
27use super::BoundValues;
28use super::bind_context::BindingCteState;
29use super::statement::RewriteExprsRecursive;
30use crate::binder::bind_context::{BindingCte, RecursiveUnion};
31use crate::binder::{Binder, BoundSetExpr};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::expr::{CorrelatedId, Depth, ExprImpl, ExprRewriter};
34
/// A validated sql query, including order and union.
/// An example of its relationship with `BoundSetExpr` and `BoundSelect` can be found here: <https://bit.ly/3GQwgPz>
#[derive(Debug, Clone)]
pub struct BoundQuery {
    /// The bound body of the query.
    pub body: BoundSetExpr,
    /// One entry per `ORDER BY` item. The column index either points at a visible output
    /// column of `body`, or at a slot past the visible outputs that corresponds to an entry
    /// of `extra_order_exprs`.
    pub order: Vec<ColumnOrder>,
    /// The `LIMIT` / `FETCH` row count, folded to a constant while binding. A limit that
    /// evaluates to `NULL` is stored as `u64::MAX`.
    pub limit: Option<u64>,
    /// The value of the `OFFSET` clause, if any.
    pub offset: Option<u64>,
    /// The `WITH TIES` flag taken from the `FETCH` clause; `false` when there is no `FETCH`.
    pub with_ties: bool,
    /// Bound `ORDER BY` expressions that are neither an output column name nor an ordinal.
    pub extra_order_exprs: Vec<ExprImpl>,
}
46
47impl BoundQuery {
48    /// The schema returned by this [`BoundQuery`].
49    pub fn schema(&self) -> std::borrow::Cow<'_, Schema> {
50        self.body.schema()
51    }
52
53    /// The types returned by this [`BoundQuery`].
54    pub fn data_types(&self) -> Vec<DataType> {
55        self.schema().data_types()
56    }
57
58    /// Checks whether this query contains references to outer queries.
59    ///
60    /// Note there are 3 cases:
61    /// ```sql
62    /// select 1 from a having exists ( -- this is self
63    ///   select 1 from b where exists (
64    ///     select b1 from c
65    ///   )
66    /// );
67    ///
68    /// select 1 from a having exists ( -- this is self
69    ///   select 1 from b where exists (
70    ///     select a1 from c
71    ///   )
72    /// );
73    ///
74    /// select 1 from a where exists (
75    ///   select 1 from b having exists ( -- this is self, not the one above
76    ///     select a1 from c
77    ///   )
78    /// );
79    /// ```
80    /// We assume `self` is the subquery after `having`. In other words, the query with `from b` in
81    /// first 2 examples and the query with `from c` in the last example.
82    ///
83    /// * The first example is uncorrelated, because it is self-contained and does not depend on
84    ///   table `a`, although there is correlated input ref (`b1`) in it.
85    /// * The second example is correlated, because it depend on a correlated input ref (`a1`) that
86    ///   goes out.
87    /// * The last example is also correlated. because it cannot be evaluated independently either.
88    pub fn is_correlated(&self, depth: Depth) -> bool {
89        self.body.is_correlated(depth + 1)
90            || self
91                .extra_order_exprs
92                .iter()
93                .any(|e| e.has_correlated_input_ref_by_depth(depth + 1))
94    }
95
96    pub fn collect_correlated_indices_by_depth_and_assign_id(
97        &mut self,
98        depth: Depth,
99        correlated_id: CorrelatedId,
100    ) -> Vec<usize> {
101        let mut correlated_indices = vec![];
102
103        correlated_indices.extend(
104            self.body
105                .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
106        );
107
108        correlated_indices.extend(self.extra_order_exprs.iter_mut().flat_map(|expr| {
109            expr.collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id)
110        }));
111        correlated_indices
112    }
113
114    /// Simple `VALUES` without other clauses.
115    pub fn with_values(values: BoundValues) -> Self {
116        BoundQuery {
117            body: BoundSetExpr::Values(values.into()),
118            order: vec![],
119            limit: None,
120            offset: None,
121            with_ties: false,
122            extra_order_exprs: vec![],
123        }
124    }
125}
126
127impl RewriteExprsRecursive for BoundQuery {
128    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl ExprRewriter) {
129        let new_extra_order_exprs = std::mem::take(&mut self.extra_order_exprs)
130            .into_iter()
131            .map(|expr| rewriter.rewrite_expr(expr))
132            .collect::<Vec<_>>();
133        self.extra_order_exprs = new_extra_order_exprs;
134
135        self.body.rewrite_exprs_recursive(rewriter);
136    }
137}
138
139impl Binder {
    /// Bind a [`Query`].
    ///
    /// Before binding the [`Query`], we push the current [`BindContext`](super::BindContext) to the
    /// stack and create a new context, because it may be a subquery.
    ///
    /// After finishing binding, we pop the previous context from the stack.
    ///
    /// # Errors
    ///
    /// Returns the error from popping the context if that fails; otherwise returns whatever
    /// `bind_query_inner` produced.
    pub fn bind_query(&mut self, query: Query) -> Result<BoundQuery> {
        self.push_context();
        // Hold on to the result instead of using `?` here, so that the context is popped even
        // when binding fails.
        let result = self.bind_query_inner(query);
        self.pop_context()?;
        result
    }
152
    /// Bind a [`Query`] for view.
    ///
    /// A new context is pushed and `disable_security_invoker` is set on it before the query is
    /// bound; the previous context is popped afterwards, whether or not binding succeeded.
    ///
    /// TODO: support `SECURITY INVOKER` for view.
    pub fn bind_query_for_view(&mut self, query: Query) -> Result<BoundQuery> {
        self.push_context();
        // The flag is set on the freshly pushed context.
        self.context.disable_security_invoker = true;
        // Hold on to the result so the context is popped on the error path as well.
        let result = self.bind_query_inner(query);
        self.pop_context()?;
        result
    }
162
163    /// Bind a [`Query`] using the current [`BindContext`](super::BindContext).
164    pub(super) fn bind_query_inner(
165        &mut self,
166        Query {
167            with,
168            body,
169            order_by,
170            limit,
171            offset,
172            fetch,
173        }: Query,
174    ) -> Result<BoundQuery> {
175        let mut with_ties = false;
176        let limit = match (limit, fetch) {
177            (None, None) => None,
178            (
179                None,
180                Some(Fetch {
181                    with_ties: fetch_with_ties,
182                    quantity,
183                }),
184            ) => {
185                with_ties = fetch_with_ties;
186                match quantity {
187                    Some(v) => Some(Expr::Value(Value::Number(v))),
188                    None => Some(Expr::Value(Value::Number("1".to_owned()))),
189                }
190            }
191            (Some(limit), None) => Some(limit),
192            (Some(_), Some(_)) => unreachable!(), // parse error
193        };
194        let limit_expr = limit.map(|expr| self.bind_expr(expr)).transpose()?;
195        let limit = if let Some(limit_expr) = limit_expr {
196            // wrong type error is handled here
197            let limit_cast_to_bigint = limit_expr.cast_assign(DataType::Int64).map_err(|_| {
198                RwError::from(ErrorCode::ExprError(
199                    "expects an integer or expression that can be evaluated to an integer after LIMIT"
200                        .into(),
201                ))
202            })?;
203            let limit = match limit_cast_to_bigint.try_fold_const() {
204                Some(Ok(Some(datum))) => {
205                    let value = datum.as_int64();
206                    if *value < 0 {
207                        return Err(ErrorCode::ExprError(
208                            format!("LIMIT must not be negative, but found: {}", *value).into(),
209                        )
210                            .into());
211                    }
212                    *value as u64
213                }
214                // If evaluated to NULL, we follow PG to treat NULL as no limit
215                Some(Ok(None)) => {
216                    u64::MAX
217                }
218                // not const error
219                None => return Err(ErrorCode::ExprError(
220                    "expects an integer or expression that can be evaluated to an integer after LIMIT, but found non-const expression"
221                        .into(),
222                ).into()),
223                // eval error
224                Some(Err(e)) => {
225                    return Err(ErrorCode::ExprError(
226                        format!("expects an integer or expression that can be evaluated to an integer after LIMIT,\nbut the evaluation of the expression returns error:{}", e.as_report()
227                        ).into(),
228                    ).into())
229                }
230            };
231            Some(limit)
232        } else {
233            None
234        };
235
236        let offset = offset
237            .map(|s| parse_non_negative_i64("OFFSET", &s))
238            .transpose()?
239            .map(|v| v as u64);
240
241        if let Some(with) = with {
242            self.bind_with(with)?;
243        }
244        let body = self.bind_set_expr(body)?;
245        let name_to_index =
246            Self::build_name_to_index(body.schema().fields().iter().map(|f| f.name.clone()));
247        let mut extra_order_exprs = vec![];
248        let visible_output_num = body.schema().len();
249        let order = order_by
250            .into_iter()
251            .map(|order_by_expr| {
252                self.bind_order_by_expr_in_query(
253                    order_by_expr,
254                    &name_to_index,
255                    &mut extra_order_exprs,
256                    visible_output_num,
257                )
258            })
259            .collect::<Result<_>>()?;
260        Ok(BoundQuery {
261            body,
262            order,
263            limit,
264            offset,
265            with_ties,
266            extra_order_exprs,
267        })
268    }
269
270    pub fn build_name_to_index(names: impl Iterator<Item = String>) -> HashMap<String, usize> {
271        let mut m = HashMap::new();
272        names.enumerate().for_each(|(index, name)| {
273            m.entry(name)
274                // Ambiguous (duplicate) output names are marked with usize::MAX.
275                // This is not necessarily an error as long as not actually referenced.
276                .and_modify(|v| *v = usize::MAX)
277                .or_insert(index);
278        });
279        m
280    }
281
282    /// Bind an `ORDER BY` expression in a [`Query`], which can be either:
283    /// * an output-column name
284    /// * index of an output column
285    /// * an arbitrary expression
286    ///
287    /// Refer to `bind_group_by_expr_in_select` to see their similarities and differences.
288    ///
289    /// # Arguments
290    ///
291    /// * `name_to_index` - visible output column name -> index. Ambiguous (duplicate) output names
292    ///   are marked with `usize::MAX`.
293    /// * `visible_output_num` - the number of all visible output columns, including duplicates.
294    fn bind_order_by_expr_in_query(
295        &mut self,
296        OrderByExpr {
297            expr,
298            asc,
299            nulls_first,
300        }: OrderByExpr,
301        name_to_index: &HashMap<String, usize>,
302        extra_order_exprs: &mut Vec<ExprImpl>,
303        visible_output_num: usize,
304    ) -> Result<ColumnOrder> {
305        let order_type = OrderType::from_bools(asc, nulls_first);
306        let column_index = match expr {
307            Expr::Identifier(name) if let Some(index) = name_to_index.get(&name.real_value()) => {
308                match *index != usize::MAX {
309                    true => *index,
310                    false => {
311                        return Err(ErrorCode::BindError(format!(
312                            "ORDER BY \"{}\" is ambiguous",
313                            name.real_value()
314                        ))
315                        .into());
316                    }
317                }
318            }
319            Expr::Value(Value::Number(number)) => match number.parse::<usize>() {
320                Ok(index) if 1 <= index && index <= visible_output_num => index - 1,
321                _ => {
322                    return Err(ErrorCode::InvalidInputSyntax(format!(
323                        "Invalid ordinal number in ORDER BY: {}",
324                        number
325                    ))
326                    .into());
327                }
328            },
329            expr => {
330                extra_order_exprs.push(self.bind_expr(expr)?);
331                visible_output_num + extra_order_exprs.len() - 1
332            }
333        };
334        Ok(ColumnOrder::new(column_index, order_type))
335    }
336
337    fn bind_with(&mut self, with: With) -> Result<()> {
338        for cte_table in with.cte_tables {
339            // note that the new `share_id` for the rcte is generated here
340            let share_id = self.next_share_id();
341            let Cte { alias, cte_inner } = cte_table;
342            let table_name = alias.name.real_value();
343
344            if with.recursive {
345                if let CteInner::Query(query) = cte_inner {
346                    let (
347                        SetExpr::SetOperation {
348                            op: SetOperator::Union,
349                            all,
350                            corresponding,
351                            left,
352                            right,
353                        },
354                        with,
355                    ) = Self::validate_rcte(*query)?
356                    else {
357                        return Err(ErrorCode::BindError(
358                            "expect `SetOperation` as the return type of validation".into(),
359                        )
360                        .into());
361                    };
362
363                    // validated in `validate_rcte`
364                    assert!(
365                        !corresponding.is_corresponding(),
366                        "`CORRESPONDING` is not supported in recursive CTE"
367                    );
368
369                    let entry = self
370                        .context
371                        .cte_to_relation
372                        .entry(table_name)
373                        .insert_entry(Rc::new(RefCell::new(BindingCte {
374                            share_id,
375                            state: BindingCteState::Init,
376                            alias,
377                        })))
378                        .get()
379                        .clone();
380
381                    self.bind_rcte(with, entry, *left, *right, all)?;
382                } else {
383                    return Err(ErrorCode::BindError(
384                        "RECURSIVE CTE only support query".to_owned(),
385                    )
386                    .into());
387                }
388            } else {
389                match cte_inner {
390                    CteInner::Query(query) => {
391                        let bound_query = self.bind_query(*query)?;
392                        self.context.cte_to_relation.insert(
393                            table_name,
394                            Rc::new(RefCell::new(BindingCte {
395                                share_id,
396                                state: BindingCteState::Bound {
397                                    query: either::Either::Left(bound_query),
398                                },
399                                alias,
400                            })),
401                        );
402                    }
403                    CteInner::ChangeLog(from_table_name) => {
404                        self.push_context();
405                        let from_table_relation =
406                            self.bind_relation_by_name(from_table_name.clone(), None, None, true)?;
407                        self.pop_context()?;
408                        self.context.cte_to_relation.insert(
409                            table_name,
410                            Rc::new(RefCell::new(BindingCte {
411                                share_id,
412                                state: BindingCteState::ChangeLog {
413                                    table: from_table_relation,
414                                },
415                                alias,
416                            })),
417                        );
418                    }
419                }
420            }
421        }
422        Ok(())
423    }
424
425    /// syntactically validate the recursive cte ast with the current support features in rw.
426    fn validate_rcte(query: Query) -> Result<(SetExpr, Option<With>)> {
427        let Query {
428            with,
429            body,
430            order_by,
431            limit,
432            offset,
433            fetch,
434        } = query;
435
436        /// the input clause should not be supported.
437        fn should_be_empty<T>(v: Option<T>, clause: &str) -> Result<()> {
438            if v.is_some() {
439                return Err(ErrorCode::BindError(format!(
440                    "`{clause}` is not supported in recursive CTE"
441                ))
442                .into());
443            }
444            Ok(())
445        }
446
447        should_be_empty(order_by.first(), "ORDER BY")?;
448        should_be_empty(limit, "LIMIT")?;
449        should_be_empty(offset, "OFFSET")?;
450        should_be_empty(fetch, "FETCH")?;
451
452        let SetExpr::SetOperation {
453            op: SetOperator::Union,
454            all,
455            corresponding,
456            left,
457            right,
458        } = body
459        else {
460            return Err(
461                ErrorCode::BindError("`UNION` is required in recursive CTE".to_owned()).into(),
462            );
463        };
464
465        if !all {
466            return Err(ErrorCode::BindError(
467                "only `UNION ALL` is supported in recursive CTE now".to_owned(),
468            )
469            .into());
470        }
471
472        if corresponding.is_corresponding() {
473            return Err(ErrorCode::BindError(
474                "`CORRESPONDING` is not supported in recursive CTE".to_owned(),
475            )
476            .into());
477        }
478
479        Ok((
480            SetExpr::SetOperation {
481                op: SetOperator::Union,
482                all,
483                corresponding,
484                left,
485                right,
486            },
487            with,
488        ))
489    }
490
    /// Binds a recursive CTE inside a freshly pushed context.
    ///
    /// The actual work is done by `bind_rcte_inner`; this wrapper pushes a context before the
    /// call and pops it afterwards, whether or not binding succeeded.
    ///
    /// # Errors
    ///
    /// Returns the error from popping the context if that fails; otherwise returns whatever
    /// `bind_rcte_inner` produced.
    fn bind_rcte(
        &mut self,
        with: Option<With>,
        entry: Rc<RefCell<BindingCte>>,
        left: SetExpr,
        right: SetExpr,
        all: bool,
    ) -> Result<()> {
        self.push_context();
        // Hold on to the result so the context is popped on the error path as well.
        let result = self.bind_rcte_inner(with, entry, left, right, all);
        self.pop_context()?;
        result
    }
504
    /// Binds the two sides of a recursive CTE and stores the outcome in `entry`.
    ///
    /// # Arguments
    ///
    /// * `with` - the `WITH` clause of the CTE's own query, bound first if present.
    /// * `entry` - the CTE being defined; its state is moved to `BaseResolved` once `left` is
    ///   bound, and to `Bound` once both sides are bound and their schemas aligned.
    /// * `left` - the left side of the `UNION`, taken as the base term.
    /// * `right` - the right side of the `UNION`, taken as the recursive term.
    /// * `all` - the `ALL` flag of the `UNION`, recorded in the resulting `RecursiveUnion`.
    ///
    /// # Errors
    ///
    /// Fails if binding the nested `WITH`, either side, or the schema alignment fails. In that
    /// case `entry` keeps whatever state it had reached.
    fn bind_rcte_inner(
        &mut self,
        with: Option<With>,
        entry: Rc<RefCell<BindingCte>>,
        left: SetExpr,
        right: SetExpr,
        all: bool,
    ) -> Result<()> {
        if let Some(with) = with {
            self.bind_with(with)?;
        }

        // We assume `left` is the base term, otherwise the implementation may be very hard.
        // The behavior is the same as PostgreSQL's.
        // reference: <https://www.postgresql.org/docs/16/sql-select.html#:~:text=the%20recursive%20self%2Dreference%20must%20appear%20on%20the%20right%2Dhand%20side%20of%20the%20UNION>
        let mut base = self.bind_set_expr(left)?;

        // Record the bound base term in the entry before the recursive term is bound.
        entry.borrow_mut().state = BindingCteState::BaseResolved { base: base.clone() };

        // Reset context for right side, but keep `cte_to_relation`.
        // Despite its name, `new_context` holds the context that was active while binding the
        // base term; `self.context` is left as a default one by `take`.
        let new_context = std::mem::take(&mut self.context);
        self.context
            .cte_to_relation
            .clone_from(&new_context.cte_to_relation);
        self.context.disable_security_invoker = new_context.disable_security_invoker;
        // bind the rest of the recursive cte
        let mut recursive = self.bind_set_expr(right)?;
        // Reset context for the set operation.
        // Only `cte_to_relation` and `disable_security_invoker` are carried over from the saved
        // context; everything else starts from the default.
        self.context = Default::default();
        self.context.cte_to_relation = new_context.cte_to_relation;
        self.context.disable_security_invoker = new_context.disable_security_invoker;

        // The schema of the recursive union is taken from the base term after alignment.
        Self::align_schema(&mut base, &mut recursive, SetOperator::Union)?;
        let schema = base.schema().into_owned();

        let recursive_union = RecursiveUnion {
            all,
            base: Box::new(base),
            recursive: Box::new(recursive),
            schema,
        };

        entry.borrow_mut().state = BindingCteState::Bound {
            query: either::Either::Right(recursive_union),
        };

        Ok(())
    }
553}
554
555// TODO: Make clause a const generic param after <https://github.com/rust-lang/rust/issues/95174>.
556fn parse_non_negative_i64(clause: &str, s: &str) -> Result<i64> {
557    match s.parse::<i64>() {
558        Ok(v) => {
559            if v < 0 {
560                Err(ErrorCode::InvalidInputSyntax(format!("{clause} must not be negative")).into())
561            } else {
562                Ok(v)
563            }
564        }
565        Err(e) => Err(ErrorCode::InvalidInputSyntax(e.to_report_string()).into()),
566    }
567}