risingwave_frontend/binder/relation/
table_function.rs1use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::catalog::{Field, RW_INTERNAL_TABLE_FUNCTION_NAME, Schema};
20use risingwave_common::types::DataType;
21use risingwave_sqlparser::ast::{Function, FunctionArg, FunctionArgList, ObjectName, TableAlias};
22
23use super::watermark::is_watermark_func;
24use super::{Binder, Relation, Result, WindowTableFunctionKind};
25use crate::binder::bind_context::Clause;
26use crate::error::ErrorCode;
27use crate::expr::{Expr, ExprImpl};
28
29impl Binder {
30 pub(super) fn bind_table_function(
37 &mut self,
38 name: ObjectName,
39 alias: Option<TableAlias>,
40 args: Vec<FunctionArg>,
41 with_ordinality: bool,
42 ) -> Result<Relation> {
43 let func_name = &name.0[0].real_value();
44 {
46 if func_name.eq_ignore_ascii_case(RW_INTERNAL_TABLE_FUNCTION_NAME) {
47 if with_ordinality {
48 bail_not_implemented!(
49 "WITH ORDINALITY for internal/system table function {}",
50 func_name
51 );
52 }
53 return self.bind_internal_table(args, alias);
54 }
55 }
56 if let Ok(kind) = WindowTableFunctionKind::from_str(func_name) {
58 if with_ordinality {
59 return Err(ErrorCode::InvalidInputSyntax(format!(
60 "WITH ORDINALITY for window table function {}",
61 func_name
62 ))
63 .into());
64 }
65 return Ok(Relation::WindowTableFunction(Box::new(
66 self.bind_window_table_function(alias, kind, args)?,
67 )));
68 }
69 if is_watermark_func(func_name) {
71 if with_ordinality {
72 return Err(ErrorCode::InvalidInputSyntax(
73 "WITH ORDINALITY for watermark".to_owned(),
74 )
75 .into());
76 }
77 return Ok(Relation::Watermark(Box::new(
78 self.bind_watermark(alias, args)?,
79 )));
80 };
81
82 self.push_context();
83 let mut clause = Some(Clause::From);
84 std::mem::swap(&mut self.context.clause, &mut clause);
85 let func = self.bind_function(Function {
86 scalar_as_agg: false,
87 name,
88 arg_list: FunctionArgList::args_only(args),
89 over: None,
90 filter: None,
91 within_group: None,
92 });
93 self.context.clause = clause;
94 self.pop_context()?;
95 let func = func?;
96
97 if let ExprImpl::TableFunction(func) = &func {
98 if func.args.iter().any(|arg| arg.has_subquery()) {
99 return Err(ErrorCode::InvalidInputSyntax(
101 format!("Only table-in-out functions can have subquery parameters. The table function has subquery parameters is {}", func.name()),
102 )
103 .into());
104 }
105 }
106
107 let mut columns = if let DataType::Struct(s) = func.return_type() {
109 let schema = Schema::from(&s);
111 schema.fields.into_iter().map(|f| (false, f)).collect_vec()
112 } else {
113 let col_name = if let Some(alias) = &alias {
126 alias.name.real_value()
127 } else {
128 func_name.clone()
129 };
130 vec![(false, Field::with_name(func.return_type(), col_name))]
131 };
132 if with_ordinality {
133 columns.push((false, Field::with_name(DataType::Int64, "ordinality")));
134 }
135
136 self.bind_table_to_context(columns, func_name.clone(), alias)?;
137
138 Ok(Relation::TableFunction {
139 expr: func,
140 with_ordinality,
141 })
142 }
143}