risingwave_frontend/binder/relation/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::ops::Deref;
17
18use itertools::{EitherOrBoth, Itertools};
19use risingwave_common::bail;
20use risingwave_common::catalog::{Field, TableId};
21use risingwave_sqlparser::ast::{
22    AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias,
23    TableFactor,
24};
25use thiserror::Error;
26use thiserror_ext::AsReport;
27
28use super::bind_context::ColumnBinding;
29use super::statement::RewriteExprsRecursive;
30use crate::binder::Binder;
31use crate::binder::bind_context::{BindingCte, BindingCteState};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::expr::{ExprImpl, InputRef};
34
35mod gap_fill;
36mod join;
37mod share;
38mod subquery;
39mod table_function;
40mod table_or_source;
41mod watermark;
42mod window_table_function;
43
44pub use gap_fill::BoundGapFill;
45pub use join::BoundJoin;
46pub use share::{BoundShare, BoundShareInput};
47pub use subquery::BoundSubquery;
48pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable};
49pub use watermark::BoundWatermark;
50pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKind};
51
52use crate::expr::{CorrelatedId, Depth};
53
/// A validated item that refers to a table-like entity, including base table, subquery, join, etc.
/// It is usually part of the `from` clause.
///
/// Every variant except `TableFunction` stores its payload behind a `Box`.
#[derive(Debug, Clone)]
pub enum Relation {
    /// A bound source.
    Source(Box<BoundSource>),
    /// A bound base table.
    BaseTable(Box<BoundBaseTable>),
    /// A bound system table.
    SystemTable(Box<BoundSystemTable>),
    /// A bound subquery. `Binder::bind_table_factor` produces this for derived tables,
    /// both lateral and non-lateral.
    Subquery(Box<BoundSubquery>),
    /// A bound join.
    Join(Box<BoundJoin>),
    /// Carries the same [`BoundJoin`] payload as [`Relation::Join`]; the correlation helpers
    /// on `Relation` treat both variants identically.
    /// NOTE(review): presumably represents a correlated (apply-style) join — confirm.
    Apply(Box<BoundJoin>),
    /// A bound window table function.
    WindowTableFunction(Box<BoundWindowTableFunction>),
    /// Table function or scalar function.
    TableFunction {
        /// The bound function call expression.
        expr: ExprImpl,
        /// NOTE(review): presumably set when `WITH ORDINALITY` was specified — confirm.
        with_ordinality: bool,
    },
    /// A bound watermark relation.
    Watermark(Box<BoundWatermark>),
    /// A shared relation; `Binder::bind_relation_by_name` produces this when a name
    /// resolves to a CTE.
    Share(Box<BoundShare>),
    /// A bound gap-fill relation.
    GapFill(Box<BoundGapFill>),
}
74
75impl RewriteExprsRecursive for Relation {
76    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
77        match self {
78            Relation::Subquery(inner) => inner.rewrite_exprs_recursive(rewriter),
79            Relation::Join(inner) => inner.rewrite_exprs_recursive(rewriter),
80            Relation::Apply(inner) => inner.rewrite_exprs_recursive(rewriter),
81            Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter),
82            Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter),
83            Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter),
84            Relation::TableFunction { expr: inner, .. } => {
85                *inner = rewriter.rewrite_expr(inner.take())
86            }
87            _ => {}
88        }
89    }
90}
91
92impl Relation {
93    pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
94        match self {
95            Relation::Subquery(subquery) => subquery.query.is_correlated_by_depth(depth),
96            Relation::Join(join) | Relation::Apply(join) => {
97                join.cond.has_correlated_input_ref_by_depth(depth)
98                    || join.left.is_correlated_by_depth(depth)
99                    || join.right.is_correlated_by_depth(depth)
100            }
101            _ => false,
102        }
103    }
104
105    pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
106        match self {
107            Relation::Subquery(subquery) => {
108                subquery.query.is_correlated_by_correlated_id(correlated_id)
109            }
110            Relation::Join(join) | Relation::Apply(join) => {
111                join.cond
112                    .has_correlated_input_ref_by_correlated_id(correlated_id)
113                    || join.left.is_correlated_by_correlated_id(correlated_id)
114                    || join.right.is_correlated_by_correlated_id(correlated_id)
115            }
116            _ => false,
117        }
118    }
119
120    pub fn collect_correlated_indices_by_depth_and_assign_id(
121        &mut self,
122        depth: Depth,
123        correlated_id: CorrelatedId,
124    ) -> Vec<usize> {
125        match self {
126            Relation::Subquery(subquery) => subquery
127                .query
128                .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
129            Relation::Join(join) | Relation::Apply(join) => {
130                let mut correlated_indices = vec![];
131                correlated_indices.extend(
132                    join.cond
133                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
134                );
135                correlated_indices.extend(
136                    join.left
137                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
138                );
139                correlated_indices.extend(
140                    join.right
141                        .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
142                );
143                correlated_indices
144            }
145            Relation::TableFunction {
146                expr: table_function,
147                with_ordinality: _,
148            } => table_function
149                .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
150            Relation::Share(share) => match &mut share.input {
151                BoundShareInput::Query(query) => {
152                    query.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id)
153                }
154                BoundShareInput::ChangeLog(change_log) => change_log
155                    .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
156            },
157            _ => vec![],
158        }
159    }
160}
161
/// The reason a qualified object name could not be resolved.
#[derive(Debug)]
#[non_exhaustive]
pub enum ResolveQualifiedNameErrorKind {
    /// The name has too many dotted parts; the resolvers in this file raise it when the
    /// name has more than three identifiers.
    QualifiedNameTooLong,
    /// The name is qualified with a database other than the current one.
    NotCurrentDatabase,
}
168
/// Error returned when a (possibly qualified) object name cannot be resolved.
///
/// Its `Display` implementation is written by hand in this file and renders a message
/// that depends on `kind` and includes `qualified`.
#[derive(Debug, Error)]
pub struct ResolveQualifiedNameError {
    /// The offending name; the resolvers in this file pass `ObjectName::to_string()`.
    qualified: String,
    /// Why the resolution failed.
    kind: ResolveQualifiedNameErrorKind,
}
174
175impl std::fmt::Display for ResolveQualifiedNameError {
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177        match self.kind {
178            ResolveQualifiedNameErrorKind::QualifiedNameTooLong => write!(
179                f,
180                "improper qualified name (too many dotted names): {}",
181                self.qualified
182            ),
183            ResolveQualifiedNameErrorKind::NotCurrentDatabase => write!(
184                f,
185                "cross-database references are not implemented: \"{}\"",
186                self.qualified
187            ),
188        }
189    }
190}
191
192impl ResolveQualifiedNameError {
193    pub fn new(qualified: String, kind: ResolveQualifiedNameErrorKind) -> Self {
194        Self { qualified, kind }
195    }
196}
197
198impl From<ResolveQualifiedNameError> for RwError {
199    fn from(e: ResolveQualifiedNameError) -> Self {
200        ErrorCode::InvalidInputSyntax(format!("{}", e.as_report())).into()
201    }
202}
203
204impl Binder {
205    /// return (`schema_name`, `name`)
206    pub fn resolve_schema_qualified_name(
207        db_name: &str,
208        name: &ObjectName,
209    ) -> std::result::Result<(Option<String>, String), ResolveQualifiedNameError> {
210        let formatted_name = name.to_string();
211        let mut identifiers = name.0.clone();
212
213        if identifiers.len() > 3 {
214            return Err(ResolveQualifiedNameError::new(
215                formatted_name,
216                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
217            ));
218        }
219
220        let name = identifiers.pop().unwrap().real_value();
221
222        let schema_name = identifiers.pop().map(|ident| ident.real_value());
223        let database_name = identifiers.pop().map(|ident| ident.real_value());
224
225        if let Some(database_name) = database_name
226            && database_name != db_name
227        {
228            return Err(ResolveQualifiedNameError::new(
229                formatted_name,
230                ResolveQualifiedNameErrorKind::NotCurrentDatabase,
231            ));
232        }
233
234        Ok((schema_name, name))
235    }
236
237    /// check whether the name is a cross-database reference
238    pub fn validate_cross_db_reference(
239        db_name: &str,
240        name: &ObjectName,
241    ) -> std::result::Result<(), ResolveQualifiedNameError> {
242        let formatted_name = name.to_string();
243        let identifiers = &name.0;
244        if identifiers.len() > 3 {
245            return Err(ResolveQualifiedNameError::new(
246                formatted_name,
247                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
248            ));
249        }
250
251        if identifiers.len() == 3 && identifiers[0].real_value() != db_name {
252            return Err(ResolveQualifiedNameError::new(
253                formatted_name,
254                ResolveQualifiedNameErrorKind::NotCurrentDatabase,
255            ));
256        }
257
258        Ok(())
259    }
260
261    /// return (`database_name`, `schema_name`, `name`)
262    pub fn resolve_db_schema_qualified_name(
263        name: &ObjectName,
264    ) -> std::result::Result<(Option<String>, Option<String>, String), ResolveQualifiedNameError>
265    {
266        let formatted_name = name.to_string();
267        let mut identifiers = name.0.clone();
268
269        if identifiers.len() > 3 {
270            return Err(ResolveQualifiedNameError::new(
271                formatted_name,
272                ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
273            ));
274        }
275
276        let name = identifiers.pop().unwrap().real_value();
277        let schema_name = identifiers.pop().map(|ident| ident.real_value());
278        let database_name = identifiers.pop().map(|ident| ident.real_value());
279
280        Ok((database_name, schema_name, name))
281    }
282
283    /// return first name in identifiers, must have only one name.
284    fn resolve_single_name(mut identifiers: Vec<Ident>, ident_desc: &str) -> Result<String> {
285        if identifiers.len() > 1 {
286            bail!("{} must contain 1 argument", ident_desc);
287        }
288        let name = identifiers.pop().unwrap().real_value();
289
290        Ok(name)
291    }
292
293    /// return the `database_name`
294    pub fn resolve_database_name(name: ObjectName) -> Result<String> {
295        Self::resolve_single_name(name.0, "database name")
296    }
297
298    /// return the `schema_name`
299    pub fn resolve_schema_name(name: ObjectName) -> Result<String> {
300        Self::resolve_single_name(name.0, "schema name")
301    }
302
303    /// return the `index_name`
304    pub fn resolve_index_name(name: ObjectName) -> Result<String> {
305        Self::resolve_single_name(name.0, "index name")
306    }
307
308    /// return the `view_name`
309    pub fn resolve_view_name(name: ObjectName) -> Result<String> {
310        Self::resolve_single_name(name.0, "view name")
311    }
312
313    /// return the `sink_name`
314    pub fn resolve_sink_name(name: ObjectName) -> Result<String> {
315        Self::resolve_single_name(name.0, "sink name")
316    }
317
318    /// return the `subscription_name`
319    pub fn resolve_subscription_name(name: ObjectName) -> Result<String> {
320        Self::resolve_single_name(name.0, "subscription name")
321    }
322
323    /// return the `table_name`
324    pub fn resolve_table_name(name: ObjectName) -> Result<String> {
325        Self::resolve_single_name(name.0, "table name")
326    }
327
328    /// return the `source_name`
329    pub fn resolve_source_name(name: ObjectName) -> Result<String> {
330        Self::resolve_single_name(name.0, "source name")
331    }
332
333    /// return the `user_name`
334    pub fn resolve_user_name(name: ObjectName) -> Result<String> {
335        Self::resolve_single_name(name.0, "user name")
336    }
337
338    /// Fill the [`BindContext`](super::BindContext) for table.
339    pub(super) fn bind_table_to_context(
340        &mut self,
341        columns: impl IntoIterator<Item = (bool, Field)>, // bool indicates if the field is hidden
342        table_name: String,
343        schema_name: Option<String>,
344        alias: Option<&TableAlias>,
345    ) -> Result<()> {
346        const EMPTY: [Ident; 0] = [];
347        let (resolved_schema_name, table_name, column_aliases, table_alias) = match alias {
348            None => (schema_name.clone(), table_name, &EMPTY[..], None),
349            Some(TableAlias { name, columns }) => (
350                None,
351                name.real_value(),
352                columns.as_slice(),
353                Some(table_name),
354            ),
355        };
356
357        let num_col_aliases = column_aliases.len();
358
359        let begin = self.context.columns.len();
360        // Column aliases can be less than columns, but not more.
361        // It also needs to skip hidden columns.
362        let mut alias_iter = column_aliases.iter().fuse();
363        let mut index = 0;
364        columns.into_iter().for_each(|(is_hidden, mut field)| {
365            let name = match is_hidden {
366                true => field.name.clone(),
367                false => alias_iter
368                    .next()
369                    .map(|t| t.real_value())
370                    .unwrap_or_else(|| field.name.clone()),
371            };
372            field.name.clone_from(&name);
373            self.context.columns.push(ColumnBinding::new(
374                table_name.clone(),
375                schema_name.clone(),
376                table_alias.clone(),
377                begin + index,
378                is_hidden,
379                field,
380            ));
381            self.context
382                .indices_of
383                .entry(name)
384                .or_default()
385                .push(self.context.columns.len() - 1);
386            index += 1;
387        });
388
389        let num_cols = index;
390        if num_cols < num_col_aliases {
391            return Err(ErrorCode::BindError(format!(
392                "table \"{table_name}\" has {num_cols} columns available but {num_col_aliases} column aliases specified",
393            ))
394            .into());
395        }
396
397        match self
398            .context
399            .range_of
400            .entry((resolved_schema_name, table_name.clone()))
401        {
402            Entry::Occupied(_) => Err(ErrorCode::InternalError(format!(
403                "Duplicated table name while binding table to context: {}",
404                table_name
405            ))
406            .into()),
407            Entry::Vacant(entry) => {
408                entry.insert((begin, self.context.columns.len()));
409                Ok(())
410            }
411        }
412    }
413
414    /// Binds a relation, which can be:
415    /// - a table/source/materialized view
416    /// - a reference to a CTE
417    /// - a logical view
418    pub fn bind_relation_by_name(
419        &mut self,
420        name: &ObjectName,
421        alias: Option<&TableAlias>,
422        as_of: Option<&AsOf>,
423        allow_cross_db: bool,
424    ) -> Result<Relation> {
425        let (db_name, schema_name, table_name) = if allow_cross_db {
426            Self::resolve_db_schema_qualified_name(name)?
427        } else {
428            let (schema_name, table_name) =
429                Self::resolve_schema_qualified_name(&self.db_name, name)?;
430            (None, schema_name, table_name)
431        };
432
433        if schema_name.is_none()
434            // the `table_name` here is the name of the currently binding cte.
435            && let Some(item) = self.context.cte_to_relation.get(&table_name)
436        {
437            // Handles CTE
438
439            if as_of.is_some() {
440                return Err(ErrorCode::BindError(
441                    "Right table of a temporal join should not be a CTE. \
442                 It should be a table, index, or materialized view"
443                        .to_owned(),
444                )
445                .into());
446            }
447
448            let BindingCte {
449                share_id,
450                state: cte_state,
451                alias: mut original_alias,
452            } = item.deref().borrow().clone();
453
454            // The original CTE alias ought to be its table name.
455            debug_assert_eq!(original_alias.name.real_value(), table_name);
456
457            if let Some(from_alias) = alias {
458                original_alias.name = from_alias.name.clone();
459                original_alias.columns = original_alias
460                    .columns
461                    .into_iter()
462                    .zip_longest(from_alias.columns.iter().cloned())
463                    .map(EitherOrBoth::into_right)
464                    .collect();
465            }
466
467            match cte_state {
468                BindingCteState::Bound { query } => {
469                    let input = BoundShareInput::Query(query);
470                    self.bind_table_to_context(
471                        input.fields()?,
472                        table_name,
473                        None,
474                        Some(&original_alias),
475                    )?;
476                    // we could always share the cte,
477                    // no matter it's recursive or not.
478                    Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
479                }
480                BindingCteState::ChangeLog { table } => {
481                    let input = BoundShareInput::ChangeLog(table);
482                    self.bind_table_to_context(
483                        input.fields()?,
484                        table_name,
485                        None,
486                        Some(&original_alias),
487                    )?;
488                    Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
489                }
490            }
491        } else {
492            self.bind_catalog_relation_by_name(
493                db_name.as_deref(),
494                schema_name.as_deref(),
495                &table_name,
496                alias,
497                as_of,
498                false,
499            )
500        }
501    }
502
503    // Bind a relation provided a function arg.
504    fn bind_relation_by_function_arg(
505        &mut self,
506        arg: Option<&FunctionArg>,
507        err_msg: &str,
508    ) -> Result<(Relation, ObjectName)> {
509        let Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))) = arg else {
510            return Err(ErrorCode::BindError(err_msg.to_owned()).into());
511        };
512        let table_name = match expr {
513            ParserExpr::Identifier(ident) => Ok::<_, RwError>(ObjectName(vec![ident.clone()])),
514            ParserExpr::CompoundIdentifier(idents) => Ok(ObjectName(idents.clone())),
515            _ => Err(ErrorCode::BindError(err_msg.to_owned()).into()),
516        }?;
517
518        Ok((
519            self.bind_relation_by_name(&table_name, None, None, true)?,
520            table_name,
521        ))
522    }
523
524    // Bind column provided a function arg.
525    fn bind_column_by_function_args(
526        &mut self,
527        arg: Option<&FunctionArg>,
528        err_msg: &str,
529    ) -> Result<Box<InputRef>> {
530        if let Some(time_col_arg) = arg
531            && let Some(ExprImpl::InputRef(time_col)) =
532                self.bind_function_arg(time_col_arg)?.into_iter().next()
533        {
534            Ok(time_col)
535        } else {
536            Err(ErrorCode::BindError(err_msg.to_owned()).into())
537        }
538    }
539
540    /// `rw_table(table_id[,schema_name])` which queries internal table
541    fn bind_internal_table(
542        &mut self,
543        args: &[FunctionArg],
544        alias: Option<&TableAlias>,
545    ) -> Result<Relation> {
546        if args.is_empty() || args.len() > 2 {
547            return Err(
548                ErrorCode::BindError("usage: rw_table(table_id[,schema_name])".to_owned()).into(),
549            );
550        }
551
552        let table_id: TableId = args[0]
553            .to_string()
554            .parse::<u32>()
555            .map_err(|err| {
556                RwError::from(ErrorCode::BindError(format!(
557                    "invalid table id: {}",
558                    err.as_report()
559                )))
560            })?
561            .into();
562
563        let schema = args.get(1).map(|arg| arg.to_string());
564
565        let table_name = self.catalog.get_table_name_by_id(table_id)?;
566        self.bind_catalog_relation_by_name(None, schema.as_deref(), &table_name, alias, None, false)
567    }
568
569    pub(super) fn bind_table_factor(&mut self, table_factor: &TableFactor) -> Result<Relation> {
570        match table_factor {
571            TableFactor::Table { name, alias, as_of } => {
572                self.bind_relation_by_name(name, alias.as_ref(), as_of.as_ref(), true)
573            }
574            TableFactor::TableFunction {
575                name,
576                alias,
577                args,
578                with_ordinality,
579            } => {
580                self.try_mark_lateral_as_visible();
581                let result = self.bind_table_function(name, alias.as_ref(), args, *with_ordinality);
582                self.try_mark_lateral_as_invisible();
583                result
584            }
585            TableFactor::Derived {
586                lateral,
587                subquery,
588                alias,
589            } => {
590                if *lateral {
591                    // If we detect a lateral, we mark the lateral context as visible.
592                    self.try_mark_lateral_as_visible();
593
594                    // Bind lateral subquery here.
595                    let bound_subquery =
596                        self.bind_subquery_relation(subquery, alias.as_ref(), true)?;
597
598                    // Mark the lateral context as invisible once again.
599                    self.try_mark_lateral_as_invisible();
600                    Ok(Relation::Subquery(Box::new(bound_subquery)))
601                } else {
602                    // Non-lateral subqueries to not have access to the join-tree context.
603                    self.push_lateral_context();
604                    let bound_subquery =
605                        self.bind_subquery_relation(subquery, alias.as_ref(), false)?;
606                    self.pop_and_merge_lateral_context()?;
607                    Ok(Relation::Subquery(Box::new(bound_subquery)))
608                }
609            }
610            TableFactor::NestedJoin(table_with_joins) => {
611                self.push_lateral_context();
612                let bound_join = self.bind_table_with_joins(table_with_joins)?;
613                self.pop_and_merge_lateral_context()?;
614                Ok(bound_join)
615            }
616        }
617    }
618}