risingwave_frontend/binder/relation/
window_table_function.rs1use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::Field;
19use risingwave_common::types::DataType;
20use risingwave_sqlparser::ast::{FunctionArg, TableAlias};
21
22use super::{Binder, Relation, Result};
23use crate::binder::statement::RewriteExprsRecursive;
24use crate::error::ErrorCode;
25use crate::expr::{ExprImpl, InputRef};
26
27#[derive(Copy, Clone, Debug)]
28pub enum WindowTableFunctionKind {
29 Tumble,
30 Hop,
31}
32
33impl FromStr for WindowTableFunctionKind {
34 type Err = ();
35
36 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
37 if s.eq_ignore_ascii_case("tumble") {
38 Ok(WindowTableFunctionKind::Tumble)
39 } else if s.eq_ignore_ascii_case("hop") {
40 Ok(WindowTableFunctionKind::Hop)
41 } else {
42 Err(())
43 }
44 }
45}
46
47#[derive(Debug, Clone)]
48pub struct BoundWindowTableFunction {
49 pub(crate) input: Relation,
50 pub(crate) kind: WindowTableFunctionKind,
51 pub(crate) time_col: InputRef,
52 pub(crate) args: Vec<ExprImpl>,
53}
54
55impl RewriteExprsRecursive for BoundWindowTableFunction {
56 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
57 self.input.rewrite_exprs_recursive(rewriter);
58 let new_agrs = std::mem::take(&mut self.args)
59 .into_iter()
60 .map(|expr| rewriter.rewrite_expr(expr))
61 .collect::<Vec<_>>();
62 self.args = new_agrs;
63 }
64}
65
66const ERROR_1ST_ARG: &str = "The 1st arg of window table function should be a table name (incl. source, CTE, view) but not complex structure (subquery, join, another table function). Consider using an intermediate CTE or view as workaround.";
67const ERROR_2ND_ARG_EXPR: &str = "The 2st arg of window table function should be a column name but not complex expression. Consider using an intermediate CTE or view as workaround.";
68const ERROR_2ND_ARG_TYPE: &str = "The 2st arg of window table function should be a column of type timestamp with time zone, timestamp or date.";
69
70impl Binder {
71 pub(super) fn bind_window_table_function(
72 &mut self,
73 alias: Option<TableAlias>,
74 kind: WindowTableFunctionKind,
75 args: Vec<FunctionArg>,
76 ) -> Result<BoundWindowTableFunction> {
77 let mut args = args.into_iter();
78
79 self.push_context();
80
81 let (base, table_name) = self.bind_relation_by_function_arg(args.next(), ERROR_1ST_ARG)?;
82
83 let time_col = self.bind_column_by_function_args(args.next(), ERROR_2ND_ARG_EXPR)?;
84
85 let Some(output_type) = DataType::window_of(&time_col.data_type) else {
86 return Err(ErrorCode::BindError(ERROR_2ND_ARG_TYPE.to_owned()).into());
87 };
88
89 let base_columns = std::mem::take(&mut self.context.columns);
90
91 self.pop_context()?;
92
93 let columns = base_columns
94 .into_iter()
95 .map(|c| {
96 if c.field.name == "window_start" || c.field.name == "window_end" {
97 Err(ErrorCode::BindError(
98 "column names `window_start` and `window_end` are not allowed in window table function's input."
99 .into())
100 .into())
101 } else {
102 Ok((c.is_hidden, c.field))
103 }
104 })
105 .chain(
106 [
107 Ok((false, Field::with_name(output_type.clone(), "window_start"))),
108 Ok((false, Field::with_name(output_type, "window_end"))),
109 ]
110 .into_iter(),
111 ).collect::<Result<Vec<_>>>()?;
112
113 let (_, table_name) = Self::resolve_schema_qualified_name(&self.db_name, table_name)?;
114 self.bind_table_to_context(columns, table_name, alias)?;
115
116 let exprs: Vec<_> = args
118 .map(|arg| self.bind_function_arg(arg))
119 .flatten_ok()
120 .try_collect()?;
121 Ok(BoundWindowTableFunction {
122 input: base,
123 time_col: *time_col,
124 kind,
125 args: exprs,
126 })
127 }
128}