risingwave_frontend/binder/relation/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::ops::Deref;
17
18use itertools::{EitherOrBoth, Itertools};
19use risingwave_common::bail;
20use risingwave_common::catalog::{Field, TableId};
21use risingwave_sqlparser::ast::{
22    AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias,
23    TableFactor,
24};
25use thiserror::Error;
26use thiserror_ext::AsReport;
27
28use super::bind_context::ColumnBinding;
29use super::statement::RewriteExprsRecursive;
30use crate::binder::Binder;
31use crate::binder::bind_context::{BindingCte, BindingCteState};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::expr::{ExprImpl, InputRef};
34
35mod gap_fill;
36mod join;
37mod share;
38mod subquery;
39mod table_function;
40mod table_or_source;
41mod watermark;
42mod window_table_function;
43
44pub use gap_fill::BoundGapFill;
45pub use join::BoundJoin;
46pub use share::{BoundShare, BoundShareInput};
47pub use subquery::BoundSubquery;
48pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable};
49pub use watermark::BoundWatermark;
50pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKind};
51
52use crate::expr::{CorrelatedId, Depth};
53
54/// A validated item that refers to a table-like entity, including base table, subquery, join, etc.
55/// It is usually part of the `from` clause.
56#[derive(Debug, Clone)]
57pub enum Relation {
58    Source(Box<BoundSource>),
59    BaseTable(Box<BoundBaseTable>),
60    SystemTable(Box<BoundSystemTable>),
61    Subquery(Box<BoundSubquery>),
62    Join(Box<BoundJoin>),
63    Apply(Box<BoundJoin>),
64    WindowTableFunction(Box<BoundWindowTableFunction>),
65    /// Table function or scalar function.
66    TableFunction {
67        expr: ExprImpl,
68        with_ordinality: bool,
69    },
70    Watermark(Box<BoundWatermark>),
71    Share(Box<BoundShare>),
72    GapFill(Box<BoundGapFill>),
73}
74
75impl RewriteExprsRecursive for Relation {
76    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
77        match self {
78            Relation::Subquery(inner) => inner.rewrite_exprs_recursive(rewriter),
79            Relation::Join(inner) => inner.rewrite_exprs_recursive(rewriter),
80            Relation::Apply(inner) => inner.rewrite_exprs_recursive(rewriter),
81            Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter),
82            Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter),
83            Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter),
84            Relation::TableFunction { expr: inner, .. } => {
85                *inner = rewriter.rewrite_expr(inner.take())
86            }
87            _ => {}
88        }
89    }
90}
91
92impl Relation {
93    pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
94        match self {
95            Relation::Subquery(subquery) => subquery.query.is_correlated_by_depth(depth),
96            Relation::Join(join) | Relation::Apply(join) => {
97                join.cond.has_correlated_input_ref_by_depth(depth)
98                    || join.left.is_correlated_by_depth(depth)
99                    || join.right.is_correlated_by_depth(depth)
100            }
101            _ => false,
102        }
103    }
104
105    pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
106        match self {
107            Relation::Subquery(subquery) => {
108                subquery.query.is_correlated_by_correlated_id(correlated_id)
109            }
110            Relation::Join(join) | Relation::Apply(join) => {
111                join.cond
112                    .has_correlated_input_ref_by_correlated_id(correlated_id)
113                    || join.left.is_correlated_by_correlated_id(correlated_id)
114                    || join.right.is_correlated_by_correlated_id(correlated_id)
115            }
116            _ => false,
117        }
118    }
119
120    pub fn collect_correlated_indices_by_depth_and_assign_id(
121        &mut self,
122        depth: Depth,
123        correlated_id: CorrelatedId,
124    ) -> Vec<usize> {
125        match self {
126            Relation::Subquery(subquery) => subquery
127                .query
128                .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
129            Relation::Join(join) | Relation::Apply(join) => {
130                let mut correlated_indices = vec![];
131                correlated_indices.extend(
132                    join.cond
133                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
134                );
135                correlated_indices.extend(
136                    join.left
137                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
138                );
139                correlated_indices.extend(
140                    join.right
141                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
142                );
143                correlated_indices
144            }
145            Relation::TableFunction {
146                expr: table_function,
147                with_ordinality: _,
148            } => table_function
149                .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
150            Relation::Share(share) => match &mut share.input {
151                BoundShareInput::Query(query) => {
152                    query.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id)
153                }
154                BoundShareInput::ChangeLog(change_log) => change_log
155                    .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
156            },
157            _ => vec![],
158        }
159    }
160}
161
/// The reason why a qualified object name could not be resolved.
#[derive(Debug)]
#[non_exhaustive]
pub enum ResolveQualifiedNameErrorKind {
    /// The name consists of more than three dot-separated identifiers.
    QualifiedNameTooLong,
    /// The database part of the name differs from the expected database name.
    NotCurrentDatabase,
}
168
169#[derive(Debug, Error)]
170pub struct ResolveQualifiedNameError {
171    qualified: String,
172    kind: ResolveQualifiedNameErrorKind,
173}
174
175impl std::fmt::Display for ResolveQualifiedNameError {
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177        match self.kind {
178            ResolveQualifiedNameErrorKind::QualifiedNameTooLong => write!(
179                f,
180                "improper qualified name (too many dotted names): {}",
181                self.qualified
182            ),
183            ResolveQualifiedNameErrorKind::NotCurrentDatabase => write!(
184                f,
185                "cross-database references are not implemented: \"{}\"",
186                self.qualified
187            ),
188        }
189    }
190}
191
192impl ResolveQualifiedNameError {
193    pub fn new(qualified: String, kind: ResolveQualifiedNameErrorKind) -> Self {
194        Self { qualified, kind }
195    }
196}
197
198impl From<ResolveQualifiedNameError> for RwError {
199    fn from(e: ResolveQualifiedNameError) -> Self {
200        ErrorCode::InvalidInputSyntax(format!("{}", e.as_report())).into()
201    }
202}
203
204impl Binder {
205    /// return (`schema_name`, `name`)
206    pub fn resolve_schema_qualified_name(
207        db_name: &str,
208        name: &ObjectName,
209    ) -> std::result::Result<(Option<String>, String), ResolveQualifiedNameError> {
210        let formatted_name = name.to_string();
211        let mut identifiers = name.0.clone();
212
213        if identifiers.len() > 3 {
214            return Err(ResolveQualifiedNameError::new(
215                formatted_name,
216                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
217            ));
218        }
219
220        let name = identifiers.pop().unwrap().real_value();
221
222        let schema_name = identifiers.pop().map(|ident| ident.real_value());
223        let database_name = identifiers.pop().map(|ident| ident.real_value());
224
225        if let Some(database_name) = database_name
226            && database_name != db_name
227        {
228            return Err(ResolveQualifiedNameError::new(
229                formatted_name,
230                ResolveQualifiedNameErrorKind::NotCurrentDatabase,
231            ));
232        }
233
234        Ok((schema_name, name))
235    }
236
237    /// check whether the name is a cross-database reference
238    pub fn validate_cross_db_reference(
239        db_name: &str,
240        name: &ObjectName,
241    ) -> std::result::Result<(), ResolveQualifiedNameError> {
242        let formatted_name = name.to_string();
243        let identifiers = &name.0;
244        if identifiers.len() > 3 {
245            return Err(ResolveQualifiedNameError::new(
246                formatted_name,
247                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
248            ));
249        }
250
251        if identifiers.len() == 3 && identifiers[0].real_value() != db_name {
252            return Err(ResolveQualifiedNameError::new(
253                formatted_name,
254                ResolveQualifiedNameErrorKind::NotCurrentDatabase,
255            ));
256        }
257
258        Ok(())
259    }
260
261    /// return (`database_name`, `schema_name`, `name`)
262    pub fn resolve_db_schema_qualified_name(
263        name: &ObjectName,
264    ) -> std::result::Result<(Option<String>, Option<String>, String), ResolveQualifiedNameError>
265    {
266        let formatted_name = name.to_string();
267        let mut identifiers = name.0.clone();
268
269        if identifiers.len() > 3 {
270            return Err(ResolveQualifiedNameError::new(
271                formatted_name,
272                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
273            ));
274        }
275
276        let name = identifiers.pop().unwrap().real_value();
277        let schema_name = identifiers.pop().map(|ident| ident.real_value());
278        let database_name = identifiers.pop().map(|ident| ident.real_value());
279
280        Ok((database_name, schema_name, name))
281    }
282
283    /// return first name in identifiers, must have only one name.
284    fn resolve_single_name(mut identifiers: Vec<Ident>, ident_desc: &str) -> Result<String> {
285        if identifiers.len() > 1 {
286            bail!("{} must contain 1 argument", ident_desc);
287        }
288        let name = identifiers.pop().unwrap().real_value();
289
290        Ok(name)
291    }
292
293    /// return the `database_name`
294    pub fn resolve_database_name(name: ObjectName) -> Result<String> {
295        Self::resolve_single_name(name.0, "database name")
296    }
297
298    /// return the `schema_name`
299    pub fn resolve_schema_name(name: ObjectName) -> Result<String> {
300        Self::resolve_single_name(name.0, "schema name")
301    }
302
303    /// return the `index_name`
304    pub fn resolve_index_name(name: ObjectName) -> Result<String> {
305        Self::resolve_single_name(name.0, "index name")
306    }
307
308    /// return the `view_name`
309    pub fn resolve_view_name(name: ObjectName) -> Result<String> {
310        Self::resolve_single_name(name.0, "view name")
311    }
312
313    /// return the `sink_name`
314    pub fn resolve_sink_name(name: ObjectName) -> Result<String> {
315        Self::resolve_single_name(name.0, "sink name")
316    }
317
318    /// return the `subscription_name`
319    pub fn resolve_subscription_name(name: ObjectName) -> Result<String> {
320        Self::resolve_single_name(name.0, "subscription name")
321    }
322
323    /// return the `table_name`
324    pub fn resolve_table_name(name: ObjectName) -> Result<String> {
325        Self::resolve_single_name(name.0, "table name")
326    }
327
328    /// return the `source_name`
329    pub fn resolve_source_name(name: ObjectName) -> Result<String> {
330        Self::resolve_single_name(name.0, "source name")
331    }
332
333    /// return the `user_name`
334    pub fn resolve_user_name(name: ObjectName) -> Result<String> {
335        Self::resolve_single_name(name.0, "user name")
336    }
337
338    /// Fill the [`BindContext`](super::BindContext) for table.
339    pub(super) fn bind_table_to_context(
340        &mut self,
341        columns: impl IntoIterator<Item = (bool, Field)>, // bool indicates if the field is hidden
342        table_name: String,
343        alias: Option<&TableAlias>,
344    ) -> Result<()> {
345        const EMPTY: [Ident; 0] = [];
346        let (table_name, column_aliases) = match alias {
347            None => (table_name, &EMPTY[..]),
348            Some(TableAlias { name, columns }) => (name.real_value(), columns.as_slice()),
349        };
350
351        let num_col_aliases = column_aliases.len();
352
353        let begin = self.context.columns.len();
354        // Column aliases can be less than columns, but not more.
355        // It also needs to skip hidden columns.
356        let mut alias_iter = column_aliases.iter().fuse();
357        let mut index = 0;
358        columns.into_iter().for_each(|(is_hidden, mut field)| {
359            let name = match is_hidden {
360                true => field.name.clone(),
361                false => alias_iter
362                    .next()
363                    .map(|t| t.real_value())
364                    .unwrap_or_else(|| field.name.clone()),
365            };
366            field.name.clone_from(&name);
367            self.context.columns.push(ColumnBinding::new(
368                table_name.clone(),
369                begin + index,
370                is_hidden,
371                field,
372            ));
373            self.context
374                .indices_of
375                .entry(name)
376                .or_default()
377                .push(self.context.columns.len() - 1);
378            index += 1;
379        });
380
381        let num_cols = index;
382        if num_cols < num_col_aliases {
383            return Err(ErrorCode::BindError(format!(
384                "table \"{table_name}\" has {num_cols} columns available but {num_col_aliases} column aliases specified",
385            ))
386            .into());
387        }
388
389        match self.context.range_of.entry(table_name.clone()) {
390            Entry::Occupied(_) => Err(ErrorCode::InternalError(format!(
391                "Duplicated table name while binding table to context: {}",
392                table_name
393            ))
394            .into()),
395            Entry::Vacant(entry) => {
396                entry.insert((begin, self.context.columns.len()));
397                Ok(())
398            }
399        }
400    }
401
402    /// Binds a relation, which can be:
403    /// - a table/source/materialized view
404    /// - a reference to a CTE
405    /// - a logical view
406    pub fn bind_relation_by_name(
407        &mut self,
408        name: &ObjectName,
409        alias: Option<&TableAlias>,
410        as_of: Option<&AsOf>,
411        allow_cross_db: bool,
412    ) -> Result<Relation> {
413        let (db_name, schema_name, table_name) = if allow_cross_db {
414            Self::resolve_db_schema_qualified_name(name)?
415        } else {
416            let (schema_name, table_name) =
417                Self::resolve_schema_qualified_name(&self.db_name, name)?;
418            (None, schema_name, table_name)
419        };
420
421        if schema_name.is_none()
422            // the `table_name` here is the name of the currently binding cte.
423            && let Some(item) = self.context.cte_to_relation.get(&table_name)
424        {
425            // Handles CTE
426
427            if as_of.is_some() {
428                return Err(ErrorCode::BindError(
429                    "Right table of a temporal join should not be a CTE. \
430                 It should be a table, index, or materialized view"
431                        .to_owned(),
432                )
433                .into());
434            }
435
436            let BindingCte {
437                share_id,
438                state: cte_state,
439                alias: mut original_alias,
440            } = item.deref().borrow().clone();
441
442            // The original CTE alias ought to be its table name.
443            debug_assert_eq!(original_alias.name.real_value(), table_name);
444
445            if let Some(from_alias) = alias {
446                original_alias.name = from_alias.name.clone();
447                original_alias.columns = original_alias
448                    .columns
449                    .into_iter()
450                    .zip_longest(from_alias.columns.iter().cloned())
451                    .map(EitherOrBoth::into_right)
452                    .collect();
453            }
454
455            match cte_state {
456                BindingCteState::Bound { query } => {
457                    let input = BoundShareInput::Query(query);
458                    self.bind_table_to_context(input.fields()?, table_name, Some(&original_alias))?;
459                    // we could always share the cte,
460                    // no matter it's recursive or not.
461                    Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
462                }
463                BindingCteState::ChangeLog { table } => {
464                    let input = BoundShareInput::ChangeLog(table);
465                    self.bind_table_to_context(input.fields()?, table_name, Some(&original_alias))?;
466                    Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
467                }
468            }
469        } else {
470            self.bind_catalog_relation_by_name(
471                db_name.as_deref(),
472                schema_name.as_deref(),
473                &table_name,
474                alias,
475                as_of,
476                false,
477            )
478        }
479    }
480
481    // Bind a relation provided a function arg.
482    fn bind_relation_by_function_arg(
483        &mut self,
484        arg: Option<&FunctionArg>,
485        err_msg: &str,
486    ) -> Result<(Relation, ObjectName)> {
487        let Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))) = arg else {
488            return Err(ErrorCode::BindError(err_msg.to_owned()).into());
489        };
490        let table_name = match expr {
491            ParserExpr::Identifier(ident) => Ok::<_, RwError>(ObjectName(vec![ident.clone()])),
492            ParserExpr::CompoundIdentifier(idents) => Ok(ObjectName(idents.clone())),
493            _ => Err(ErrorCode::BindError(err_msg.to_owned()).into()),
494        }?;
495
496        Ok((
497            self.bind_relation_by_name(&table_name, None, None, true)?,
498            table_name,
499        ))
500    }
501
502    // Bind column provided a function arg.
503    fn bind_column_by_function_args(
504        &mut self,
505        arg: Option<&FunctionArg>,
506        err_msg: &str,
507    ) -> Result<Box<InputRef>> {
508        if let Some(time_col_arg) = arg
509            && let Some(ExprImpl::InputRef(time_col)) =
510                self.bind_function_arg(time_col_arg)?.into_iter().next()
511        {
512            Ok(time_col)
513        } else {
514            Err(ErrorCode::BindError(err_msg.to_owned()).into())
515        }
516    }
517
518    /// `rw_table(table_id[,schema_name])` which queries internal table
519    fn bind_internal_table(
520        &mut self,
521        args: &[FunctionArg],
522        alias: Option<&TableAlias>,
523    ) -> Result<Relation> {
524        if args.is_empty() || args.len() > 2 {
525            return Err(
526                ErrorCode::BindError("usage: rw_table(table_id[,schema_name])".to_owned()).into(),
527            );
528        }
529
530        let table_id: TableId = args[0]
531            .to_string()
532            .parse::<u32>()
533            .map_err(|err| {
534                RwError::from(ErrorCode::BindError(format!(
535                    "invalid table id: {}",
536                    err.as_report()
537                )))
538            })?
539            .into();
540
541        let schema = args.get(1).map(|arg| arg.to_string());
542
543        let table_name = self.catalog.get_table_name_by_id(table_id)?;
544        self.bind_catalog_relation_by_name(None, schema.as_deref(), &table_name, alias, None, false)
545    }
546
547    pub(super) fn bind_table_factor(&mut self, table_factor: &TableFactor) -> Result<Relation> {
548        match table_factor {
549            TableFactor::Table { name, alias, as_of } => {
550                self.bind_relation_by_name(name, alias.as_ref(), as_of.as_ref(), true)
551            }
552            TableFactor::TableFunction {
553                name,
554                alias,
555                args,
556                with_ordinality,
557            } => {
558                self.try_mark_lateral_as_visible();
559                let result = self.bind_table_function(name, alias.as_ref(), args, *with_ordinality);
560                self.try_mark_lateral_as_invisible();
561                result
562            }
563            TableFactor::Derived {
564                lateral,
565                subquery,
566                alias,
567            } => {
568                if *lateral {
569                    // If we detect a lateral, we mark the lateral context as visible.
570                    self.try_mark_lateral_as_visible();
571
572                    // Bind lateral subquery here.
573                    let bound_subquery =
574                        self.bind_subquery_relation(subquery, alias.as_ref(), true)?;
575
576                    // Mark the lateral context as invisible once again.
577                    self.try_mark_lateral_as_invisible();
578                    Ok(Relation::Subquery(Box::new(bound_subquery)))
579                } else {
580                    // Non-lateral subqueries to not have access to the join-tree context.
581                    self.push_lateral_context();
582                    let bound_subquery =
583                        self.bind_subquery_relation(subquery, alias.as_ref(), false)?;
584                    self.pop_and_merge_lateral_context()?;
585                    Ok(Relation::Subquery(Box::new(bound_subquery)))
586                }
587            }
588            TableFactor::NestedJoin(table_with_joins) => {
589                self.push_lateral_context();
590                let bound_join = self.bind_table_with_joins(table_with_joins)?;
591                self.pop_and_merge_lateral_context()?;
592                Ok(bound_join)
593            }
594        }
595    }
596}