risingwave_frontend/binder/relation/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::ops::Deref;
17
18use either::Either;
19use itertools::{EitherOrBoth, Itertools};
20use risingwave_common::bail;
21use risingwave_common::catalog::{Field, TableId};
22use risingwave_sqlparser::ast::{
23    AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias,
24    TableFactor,
25};
26use thiserror::Error;
27use thiserror_ext::AsReport;
28
29use super::bind_context::ColumnBinding;
30use super::statement::RewriteExprsRecursive;
31use crate::binder::Binder;
32use crate::binder::bind_context::{BindingCte, BindingCteState};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::expr::{ExprImpl, InputRef};
35
36mod cte_ref;
37mod join;
38mod share;
39mod subquery;
40mod table_function;
41mod table_or_source;
42mod watermark;
43mod window_table_function;
44
45pub use cte_ref::BoundBackCteRef;
46pub use join::BoundJoin;
47pub use share::{BoundShare, BoundShareInput};
48pub use subquery::BoundSubquery;
49pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable};
50pub use watermark::BoundWatermark;
51pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKind};
52
53use crate::expr::{CorrelatedId, Depth};
54
55/// A validated item that refers to a table-like entity, including base table, subquery, join, etc.
56/// It is usually part of the `from` clause.
57#[derive(Debug, Clone)]
58pub enum Relation {
59    Source(Box<BoundSource>),
60    BaseTable(Box<BoundBaseTable>),
61    SystemTable(Box<BoundSystemTable>),
62    Subquery(Box<BoundSubquery>),
63    Join(Box<BoundJoin>),
64    Apply(Box<BoundJoin>),
65    WindowTableFunction(Box<BoundWindowTableFunction>),
66    /// Table function or scalar function.
67    TableFunction {
68        expr: ExprImpl,
69        with_ordinality: bool,
70    },
71    Watermark(Box<BoundWatermark>),
72    /// rcte is implicitly included in share
73    Share(Box<BoundShare>),
74    BackCteRef(Box<BoundBackCteRef>),
75}
76
77impl RewriteExprsRecursive for Relation {
78    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
79        match self {
80            Relation::Subquery(inner) => inner.rewrite_exprs_recursive(rewriter),
81            Relation::Join(inner) => inner.rewrite_exprs_recursive(rewriter),
82            Relation::Apply(inner) => inner.rewrite_exprs_recursive(rewriter),
83            Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter),
84            Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter),
85            Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter),
86            Relation::TableFunction { expr: inner, .. } => {
87                *inner = rewriter.rewrite_expr(inner.take())
88            }
89            Relation::BackCteRef(inner) => inner.rewrite_exprs_recursive(rewriter),
90            _ => {}
91        }
92    }
93}
94
95impl Relation {
96    pub fn is_correlated(&self, depth: Depth) -> bool {
97        match self {
98            Relation::Subquery(subquery) => subquery.query.is_correlated(depth),
99            Relation::Join(join) | Relation::Apply(join) => {
100                join.cond.has_correlated_input_ref_by_depth(depth)
101                    || join.left.is_correlated(depth)
102                    || join.right.is_correlated(depth)
103            }
104            _ => false,
105        }
106    }
107
108    pub fn collect_correlated_indices_by_depth_and_assign_id(
109        &mut self,
110        depth: Depth,
111        correlated_id: CorrelatedId,
112    ) -> Vec<usize> {
113        match self {
114            Relation::Subquery(subquery) => subquery
115                .query
116                .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
117            Relation::Join(join) | Relation::Apply(join) => {
118                let mut correlated_indices = vec![];
119                correlated_indices.extend(
120                    join.cond
121                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
122                );
123                correlated_indices.extend(
124                    join.left
125                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
126                );
127                correlated_indices.extend(
128                    join.right
129                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
130                );
131                correlated_indices
132            }
133            Relation::TableFunction {
134                expr: table_function,
135                with_ordinality: _,
136            } => table_function
137                .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
138            Relation::Share(share) => match &mut share.input {
139                BoundShareInput::Query(query) => match query {
140                    Either::Left(query) => query
141                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
142                    Either::Right(_) => vec![],
143                },
144                BoundShareInput::ChangeLog(change_log) => change_log
145                    .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
146            },
147            _ => vec![],
148        }
149    }
150}
151
/// The reason a qualified object name could not be resolved.
#[non_exhaustive]
#[derive(Debug)]
pub enum ResolveQualifiedNameErrorKind {
    /// The name has more dotted parts than `database.schema.name`.
    QualifiedNameTooLong,
    /// The name's database part is not the database currently in use.
    NotCurrentDatabase,
}
158
159#[derive(Debug, Error)]
160pub struct ResolveQualifiedNameError {
161    qualified: String,
162    kind: ResolveQualifiedNameErrorKind,
163}
164
165impl std::fmt::Display for ResolveQualifiedNameError {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        match self.kind {
168            ResolveQualifiedNameErrorKind::QualifiedNameTooLong => write!(
169                f,
170                "improper qualified name (too many dotted names): {}",
171                self.qualified
172            ),
173            ResolveQualifiedNameErrorKind::NotCurrentDatabase => write!(
174                f,
175                "cross-database references are not implemented: \"{}\"",
176                self.qualified
177            ),
178        }
179    }
180}
181
182impl ResolveQualifiedNameError {
183    pub fn new(qualified: String, kind: ResolveQualifiedNameErrorKind) -> Self {
184        Self { qualified, kind }
185    }
186}
187
188impl From<ResolveQualifiedNameError> for RwError {
189    fn from(e: ResolveQualifiedNameError) -> Self {
190        ErrorCode::InvalidInputSyntax(format!("{}", e.as_report())).into()
191    }
192}
193
194impl Binder {
195    /// return (`schema_name`, `name`)
196    pub fn resolve_schema_qualified_name(
197        db_name: &str,
198        name: ObjectName,
199    ) -> std::result::Result<(Option<String>, String), ResolveQualifiedNameError> {
200        let formatted_name = name.to_string();
201        let mut identifiers = name.0;
202
203        if identifiers.len() > 3 {
204            return Err(ResolveQualifiedNameError::new(
205                formatted_name,
206                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
207            ));
208        }
209
210        let name = identifiers.pop().unwrap().real_value();
211
212        let schema_name = identifiers.pop().map(|ident| ident.real_value());
213        let database_name = identifiers.pop().map(|ident| ident.real_value());
214
215        if let Some(database_name) = database_name
216            && database_name != db_name
217        {
218            return Err(ResolveQualifiedNameError::new(
219                formatted_name,
220                ResolveQualifiedNameErrorKind::NotCurrentDatabase,
221            ));
222        }
223
224        Ok((schema_name, name))
225    }
226
227    /// check whether the name is a cross-database reference
228    pub fn validate_cross_db_reference(
229        db_name: &str,
230        name: &ObjectName,
231    ) -> std::result::Result<(), ResolveQualifiedNameError> {
232        let formatted_name = name.to_string();
233        let identifiers = &name.0;
234        if identifiers.len() > 3 {
235            return Err(ResolveQualifiedNameError::new(
236                formatted_name,
237                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
238            ));
239        }
240
241        if identifiers.len() == 3 && identifiers[0].real_value() != db_name {
242            return Err(ResolveQualifiedNameError::new(
243                formatted_name,
244                ResolveQualifiedNameErrorKind::NotCurrentDatabase,
245            ));
246        }
247
248        Ok(())
249    }
250
251    /// return (`database_name`, `schema_name`, `name`)
252    pub fn resolve_db_schema_qualified_name(
253        name: ObjectName,
254    ) -> std::result::Result<(Option<String>, Option<String>, String), ResolveQualifiedNameError>
255    {
256        let formatted_name = name.to_string();
257        let mut identifiers = name.0;
258
259        if identifiers.len() > 3 {
260            return Err(ResolveQualifiedNameError::new(
261                formatted_name,
262                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
263            ));
264        }
265
266        let name = identifiers.pop().unwrap().real_value();
267        let schema_name = identifiers.pop().map(|ident| ident.real_value());
268        let database_name = identifiers.pop().map(|ident| ident.real_value());
269
270        Ok((database_name, schema_name, name))
271    }
272
273    /// return first name in identifiers, must have only one name.
274    fn resolve_single_name(mut identifiers: Vec<Ident>, ident_desc: &str) -> Result<String> {
275        if identifiers.len() > 1 {
276            bail!("{} must contain 1 argument", ident_desc);
277        }
278        let name = identifiers.pop().unwrap().real_value();
279
280        Ok(name)
281    }
282
283    /// return the `database_name`
284    pub fn resolve_database_name(name: ObjectName) -> Result<String> {
285        Self::resolve_single_name(name.0, "database name")
286    }
287
288    /// return the `schema_name`
289    pub fn resolve_schema_name(name: ObjectName) -> Result<String> {
290        Self::resolve_single_name(name.0, "schema name")
291    }
292
293    /// return the `index_name`
294    pub fn resolve_index_name(name: ObjectName) -> Result<String> {
295        Self::resolve_single_name(name.0, "index name")
296    }
297
298    /// return the `view_name`
299    pub fn resolve_view_name(name: ObjectName) -> Result<String> {
300        Self::resolve_single_name(name.0, "view name")
301    }
302
303    /// return the `sink_name`
304    pub fn resolve_sink_name(name: ObjectName) -> Result<String> {
305        Self::resolve_single_name(name.0, "sink name")
306    }
307
308    /// return the `subscription_name`
309    pub fn resolve_subscription_name(name: ObjectName) -> Result<String> {
310        Self::resolve_single_name(name.0, "subscription name")
311    }
312
313    /// return the `table_name`
314    pub fn resolve_table_name(name: ObjectName) -> Result<String> {
315        Self::resolve_single_name(name.0, "table name")
316    }
317
318    /// return the `source_name`
319    pub fn resolve_source_name(name: ObjectName) -> Result<String> {
320        Self::resolve_single_name(name.0, "source name")
321    }
322
323    /// return the `user_name`
324    pub fn resolve_user_name(name: ObjectName) -> Result<String> {
325        Self::resolve_single_name(name.0, "user name")
326    }
327
328    /// Fill the [`BindContext`](super::BindContext) for table.
329    pub(super) fn bind_table_to_context(
330        &mut self,
331        columns: impl IntoIterator<Item = (bool, Field)>, // bool indicates if the field is hidden
332        table_name: String,
333        alias: Option<TableAlias>,
334    ) -> Result<()> {
335        let (table_name, column_aliases) = match alias {
336            None => (table_name, vec![]),
337            Some(TableAlias { name, columns }) => (name.real_value(), columns),
338        };
339
340        let num_col_aliases = column_aliases.len();
341
342        let begin = self.context.columns.len();
343        // Column aliases can be less than columns, but not more.
344        // It also needs to skip hidden columns.
345        let mut alias_iter = column_aliases.into_iter().fuse();
346        let mut index = 0;
347        columns.into_iter().for_each(|(is_hidden, mut field)| {
348            let name = match is_hidden {
349                true => field.name.clone(),
350                false => alias_iter
351                    .next()
352                    .map(|t| t.real_value())
353                    .unwrap_or_else(|| field.name.clone()),
354            };
355            field.name.clone_from(&name);
356            self.context.columns.push(ColumnBinding::new(
357                table_name.clone(),
358                begin + index,
359                is_hidden,
360                field,
361            ));
362            self.context
363                .indices_of
364                .entry(name)
365                .or_default()
366                .push(self.context.columns.len() - 1);
367            index += 1;
368        });
369
370        let num_cols = index;
371        if num_cols < num_col_aliases {
372            return Err(ErrorCode::BindError(format!(
373                "table \"{table_name}\" has {num_cols} columns available but {num_col_aliases} column aliases specified",
374            ))
375            .into());
376        }
377
378        match self.context.range_of.entry(table_name.clone()) {
379            Entry::Occupied(_) => Err(ErrorCode::InternalError(format!(
380                "Duplicated table name while binding table to context: {}",
381                table_name
382            ))
383            .into()),
384            Entry::Vacant(entry) => {
385                entry.insert((begin, self.context.columns.len()));
386                Ok(())
387            }
388        }
389    }
390
391    /// Binds a relation, which can be:
392    /// - a table/source/materialized view
393    /// - a reference to a CTE
394    /// - a logical view
395    pub fn bind_relation_by_name(
396        &mut self,
397        name: ObjectName,
398        alias: Option<TableAlias>,
399        as_of: Option<AsOf>,
400        allow_cross_db: bool,
401    ) -> Result<Relation> {
402        let (db_name, schema_name, table_name) = if allow_cross_db {
403            Self::resolve_db_schema_qualified_name(name)?
404        } else {
405            let (schema_name, table_name) =
406                Self::resolve_schema_qualified_name(&self.db_name, name)?;
407            (None, schema_name, table_name)
408        };
409
410        if schema_name.is_none()
411            // the `table_name` here is the name of the currently binding cte.
412            && let Some(item) = self.context.cte_to_relation.get(&table_name)
413        {
414            // Handles CTE
415
416            if as_of.is_some() {
417                return Err(ErrorCode::BindError(
418                    "Right table of a temporal join should not be a CTE. \
419                 It should be a table, index, or materialized view"
420                        .to_owned(),
421                )
422                .into());
423            }
424
425            let BindingCte {
426                share_id,
427                state: cte_state,
428                alias: mut original_alias,
429            } = item.deref().borrow().clone();
430
431            // The original CTE alias ought to be its table name.
432            debug_assert_eq!(original_alias.name.real_value(), table_name);
433
434            if let Some(from_alias) = alias {
435                original_alias.name = from_alias.name;
436                original_alias.columns = original_alias
437                    .columns
438                    .into_iter()
439                    .zip_longest(from_alias.columns)
440                    .map(EitherOrBoth::into_right)
441                    .collect();
442            }
443
444            match cte_state {
445                BindingCteState::Init => {
446                    Err(ErrorCode::BindError("Base term of recursive CTE not found, consider writing it to left side of the `UNION ALL` operator".to_owned()).into())
447                }
448                BindingCteState::BaseResolved { base } => {
449                    self.bind_table_to_context(
450                        base.schema().fields.iter().map(|f| (false, f.clone())),
451                        table_name.clone(),
452                        Some(original_alias),
453                    )?;
454                    Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id, base })))
455                }
456                BindingCteState::Bound { query } => {
457                    let input = BoundShareInput::Query(query);
458                    self.bind_table_to_context(
459                        input.fields()?,
460                        table_name.clone(),
461                        Some(original_alias),
462                    )?;
463                    // we could always share the cte,
464                    // no matter it's recursive or not.
465                    Ok(Relation::Share(Box::new(BoundShare { share_id, input})))
466                }
467                BindingCteState::ChangeLog { table } => {
468                    let input = BoundShareInput::ChangeLog(table);
469                    self.bind_table_to_context(
470                        input.fields()?,
471                        table_name.clone(),
472                        Some(original_alias),
473                    )?;
474                    Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
475                },
476            }
477        } else {
478            self.bind_relation_by_name_inner(
479                db_name.as_deref(),
480                schema_name.as_deref(),
481                &table_name,
482                alias,
483                as_of,
484            )
485        }
486    }
487
488    // Bind a relation provided a function arg.
489    fn bind_relation_by_function_arg(
490        &mut self,
491        arg: Option<FunctionArg>,
492        err_msg: &str,
493    ) -> Result<(Relation, ObjectName)> {
494        let Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))) = arg else {
495            return Err(ErrorCode::BindError(err_msg.to_owned()).into());
496        };
497        let table_name = match expr {
498            ParserExpr::Identifier(ident) => Ok::<_, RwError>(ObjectName(vec![ident])),
499            ParserExpr::CompoundIdentifier(idents) => Ok(ObjectName(idents)),
500            _ => Err(ErrorCode::BindError(err_msg.to_owned()).into()),
501        }?;
502
503        Ok((
504            self.bind_relation_by_name(table_name.clone(), None, None, true)?,
505            table_name,
506        ))
507    }
508
509    // Bind column provided a function arg.
510    fn bind_column_by_function_args(
511        &mut self,
512        arg: Option<FunctionArg>,
513        err_msg: &str,
514    ) -> Result<Box<InputRef>> {
515        if let Some(time_col_arg) = arg
516            && let Some(ExprImpl::InputRef(time_col)) =
517                self.bind_function_arg(time_col_arg)?.into_iter().next()
518        {
519            Ok(time_col)
520        } else {
521            Err(ErrorCode::BindError(err_msg.to_owned()).into())
522        }
523    }
524
525    /// `rw_table(table_id[,schema_name])` which queries internal table
526    fn bind_internal_table(
527        &mut self,
528        args: Vec<FunctionArg>,
529        alias: Option<TableAlias>,
530    ) -> Result<Relation> {
531        if args.is_empty() || args.len() > 2 {
532            return Err(
533                ErrorCode::BindError("usage: rw_table(table_id[,schema_name])".to_owned()).into(),
534            );
535        }
536
537        let table_id: TableId = args[0]
538            .to_string()
539            .parse::<u32>()
540            .map_err(|err| {
541                RwError::from(ErrorCode::BindError(format!(
542                    "invalid table id: {}",
543                    err.as_report()
544                )))
545            })?
546            .into();
547
548        let schema = args.get(1).map(|arg| arg.to_string());
549
550        let table_name = self.catalog.get_table_name_by_id(table_id)?;
551        self.bind_relation_by_name_inner(None, schema.as_deref(), &table_name, alias, None)
552    }
553
554    pub(super) fn bind_table_factor(&mut self, table_factor: TableFactor) -> Result<Relation> {
555        match table_factor {
556            TableFactor::Table { name, alias, as_of } => {
557                self.bind_relation_by_name(name, alias, as_of, true)
558            }
559            TableFactor::TableFunction {
560                name,
561                alias,
562                args,
563                with_ordinality,
564            } => {
565                self.try_mark_lateral_as_visible();
566                let result = self.bind_table_function(name, alias, args, with_ordinality);
567                self.try_mark_lateral_as_invisible();
568                result
569            }
570            TableFactor::Derived {
571                lateral,
572                subquery,
573                alias,
574            } => {
575                if lateral {
576                    // If we detect a lateral, we mark the lateral context as visible.
577                    self.try_mark_lateral_as_visible();
578
579                    // Bind lateral subquery here.
580                    let bound_subquery = self.bind_subquery_relation(*subquery, alias, true)?;
581
582                    // Mark the lateral context as invisible once again.
583                    self.try_mark_lateral_as_invisible();
584                    Ok(Relation::Subquery(Box::new(bound_subquery)))
585                } else {
586                    // Non-lateral subqueries to not have access to the join-tree context.
587                    self.push_lateral_context();
588                    let bound_subquery = self.bind_subquery_relation(*subquery, alias, false)?;
589                    self.pop_and_merge_lateral_context()?;
590                    Ok(Relation::Subquery(Box::new(bound_subquery)))
591                }
592            }
593            TableFactor::NestedJoin(table_with_joins) => {
594                self.push_lateral_context();
595                let bound_join = self.bind_table_with_joins(*table_with_joins)?;
596                self.pop_and_merge_lateral_context()?;
597                Ok(bound_join)
598            }
599        }
600    }
601}