risingwave_frontend/binder/relation/
watermark.rs1use itertools::Itertools;
16use risingwave_common::types::DataType;
17use risingwave_sqlparser::ast::{FunctionArg, TableAlias};
18
19use super::{Binder, Relation, Result};
20use crate::binder::statement::RewriteExprsRecursive;
21use crate::error::ErrorCode;
22use crate::expr::{ExprImpl, InputRef};
23
24const ERROR_1ST_ARG: &str = "The 1st arg of watermark function should be a table name (incl. source, CTE, view) but not complex structure (subquery, join, another table function). Consider using an intermediate CTE or view as workaround.";
25const ERROR_2ND_ARG_EXPR: &str = "The 2nd arg of watermark function should be a column name but not complex expression. Consider using an intermediate CTE or view as workaround.";
26const ERROR_2ND_ARG_TYPE: &str = "The 2nd arg of watermark function should be a column of type timestamp with time zone, timestamp or date.";
27
28#[derive(Debug, Clone)]
29#[expect(dead_code)]
30pub struct BoundWatermark {
31 pub(crate) input: Relation,
32 pub(crate) time_col: InputRef,
33 pub(crate) args: Vec<ExprImpl>,
34}
35
36impl RewriteExprsRecursive for BoundWatermark {
37 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
38 self.input.rewrite_exprs_recursive(rewriter);
39 let new_agrs = std::mem::take(&mut self.args)
40 .into_iter()
41 .map(|expr| rewriter.rewrite_expr(expr))
42 .collect::<Vec<_>>();
43 self.args = new_agrs;
44 }
45}
46
47pub(super) fn is_watermark_func(func_name: &str) -> bool {
48 func_name.eq_ignore_ascii_case("watermark")
49}
50
51impl Binder {
52 pub(super) fn bind_watermark(
53 &mut self,
54 alias: Option<TableAlias>,
55 args: Vec<FunctionArg>,
56 ) -> Result<BoundWatermark> {
57 let mut args = args.into_iter();
58
59 self.push_context();
60
61 let (base, table_name) = self.bind_relation_by_function_arg(args.next(), ERROR_1ST_ARG)?;
62
63 let time_col = self.bind_column_by_function_args(args.next(), ERROR_2ND_ARG_EXPR)?;
64
65 if DataType::window_of(&time_col.data_type).is_none() {
66 return Err(ErrorCode::BindError(ERROR_2ND_ARG_TYPE.to_owned()).into());
67 };
68
69 let base_columns = std::mem::take(&mut self.context.columns);
70
71 self.pop_context()?;
72
73 let columns = base_columns
74 .into_iter()
75 .map(|c| (c.is_hidden, c.field))
76 .collect_vec();
77
78 let (_, table_name) = Self::resolve_schema_qualified_name(&self.db_name, table_name)?;
79 self.bind_table_to_context(columns, table_name, alias)?;
80
81 let exprs: Vec<_> = args
83 .map(|arg| self.bind_function_arg(arg))
84 .flatten_ok()
85 .try_collect()?;
86 Ok(BoundWatermark {
87 input: base,
88 time_col: *time_col,
89 args: exprs,
90 })
91 }
92}