risingwave_frontend/binder/relation/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::ops::Deref;
17
18use either::Either;
19use itertools::{EitherOrBoth, Itertools};
20use risingwave_common::bail;
21use risingwave_common::catalog::{Field, TableId};
22use risingwave_sqlparser::ast::{
23    AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias,
24    TableFactor,
25};
26use thiserror::Error;
27use thiserror_ext::AsReport;
28
29use super::bind_context::ColumnBinding;
30use super::statement::RewriteExprsRecursive;
31use crate::binder::Binder;
32use crate::binder::bind_context::{BindingCte, BindingCteState};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::expr::{ExprImpl, InputRef};
35
36mod cte_ref;
37mod gap_fill;
38mod join;
39mod share;
40mod subquery;
41mod table_function;
42mod table_or_source;
43mod watermark;
44mod window_table_function;
45
46pub use cte_ref::BoundBackCteRef;
47pub use gap_fill::BoundGapFill;
48pub use join::BoundJoin;
49pub use share::{BoundShare, BoundShareInput};
50pub use subquery::BoundSubquery;
51pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable};
52pub use watermark::BoundWatermark;
53pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKind};
54
55use crate::expr::{CorrelatedId, Depth};
56
/// A validated item that refers to a table-like entity, including base table, subquery, join, etc.
/// It is usually part of the `from` clause.
///
/// Every payload except the one of [`Relation::TableFunction`] is boxed.
#[derive(Debug, Clone)]
pub enum Relation {
    /// A bound source.
    Source(Box<BoundSource>),
    /// A bound base table.
    BaseTable(Box<BoundBaseTable>),
    /// A bound system table.
    SystemTable(Box<BoundSystemTable>),
    /// A bound subquery used as a relation.
    Subquery(Box<BoundSubquery>),
    /// A bound join.
    Join(Box<BoundJoin>),
    /// Carries the same payload type as [`Relation::Join`].
    // NOTE(review): presumably the correlated (lateral) form of a join -- confirm against the
    // code that constructs it.
    Apply(Box<BoundJoin>),
    /// A bound window table function.
    WindowTableFunction(Box<BoundWindowTableFunction>),
    /// Table function or scalar function.
    TableFunction {
        /// The bound function call expression.
        expr: ExprImpl,
        /// The `with_ordinality` flag taken from the table factor.
        // NOTE(review): presumably requests an extra ordinal column -- confirm.
        with_ordinality: bool,
    },
    /// A bound watermark relation.
    Watermark(Box<BoundWatermark>),
    /// rcte is implicitly included in share
    Share(Box<BoundShare>),
    /// A reference back to a CTE whose base term is resolved but which is not fully bound yet.
    BackCteRef(Box<BoundBackCteRef>),
    /// A bound gap-fill relation.
    GapFill(Box<BoundGapFill>),
}
79
80impl RewriteExprsRecursive for Relation {
81    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
82        match self {
83            Relation::Subquery(inner) => inner.rewrite_exprs_recursive(rewriter),
84            Relation::Join(inner) => inner.rewrite_exprs_recursive(rewriter),
85            Relation::Apply(inner) => inner.rewrite_exprs_recursive(rewriter),
86            Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter),
87            Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter),
88            Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter),
89            Relation::TableFunction { expr: inner, .. } => {
90                *inner = rewriter.rewrite_expr(inner.take())
91            }
92            Relation::BackCteRef(inner) => inner.rewrite_exprs_recursive(rewriter),
93            _ => {}
94        }
95    }
96}
97
98impl Relation {
99    pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
100        match self {
101            Relation::Subquery(subquery) => subquery.query.is_correlated_by_depth(depth),
102            Relation::Join(join) | Relation::Apply(join) => {
103                join.cond.has_correlated_input_ref_by_depth(depth)
104                    || join.left.is_correlated_by_depth(depth)
105                    || join.right.is_correlated_by_depth(depth)
106            }
107            _ => false,
108        }
109    }
110
111    pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
112        match self {
113            Relation::Subquery(subquery) => {
114                subquery.query.is_correlated_by_correlated_id(correlated_id)
115            }
116            Relation::Join(join) | Relation::Apply(join) => {
117                join.cond
118                    .has_correlated_input_ref_by_correlated_id(correlated_id)
119                    || join.left.is_correlated_by_correlated_id(correlated_id)
120                    || join.right.is_correlated_by_correlated_id(correlated_id)
121            }
122            _ => false,
123        }
124    }
125
126    pub fn collect_correlated_indices_by_depth_and_assign_id(
127        &mut self,
128        depth: Depth,
129        correlated_id: CorrelatedId,
130    ) -> Vec<usize> {
131        match self {
132            Relation::Subquery(subquery) => subquery
133                .query
134                .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
135            Relation::Join(join) | Relation::Apply(join) => {
136                let mut correlated_indices = vec![];
137                correlated_indices.extend(
138                    join.cond
139                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
140                );
141                correlated_indices.extend(
142                    join.left
143                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
144                );
145                correlated_indices.extend(
146                    join.right
147                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
148                );
149                correlated_indices
150            }
151            Relation::TableFunction {
152                expr: table_function,
153                with_ordinality: _,
154            } => table_function
155                .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
156            Relation::Share(share) => match &mut share.input {
157                BoundShareInput::Query(query) => match query {
158                    Either::Left(query) => query
159                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
160                    Either::Right(_) => vec![],
161                },
162                BoundShareInput::ChangeLog(change_log) => change_log
163                    .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
164            },
165            _ => vec![],
166        }
167    }
168}
169
/// The reason a qualified object name could not be resolved.
#[derive(Debug)]
#[non_exhaustive]
pub enum ResolveQualifiedNameErrorKind {
    /// The name has more than three dot-separated identifiers.
    QualifiedNameTooLong,
    /// The name carries a database part that differs from the current database.
    NotCurrentDatabase,
}
176
/// Error produced when resolving a qualified object name fails.
///
/// The `Display` implementation is written by hand rather than generated by `thiserror`.
#[derive(Debug, Error)]
pub struct ResolveQualifiedNameError {
    /// The offending name, formatted as text for the error message.
    qualified: String,
    /// Why the name was rejected.
    kind: ResolveQualifiedNameErrorKind,
}
182
183impl std::fmt::Display for ResolveQualifiedNameError {
184    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185        match self.kind {
186            ResolveQualifiedNameErrorKind::QualifiedNameTooLong => write!(
187                f,
188                "improper qualified name (too many dotted names): {}",
189                self.qualified
190            ),
191            ResolveQualifiedNameErrorKind::NotCurrentDatabase => write!(
192                f,
193                "cross-database references are not implemented: \"{}\"",
194                self.qualified
195            ),
196        }
197    }
198}
199
200impl ResolveQualifiedNameError {
201    pub fn new(qualified: String, kind: ResolveQualifiedNameErrorKind) -> Self {
202        Self { qualified, kind }
203    }
204}
205
206impl From<ResolveQualifiedNameError> for RwError {
207    fn from(e: ResolveQualifiedNameError) -> Self {
208        ErrorCode::InvalidInputSyntax(format!("{}", e.as_report())).into()
209    }
210}
211
212impl Binder {
213    /// return (`schema_name`, `name`)
214    pub fn resolve_schema_qualified_name(
215        db_name: &str,
216        name: &ObjectName,
217    ) -> std::result::Result<(Option<String>, String), ResolveQualifiedNameError> {
218        let formatted_name = name.to_string();
219        let mut identifiers = name.0.clone();
220
221        if identifiers.len() > 3 {
222            return Err(ResolveQualifiedNameError::new(
223                formatted_name,
224                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
225            ));
226        }
227
228        let name = identifiers.pop().unwrap().real_value();
229
230        let schema_name = identifiers.pop().map(|ident| ident.real_value());
231        let database_name = identifiers.pop().map(|ident| ident.real_value());
232
233        if let Some(database_name) = database_name
234            && database_name != db_name
235        {
236            return Err(ResolveQualifiedNameError::new(
237                formatted_name,
238                ResolveQualifiedNameErrorKind::NotCurrentDatabase,
239            ));
240        }
241
242        Ok((schema_name, name))
243    }
244
245    /// check whether the name is a cross-database reference
246    pub fn validate_cross_db_reference(
247        db_name: &str,
248        name: &ObjectName,
249    ) -> std::result::Result<(), ResolveQualifiedNameError> {
250        let formatted_name = name.to_string();
251        let identifiers = &name.0;
252        if identifiers.len() > 3 {
253            return Err(ResolveQualifiedNameError::new(
254                formatted_name,
255                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
256            ));
257        }
258
259        if identifiers.len() == 3 && identifiers[0].real_value() != db_name {
260            return Err(ResolveQualifiedNameError::new(
261                formatted_name,
262                ResolveQualifiedNameErrorKind::NotCurrentDatabase,
263            ));
264        }
265
266        Ok(())
267    }
268
269    /// return (`database_name`, `schema_name`, `name`)
270    pub fn resolve_db_schema_qualified_name(
271        name: &ObjectName,
272    ) -> std::result::Result<(Option<String>, Option<String>, String), ResolveQualifiedNameError>
273    {
274        let formatted_name = name.to_string();
275        let mut identifiers = name.0.clone();
276
277        if identifiers.len() > 3 {
278            return Err(ResolveQualifiedNameError::new(
279                formatted_name,
280                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
281            ));
282        }
283
284        let name = identifiers.pop().unwrap().real_value();
285        let schema_name = identifiers.pop().map(|ident| ident.real_value());
286        let database_name = identifiers.pop().map(|ident| ident.real_value());
287
288        Ok((database_name, schema_name, name))
289    }
290
291    /// return first name in identifiers, must have only one name.
292    fn resolve_single_name(mut identifiers: Vec<Ident>, ident_desc: &str) -> Result<String> {
293        if identifiers.len() > 1 {
294            bail!("{} must contain 1 argument", ident_desc);
295        }
296        let name = identifiers.pop().unwrap().real_value();
297
298        Ok(name)
299    }
300
301    /// return the `database_name`
302    pub fn resolve_database_name(name: ObjectName) -> Result<String> {
303        Self::resolve_single_name(name.0, "database name")
304    }
305
306    /// return the `schema_name`
307    pub fn resolve_schema_name(name: ObjectName) -> Result<String> {
308        Self::resolve_single_name(name.0, "schema name")
309    }
310
311    /// return the `index_name`
312    pub fn resolve_index_name(name: ObjectName) -> Result<String> {
313        Self::resolve_single_name(name.0, "index name")
314    }
315
316    /// return the `view_name`
317    pub fn resolve_view_name(name: ObjectName) -> Result<String> {
318        Self::resolve_single_name(name.0, "view name")
319    }
320
321    /// return the `sink_name`
322    pub fn resolve_sink_name(name: ObjectName) -> Result<String> {
323        Self::resolve_single_name(name.0, "sink name")
324    }
325
326    /// return the `subscription_name`
327    pub fn resolve_subscription_name(name: ObjectName) -> Result<String> {
328        Self::resolve_single_name(name.0, "subscription name")
329    }
330
331    /// return the `table_name`
332    pub fn resolve_table_name(name: ObjectName) -> Result<String> {
333        Self::resolve_single_name(name.0, "table name")
334    }
335
336    /// return the `source_name`
337    pub fn resolve_source_name(name: ObjectName) -> Result<String> {
338        Self::resolve_single_name(name.0, "source name")
339    }
340
341    /// return the `user_name`
342    pub fn resolve_user_name(name: ObjectName) -> Result<String> {
343        Self::resolve_single_name(name.0, "user name")
344    }
345
346    /// Fill the [`BindContext`](super::BindContext) for table.
347    pub(super) fn bind_table_to_context(
348        &mut self,
349        columns: impl IntoIterator<Item = (bool, Field)>, // bool indicates if the field is hidden
350        table_name: String,
351        alias: Option<&TableAlias>,
352    ) -> Result<()> {
353        const EMPTY: [Ident; 0] = [];
354        let (table_name, column_aliases) = match alias {
355            None => (table_name, &EMPTY[..]),
356            Some(TableAlias { name, columns }) => (name.real_value(), columns.as_slice()),
357        };
358
359        let num_col_aliases = column_aliases.len();
360
361        let begin = self.context.columns.len();
362        // Column aliases can be less than columns, but not more.
363        // It also needs to skip hidden columns.
364        let mut alias_iter = column_aliases.iter().fuse();
365        let mut index = 0;
366        columns.into_iter().for_each(|(is_hidden, mut field)| {
367            let name = match is_hidden {
368                true => field.name.clone(),
369                false => alias_iter
370                    .next()
371                    .map(|t| t.real_value())
372                    .unwrap_or_else(|| field.name.clone()),
373            };
374            field.name.clone_from(&name);
375            self.context.columns.push(ColumnBinding::new(
376                table_name.clone(),
377                begin + index,
378                is_hidden,
379                field,
380            ));
381            self.context
382                .indices_of
383                .entry(name)
384                .or_default()
385                .push(self.context.columns.len() - 1);
386            index += 1;
387        });
388
389        let num_cols = index;
390        if num_cols < num_col_aliases {
391            return Err(ErrorCode::BindError(format!(
392                "table \"{table_name}\" has {num_cols} columns available but {num_col_aliases} column aliases specified",
393            ))
394            .into());
395        }
396
397        match self.context.range_of.entry(table_name.clone()) {
398            Entry::Occupied(_) => Err(ErrorCode::InternalError(format!(
399                "Duplicated table name while binding table to context: {}",
400                table_name
401            ))
402            .into()),
403            Entry::Vacant(entry) => {
404                entry.insert((begin, self.context.columns.len()));
405                Ok(())
406            }
407        }
408    }
409
410    /// Binds a relation, which can be:
411    /// - a table/source/materialized view
412    /// - a reference to a CTE
413    /// - a logical view
414    pub fn bind_relation_by_name(
415        &mut self,
416        name: &ObjectName,
417        alias: Option<&TableAlias>,
418        as_of: Option<&AsOf>,
419        allow_cross_db: bool,
420    ) -> Result<Relation> {
421        let (db_name, schema_name, table_name) = if allow_cross_db {
422            Self::resolve_db_schema_qualified_name(name)?
423        } else {
424            let (schema_name, table_name) =
425                Self::resolve_schema_qualified_name(&self.db_name, name)?;
426            (None, schema_name, table_name)
427        };
428
429        if schema_name.is_none()
430            // the `table_name` here is the name of the currently binding cte.
431            && let Some(item) = self.context.cte_to_relation.get(&table_name)
432        {
433            // Handles CTE
434
435            if as_of.is_some() {
436                return Err(ErrorCode::BindError(
437                    "Right table of a temporal join should not be a CTE. \
438                 It should be a table, index, or materialized view"
439                        .to_owned(),
440                )
441                .into());
442            }
443
444            let BindingCte {
445                share_id,
446                state: cte_state,
447                alias: mut original_alias,
448            } = item.deref().borrow().clone();
449
450            // The original CTE alias ought to be its table name.
451            debug_assert_eq!(original_alias.name.real_value(), table_name);
452
453            if let Some(from_alias) = alias {
454                original_alias.name = from_alias.name.clone();
455                original_alias.columns = original_alias
456                    .columns
457                    .into_iter()
458                    .zip_longest(from_alias.columns.iter().cloned())
459                    .map(EitherOrBoth::into_right)
460                    .collect();
461            }
462
463            match cte_state {
464                BindingCteState::Init => {
465                    Err(ErrorCode::BindError("Base term of recursive CTE not found, consider writing it to left side of the `UNION ALL` operator".to_owned()).into())
466                }
467                BindingCteState::BaseResolved { base } => {
468                    self.bind_table_to_context(
469                        base.schema().fields.iter().map(|f| (false, f.clone())),
470                        table_name,
471                        Some(&original_alias),
472                    )?;
473                    Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id, base })))
474                }
475                BindingCteState::Bound { query } => {
476                    let input = BoundShareInput::Query(query);
477                    self.bind_table_to_context(
478                        input.fields()?,
479                        table_name,
480                        Some(&original_alias),
481                    )?;
482                    // we could always share the cte,
483                    // no matter it's recursive or not.
484                    Ok(Relation::Share(Box::new(BoundShare { share_id, input})))
485                }
486                BindingCteState::ChangeLog { table } => {
487                    let input = BoundShareInput::ChangeLog(table);
488                    self.bind_table_to_context(
489                        input.fields()?,
490                        table_name,
491                        Some(&original_alias),
492                    )?;
493                    Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
494                },
495            }
496        } else {
497            self.bind_catalog_relation_by_name(
498                db_name.as_deref(),
499                schema_name.as_deref(),
500                &table_name,
501                alias,
502                as_of,
503                false,
504            )
505        }
506    }
507
508    // Bind a relation provided a function arg.
509    fn bind_relation_by_function_arg(
510        &mut self,
511        arg: Option<&FunctionArg>,
512        err_msg: &str,
513    ) -> Result<(Relation, ObjectName)> {
514        let Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))) = arg else {
515            return Err(ErrorCode::BindError(err_msg.to_owned()).into());
516        };
517        let table_name = match expr {
518            ParserExpr::Identifier(ident) => Ok::<_, RwError>(ObjectName(vec![ident.clone()])),
519            ParserExpr::CompoundIdentifier(idents) => Ok(ObjectName(idents.clone())),
520            _ => Err(ErrorCode::BindError(err_msg.to_owned()).into()),
521        }?;
522
523        Ok((
524            self.bind_relation_by_name(&table_name, None, None, true)?,
525            table_name,
526        ))
527    }
528
529    // Bind column provided a function arg.
530    fn bind_column_by_function_args(
531        &mut self,
532        arg: Option<&FunctionArg>,
533        err_msg: &str,
534    ) -> Result<Box<InputRef>> {
535        if let Some(time_col_arg) = arg
536            && let Some(ExprImpl::InputRef(time_col)) =
537                self.bind_function_arg(time_col_arg)?.into_iter().next()
538        {
539            Ok(time_col)
540        } else {
541            Err(ErrorCode::BindError(err_msg.to_owned()).into())
542        }
543    }
544
545    /// `rw_table(table_id[,schema_name])` which queries internal table
546    fn bind_internal_table(
547        &mut self,
548        args: &[FunctionArg],
549        alias: Option<&TableAlias>,
550    ) -> Result<Relation> {
551        if args.is_empty() || args.len() > 2 {
552            return Err(
553                ErrorCode::BindError("usage: rw_table(table_id[,schema_name])".to_owned()).into(),
554            );
555        }
556
557        let table_id: TableId = args[0]
558            .to_string()
559            .parse::<u32>()
560            .map_err(|err| {
561                RwError::from(ErrorCode::BindError(format!(
562                    "invalid table id: {}",
563                    err.as_report()
564                )))
565            })?
566            .into();
567
568        let schema = args.get(1).map(|arg| arg.to_string());
569
570        let table_name = self.catalog.get_table_name_by_id(table_id)?;
571        self.bind_catalog_relation_by_name(None, schema.as_deref(), &table_name, alias, None, false)
572    }
573
574    pub(super) fn bind_table_factor(&mut self, table_factor: &TableFactor) -> Result<Relation> {
575        match table_factor {
576            TableFactor::Table { name, alias, as_of } => {
577                self.bind_relation_by_name(name, alias.as_ref(), as_of.as_ref(), true)
578            }
579            TableFactor::TableFunction {
580                name,
581                alias,
582                args,
583                with_ordinality,
584            } => {
585                self.try_mark_lateral_as_visible();
586                let result = self.bind_table_function(name, alias.as_ref(), args, *with_ordinality);
587                self.try_mark_lateral_as_invisible();
588                result
589            }
590            TableFactor::Derived {
591                lateral,
592                subquery,
593                alias,
594            } => {
595                if *lateral {
596                    // If we detect a lateral, we mark the lateral context as visible.
597                    self.try_mark_lateral_as_visible();
598
599                    // Bind lateral subquery here.
600                    let bound_subquery =
601                        self.bind_subquery_relation(subquery, alias.as_ref(), true)?;
602
603                    // Mark the lateral context as invisible once again.
604                    self.try_mark_lateral_as_invisible();
605                    Ok(Relation::Subquery(Box::new(bound_subquery)))
606                } else {
607                    // Non-lateral subqueries to not have access to the join-tree context.
608                    self.push_lateral_context();
609                    let bound_subquery =
610                        self.bind_subquery_relation(subquery, alias.as_ref(), false)?;
611                    self.pop_and_merge_lateral_context()?;
612                    Ok(Relation::Subquery(Box::new(bound_subquery)))
613                }
614            }
615            TableFactor::NestedJoin(table_with_joins) => {
616                self.push_lateral_context();
617                let bound_join = self.bind_table_with_joins(table_with_joins)?;
618                self.pop_and_merge_lateral_context()?;
619                Ok(bound_join)
620            }
621        }
622    }
623}