risingwave_frontend/binder/relation/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::ops::Deref;
17
18use either::Either;
19use itertools::{EitherOrBoth, Itertools};
20use risingwave_common::bail;
21use risingwave_common::catalog::{Field, TableId};
22use risingwave_sqlparser::ast::{
23    AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias,
24    TableFactor,
25};
26use thiserror::Error;
27use thiserror_ext::AsReport;
28
29use super::bind_context::ColumnBinding;
30use super::statement::RewriteExprsRecursive;
31use crate::binder::Binder;
32use crate::binder::bind_context::{BindingCte, BindingCteState};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::expr::{ExprImpl, InputRef};
35
36mod cte_ref;
37mod join;
38mod share;
39mod subquery;
40mod table_function;
41mod table_or_source;
42mod watermark;
43mod window_table_function;
44
45pub use cte_ref::BoundBackCteRef;
46pub use join::BoundJoin;
47pub use share::{BoundShare, BoundShareInput};
48pub use subquery::BoundSubquery;
49pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable};
50pub use watermark::BoundWatermark;
51pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKind};
52
53use crate::expr::{CorrelatedId, Depth};
54
55/// A validated item that refers to a table-like entity, including base table, subquery, join, etc.
56/// It is usually part of the `from` clause.
57#[derive(Debug, Clone)]
58pub enum Relation {
59    Source(Box<BoundSource>),
60    BaseTable(Box<BoundBaseTable>),
61    SystemTable(Box<BoundSystemTable>),
62    Subquery(Box<BoundSubquery>),
63    Join(Box<BoundJoin>),
64    Apply(Box<BoundJoin>),
65    WindowTableFunction(Box<BoundWindowTableFunction>),
66    /// Table function or scalar function.
67    TableFunction {
68        expr: ExprImpl,
69        with_ordinality: bool,
70    },
71    Watermark(Box<BoundWatermark>),
72    /// rcte is implicitly included in share
73    Share(Box<BoundShare>),
74    BackCteRef(Box<BoundBackCteRef>),
75}
76
77impl RewriteExprsRecursive for Relation {
78    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
79        match self {
80            Relation::Subquery(inner) => inner.rewrite_exprs_recursive(rewriter),
81            Relation::Join(inner) => inner.rewrite_exprs_recursive(rewriter),
82            Relation::Apply(inner) => inner.rewrite_exprs_recursive(rewriter),
83            Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter),
84            Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter),
85            Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter),
86            Relation::TableFunction { expr: inner, .. } => {
87                *inner = rewriter.rewrite_expr(inner.take())
88            }
89            Relation::BackCteRef(inner) => inner.rewrite_exprs_recursive(rewriter),
90            _ => {}
91        }
92    }
93}
94
95impl Relation {
96    pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
97        match self {
98            Relation::Subquery(subquery) => subquery.query.is_correlated_by_depth(depth),
99            Relation::Join(join) | Relation::Apply(join) => {
100                join.cond.has_correlated_input_ref_by_depth(depth)
101                    || join.left.is_correlated_by_depth(depth)
102                    || join.right.is_correlated_by_depth(depth)
103            }
104            _ => false,
105        }
106    }
107
108    pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
109        match self {
110            Relation::Subquery(subquery) => {
111                subquery.query.is_correlated_by_correlated_id(correlated_id)
112            }
113            Relation::Join(join) | Relation::Apply(join) => {
114                join.cond
115                    .has_correlated_input_ref_by_correlated_id(correlated_id)
116                    || join.left.is_correlated_by_correlated_id(correlated_id)
117                    || join.right.is_correlated_by_correlated_id(correlated_id)
118            }
119            _ => false,
120        }
121    }
122
123    pub fn collect_correlated_indices_by_depth_and_assign_id(
124        &mut self,
125        depth: Depth,
126        correlated_id: CorrelatedId,
127    ) -> Vec<usize> {
128        match self {
129            Relation::Subquery(subquery) => subquery
130                .query
131                .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
132            Relation::Join(join) | Relation::Apply(join) => {
133                let mut correlated_indices = vec![];
134                correlated_indices.extend(
135                    join.cond
136                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
137                );
138                correlated_indices.extend(
139                    join.left
140                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
141                );
142                correlated_indices.extend(
143                    join.right
144                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
145                );
146                correlated_indices
147            }
148            Relation::TableFunction {
149                expr: table_function,
150                with_ordinality: _,
151            } => table_function
152                .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
153            Relation::Share(share) => match &mut share.input {
154                BoundShareInput::Query(query) => match query {
155                    Either::Left(query) => query
156                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
157                    Either::Right(_) => vec![],
158                },
159                BoundShareInput::ChangeLog(change_log) => change_log
160                    .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
161            },
162            _ => vec![],
163        }
164    }
165}
166
/// The reason a qualified object name could not be resolved.
#[derive(Debug)]
#[non_exhaustive]
pub enum ResolveQualifiedNameErrorKind {
    /// The name has more than three dot-separated parts.
    QualifiedNameTooLong,
    /// The name names a database other than the current one.
    NotCurrentDatabase,
}
173
174#[derive(Debug, Error)]
175pub struct ResolveQualifiedNameError {
176    qualified: String,
177    kind: ResolveQualifiedNameErrorKind,
178}
179
180impl std::fmt::Display for ResolveQualifiedNameError {
181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182        match self.kind {
183            ResolveQualifiedNameErrorKind::QualifiedNameTooLong => write!(
184                f,
185                "improper qualified name (too many dotted names): {}",
186                self.qualified
187            ),
188            ResolveQualifiedNameErrorKind::NotCurrentDatabase => write!(
189                f,
190                "cross-database references are not implemented: \"{}\"",
191                self.qualified
192            ),
193        }
194    }
195}
196
197impl ResolveQualifiedNameError {
198    pub fn new(qualified: String, kind: ResolveQualifiedNameErrorKind) -> Self {
199        Self { qualified, kind }
200    }
201}
202
203impl From<ResolveQualifiedNameError> for RwError {
204    fn from(e: ResolveQualifiedNameError) -> Self {
205        ErrorCode::InvalidInputSyntax(format!("{}", e.as_report())).into()
206    }
207}
208
209impl Binder {
210    /// return (`schema_name`, `name`)
211    pub fn resolve_schema_qualified_name(
212        db_name: &str,
213        name: &ObjectName,
214    ) -> std::result::Result<(Option<String>, String), ResolveQualifiedNameError> {
215        let formatted_name = name.to_string();
216        let mut identifiers = name.0.clone();
217
218        if identifiers.len() > 3 {
219            return Err(ResolveQualifiedNameError::new(
220                formatted_name,
221                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
222            ));
223        }
224
225        let name = identifiers.pop().unwrap().real_value();
226
227        let schema_name = identifiers.pop().map(|ident| ident.real_value());
228        let database_name = identifiers.pop().map(|ident| ident.real_value());
229
230        if let Some(database_name) = database_name
231            && database_name != db_name
232        {
233            return Err(ResolveQualifiedNameError::new(
234                formatted_name,
235                ResolveQualifiedNameErrorKind::NotCurrentDatabase,
236            ));
237        }
238
239        Ok((schema_name, name))
240    }
241
242    /// check whether the name is a cross-database reference
243    pub fn validate_cross_db_reference(
244        db_name: &str,
245        name: &ObjectName,
246    ) -> std::result::Result<(), ResolveQualifiedNameError> {
247        let formatted_name = name.to_string();
248        let identifiers = &name.0;
249        if identifiers.len() > 3 {
250            return Err(ResolveQualifiedNameError::new(
251                formatted_name,
252                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
253            ));
254        }
255
256        if identifiers.len() == 3 && identifiers[0].real_value() != db_name {
257            return Err(ResolveQualifiedNameError::new(
258                formatted_name,
259                ResolveQualifiedNameErrorKind::NotCurrentDatabase,
260            ));
261        }
262
263        Ok(())
264    }
265
266    /// return (`database_name`, `schema_name`, `name`)
267    pub fn resolve_db_schema_qualified_name(
268        name: &ObjectName,
269    ) -> std::result::Result<(Option<String>, Option<String>, String), ResolveQualifiedNameError>
270    {
271        let formatted_name = name.to_string();
272        let mut identifiers = name.0.clone();
273
274        if identifiers.len() > 3 {
275            return Err(ResolveQualifiedNameError::new(
276                formatted_name,
277                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
278            ));
279        }
280
281        let name = identifiers.pop().unwrap().real_value();
282        let schema_name = identifiers.pop().map(|ident| ident.real_value());
283        let database_name = identifiers.pop().map(|ident| ident.real_value());
284
285        Ok((database_name, schema_name, name))
286    }
287
288    /// return first name in identifiers, must have only one name.
289    fn resolve_single_name(mut identifiers: Vec<Ident>, ident_desc: &str) -> Result<String> {
290        if identifiers.len() > 1 {
291            bail!("{} must contain 1 argument", ident_desc);
292        }
293        let name = identifiers.pop().unwrap().real_value();
294
295        Ok(name)
296    }
297
298    /// return the `database_name`
299    pub fn resolve_database_name(name: ObjectName) -> Result<String> {
300        Self::resolve_single_name(name.0, "database name")
301    }
302
303    /// return the `schema_name`
304    pub fn resolve_schema_name(name: ObjectName) -> Result<String> {
305        Self::resolve_single_name(name.0, "schema name")
306    }
307
308    /// return the `index_name`
309    pub fn resolve_index_name(name: ObjectName) -> Result<String> {
310        Self::resolve_single_name(name.0, "index name")
311    }
312
313    /// return the `view_name`
314    pub fn resolve_view_name(name: ObjectName) -> Result<String> {
315        Self::resolve_single_name(name.0, "view name")
316    }
317
318    /// return the `sink_name`
319    pub fn resolve_sink_name(name: ObjectName) -> Result<String> {
320        Self::resolve_single_name(name.0, "sink name")
321    }
322
323    /// return the `subscription_name`
324    pub fn resolve_subscription_name(name: ObjectName) -> Result<String> {
325        Self::resolve_single_name(name.0, "subscription name")
326    }
327
328    /// return the `table_name`
329    pub fn resolve_table_name(name: ObjectName) -> Result<String> {
330        Self::resolve_single_name(name.0, "table name")
331    }
332
333    /// return the `source_name`
334    pub fn resolve_source_name(name: ObjectName) -> Result<String> {
335        Self::resolve_single_name(name.0, "source name")
336    }
337
338    /// return the `user_name`
339    pub fn resolve_user_name(name: ObjectName) -> Result<String> {
340        Self::resolve_single_name(name.0, "user name")
341    }
342
343    /// Fill the [`BindContext`](super::BindContext) for table.
344    pub(super) fn bind_table_to_context(
345        &mut self,
346        columns: impl IntoIterator<Item = (bool, Field)>, // bool indicates if the field is hidden
347        table_name: String,
348        alias: Option<&TableAlias>,
349    ) -> Result<()> {
350        const EMPTY: [Ident; 0] = [];
351        let (table_name, column_aliases) = match alias {
352            None => (table_name, &EMPTY[..]),
353            Some(TableAlias { name, columns }) => (name.real_value(), columns.as_slice()),
354        };
355
356        let num_col_aliases = column_aliases.len();
357
358        let begin = self.context.columns.len();
359        // Column aliases can be less than columns, but not more.
360        // It also needs to skip hidden columns.
361        let mut alias_iter = column_aliases.iter().fuse();
362        let mut index = 0;
363        columns.into_iter().for_each(|(is_hidden, mut field)| {
364            let name = match is_hidden {
365                true => field.name.clone(),
366                false => alias_iter
367                    .next()
368                    .map(|t| t.real_value())
369                    .unwrap_or_else(|| field.name.clone()),
370            };
371            field.name.clone_from(&name);
372            self.context.columns.push(ColumnBinding::new(
373                table_name.clone(),
374                begin + index,
375                is_hidden,
376                field,
377            ));
378            self.context
379                .indices_of
380                .entry(name)
381                .or_default()
382                .push(self.context.columns.len() - 1);
383            index += 1;
384        });
385
386        let num_cols = index;
387        if num_cols < num_col_aliases {
388            return Err(ErrorCode::BindError(format!(
389                "table \"{table_name}\" has {num_cols} columns available but {num_col_aliases} column aliases specified",
390            ))
391            .into());
392        }
393
394        match self.context.range_of.entry(table_name.clone()) {
395            Entry::Occupied(_) => Err(ErrorCode::InternalError(format!(
396                "Duplicated table name while binding table to context: {}",
397                table_name
398            ))
399            .into()),
400            Entry::Vacant(entry) => {
401                entry.insert((begin, self.context.columns.len()));
402                Ok(())
403            }
404        }
405    }
406
407    /// Binds a relation, which can be:
408    /// - a table/source/materialized view
409    /// - a reference to a CTE
410    /// - a logical view
411    pub fn bind_relation_by_name(
412        &mut self,
413        name: &ObjectName,
414        alias: Option<&TableAlias>,
415        as_of: Option<&AsOf>,
416        allow_cross_db: bool,
417    ) -> Result<Relation> {
418        let (db_name, schema_name, table_name) = if allow_cross_db {
419            Self::resolve_db_schema_qualified_name(name)?
420        } else {
421            let (schema_name, table_name) =
422                Self::resolve_schema_qualified_name(&self.db_name, name)?;
423            (None, schema_name, table_name)
424        };
425
426        if schema_name.is_none()
427            // the `table_name` here is the name of the currently binding cte.
428            && let Some(item) = self.context.cte_to_relation.get(&table_name)
429        {
430            // Handles CTE
431
432            if as_of.is_some() {
433                return Err(ErrorCode::BindError(
434                    "Right table of a temporal join should not be a CTE. \
435                 It should be a table, index, or materialized view"
436                        .to_owned(),
437                )
438                .into());
439            }
440
441            let BindingCte {
442                share_id,
443                state: cte_state,
444                alias: mut original_alias,
445            } = item.deref().borrow().clone();
446
447            // The original CTE alias ought to be its table name.
448            debug_assert_eq!(original_alias.name.real_value(), table_name);
449
450            if let Some(from_alias) = alias {
451                original_alias.name = from_alias.name.clone();
452                original_alias.columns = original_alias
453                    .columns
454                    .into_iter()
455                    .zip_longest(from_alias.columns.iter().cloned())
456                    .map(EitherOrBoth::into_right)
457                    .collect();
458            }
459
460            match cte_state {
461                BindingCteState::Init => {
462                    Err(ErrorCode::BindError("Base term of recursive CTE not found, consider writing it to left side of the `UNION ALL` operator".to_owned()).into())
463                }
464                BindingCteState::BaseResolved { base } => {
465                    self.bind_table_to_context(
466                        base.schema().fields.iter().map(|f| (false, f.clone())),
467                        table_name,
468                        Some(&original_alias),
469                    )?;
470                    Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id, base })))
471                }
472                BindingCteState::Bound { query } => {
473                    let input = BoundShareInput::Query(query);
474                    self.bind_table_to_context(
475                        input.fields()?,
476                        table_name,
477                        Some(&original_alias),
478                    )?;
479                    // we could always share the cte,
480                    // no matter it's recursive or not.
481                    Ok(Relation::Share(Box::new(BoundShare { share_id, input})))
482                }
483                BindingCteState::ChangeLog { table } => {
484                    let input = BoundShareInput::ChangeLog(table);
485                    self.bind_table_to_context(
486                        input.fields()?,
487                        table_name,
488                        Some(&original_alias),
489                    )?;
490                    Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
491                },
492            }
493        } else {
494            self.bind_catalog_relation_by_name(
495                db_name.as_deref(),
496                schema_name.as_deref(),
497                &table_name,
498                alias,
499                as_of,
500                false,
501            )
502        }
503    }
504
505    // Bind a relation provided a function arg.
506    fn bind_relation_by_function_arg(
507        &mut self,
508        arg: Option<&FunctionArg>,
509        err_msg: &str,
510    ) -> Result<(Relation, ObjectName)> {
511        let Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))) = arg else {
512            return Err(ErrorCode::BindError(err_msg.to_owned()).into());
513        };
514        let table_name = match expr {
515            ParserExpr::Identifier(ident) => Ok::<_, RwError>(ObjectName(vec![ident.clone()])),
516            ParserExpr::CompoundIdentifier(idents) => Ok(ObjectName(idents.clone())),
517            _ => Err(ErrorCode::BindError(err_msg.to_owned()).into()),
518        }?;
519
520        Ok((
521            self.bind_relation_by_name(&table_name, None, None, true)?,
522            table_name,
523        ))
524    }
525
526    // Bind column provided a function arg.
527    fn bind_column_by_function_args(
528        &mut self,
529        arg: Option<&FunctionArg>,
530        err_msg: &str,
531    ) -> Result<Box<InputRef>> {
532        if let Some(time_col_arg) = arg
533            && let Some(ExprImpl::InputRef(time_col)) =
534                self.bind_function_arg(time_col_arg)?.into_iter().next()
535        {
536            Ok(time_col)
537        } else {
538            Err(ErrorCode::BindError(err_msg.to_owned()).into())
539        }
540    }
541
542    /// `rw_table(table_id[,schema_name])` which queries internal table
543    fn bind_internal_table(
544        &mut self,
545        args: &[FunctionArg],
546        alias: Option<&TableAlias>,
547    ) -> Result<Relation> {
548        if args.is_empty() || args.len() > 2 {
549            return Err(
550                ErrorCode::BindError("usage: rw_table(table_id[,schema_name])".to_owned()).into(),
551            );
552        }
553
554        let table_id: TableId = args[0]
555            .to_string()
556            .parse::<u32>()
557            .map_err(|err| {
558                RwError::from(ErrorCode::BindError(format!(
559                    "invalid table id: {}",
560                    err.as_report()
561                )))
562            })?
563            .into();
564
565        let schema = args.get(1).map(|arg| arg.to_string());
566
567        let table_name = self.catalog.get_table_name_by_id(table_id)?;
568        self.bind_catalog_relation_by_name(None, schema.as_deref(), &table_name, alias, None, false)
569    }
570
571    pub(super) fn bind_table_factor(&mut self, table_factor: &TableFactor) -> Result<Relation> {
572        match table_factor {
573            TableFactor::Table { name, alias, as_of } => {
574                self.bind_relation_by_name(name, alias.as_ref(), as_of.as_ref(), true)
575            }
576            TableFactor::TableFunction {
577                name,
578                alias,
579                args,
580                with_ordinality,
581            } => {
582                self.try_mark_lateral_as_visible();
583                let result = self.bind_table_function(name, alias.as_ref(), args, *with_ordinality);
584                self.try_mark_lateral_as_invisible();
585                result
586            }
587            TableFactor::Derived {
588                lateral,
589                subquery,
590                alias,
591            } => {
592                if *lateral {
593                    // If we detect a lateral, we mark the lateral context as visible.
594                    self.try_mark_lateral_as_visible();
595
596                    // Bind lateral subquery here.
597                    let bound_subquery =
598                        self.bind_subquery_relation(subquery, alias.as_ref(), true)?;
599
600                    // Mark the lateral context as invisible once again.
601                    self.try_mark_lateral_as_invisible();
602                    Ok(Relation::Subquery(Box::new(bound_subquery)))
603                } else {
604                    // Non-lateral subqueries to not have access to the join-tree context.
605                    self.push_lateral_context();
606                    let bound_subquery =
607                        self.bind_subquery_relation(subquery, alias.as_ref(), false)?;
608                    self.pop_and_merge_lateral_context()?;
609                    Ok(Relation::Subquery(Box::new(bound_subquery)))
610                }
611            }
612            TableFactor::NestedJoin(table_with_joins) => {
613                self.push_lateral_context();
614                let bound_join = self.bind_table_with_joins(table_with_joins)?;
615                self.pop_and_merge_lateral_context()?;
616                Ok(bound_join)
617            }
618        }
619    }
620}