risingwave_frontend/binder/relation/
window_table_function.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16
17use itertools::Itertools;
18use risingwave_common::catalog::Field;
19use risingwave_common::types::DataType;
20use risingwave_sqlparser::ast::{FunctionArg, TableAlias};
21
22use super::{Binder, Relation, Result};
23use crate::binder::statement::RewriteExprsRecursive;
24use crate::error::ErrorCode;
25use crate::expr::{ExprImpl, InputRef};
26
/// The window table functions the binder recognises by name.
#[derive(Copy, Clone, Debug)]
pub enum WindowTableFunctionKind {
    /// Selected by the function name `tumble`.
    Tumble,
    /// Selected by the function name `hop`.
    Hop,
}

impl FromStr for WindowTableFunctionKind {
    type Err = ();

    /// Maps a function name to its kind, ignoring ASCII case.
    ///
    /// Any name other than `tumble` or `hop` yields `Err(())`.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        // Lookup table of every accepted name and the kind it selects.
        const NAMED_KINDS: [(&str, WindowTableFunctionKind); 2] = [
            ("tumble", WindowTableFunctionKind::Tumble),
            ("hop", WindowTableFunctionKind::Hop),
        ];

        NAMED_KINDS
            .iter()
            .find(|(name, _)| s.eq_ignore_ascii_case(name))
            .map(|&(_, kind)| kind)
            .ok_or(())
    }
}
46
47#[derive(Debug, Clone)]
48pub struct BoundWindowTableFunction {
49    pub(crate) input: Relation,
50    pub(crate) kind: WindowTableFunctionKind,
51    pub(crate) time_col: InputRef,
52    pub(crate) args: Vec<ExprImpl>,
53}
54
55impl RewriteExprsRecursive for BoundWindowTableFunction {
56    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
57        self.input.rewrite_exprs_recursive(rewriter);
58        let new_agrs = std::mem::take(&mut self.args)
59            .into_iter()
60            .map(|expr| rewriter.rewrite_expr(expr))
61            .collect::<Vec<_>>();
62        self.args = new_agrs;
63    }
64}
65
/// Bind error reported when the 1st argument is not a plain table name.
const ERROR_1ST_ARG: &str = "The 1st arg of window table function should be a table name (incl. source, CTE, view) but not complex structure (subquery, join, another table function). Consider using an intermediate CTE or view as workaround.";
/// Bind error reported when the 2nd argument is not a plain column name.
const ERROR_2ND_ARG_EXPR: &str = "The 2nd arg of window table function should be a column name but not complex expression. Consider using an intermediate CTE or view as workaround.";
/// Bind error reported when the 2nd argument's column type has no window type.
const ERROR_2ND_ARG_TYPE: &str = "The 2nd arg of window table function should be a column of type timestamp with time zone, timestamp or date.";
69
70impl Binder {
71    pub(super) fn bind_window_table_function(
72        &mut self,
73        alias: Option<TableAlias>,
74        kind: WindowTableFunctionKind,
75        args: Vec<FunctionArg>,
76    ) -> Result<BoundWindowTableFunction> {
77        let mut args = args.into_iter();
78
79        self.push_context();
80
81        let (base, table_name) = self.bind_relation_by_function_arg(args.next(), ERROR_1ST_ARG)?;
82
83        let time_col = self.bind_column_by_function_args(args.next(), ERROR_2ND_ARG_EXPR)?;
84
85        let Some(output_type) = DataType::window_of(&time_col.data_type) else {
86            return Err(ErrorCode::BindError(ERROR_2ND_ARG_TYPE.to_owned()).into());
87        };
88
89        let base_columns = std::mem::take(&mut self.context.columns);
90
91        self.pop_context()?;
92
93        let columns = base_columns
94            .into_iter()
95            .map(|c| {
96                if c.field.name == "window_start" || c.field.name == "window_end" {
97                    Err(ErrorCode::BindError(
98                        "column names `window_start` and `window_end` are not allowed in window table function's input."
99                        .into())
100                    .into())
101                } else {
102                    Ok((c.is_hidden, c.field))
103                }
104            })
105            .chain(
106                [
107                    Ok((false, Field::with_name(output_type.clone(), "window_start"))),
108                    Ok((false, Field::with_name(output_type, "window_end"))),
109                ]
110                .into_iter(),
111            ).collect::<Result<Vec<_>>>()?;
112
113        let (_, table_name) = Self::resolve_schema_qualified_name(&self.db_name, table_name)?;
114        self.bind_table_to_context(columns, table_name, alias)?;
115
116        // Other arguments are validated in `plan_window_table_function`
117        let exprs: Vec<_> = args
118            .map(|arg| self.bind_function_arg(arg))
119            .flatten_ok()
120            .try_collect()?;
121        Ok(BoundWindowTableFunction {
122            input: base,
123            time_col: *time_col,
124            kind,
125            args: exprs,
126        })
127    }
128}