use itertools::Itertools;
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::{FunctionArg, TableAlias};
use super::{Binder, Relation, Result};
use crate::binder::statement::RewriteExprsRecursive;
use crate::error::ErrorCode;
use crate::expr::{ExprImpl, InputRef};
/// Message passed to `bind_relation_by_function_arg` when `Binder::bind_watermark` binds the
/// 1st argument of the watermark function.
const ERROR_1ST_ARG: &str = "The 1st arg of watermark function should be a table name (incl. source, CTE, view) but not complex structure (subquery, join, another table function). Consider using an intermediate CTE or view as workaround.";
/// Message passed to `bind_column_by_function_args` when `Binder::bind_watermark` binds the
/// 2nd argument of the watermark function.
const ERROR_2ND_ARG_EXPR: &str = "The 2nd arg of watermark function should be a column name but not complex expression. Consider using an intermediate CTE or view as workaround.";
/// Message of the `ErrorCode::BindError` returned by `Binder::bind_watermark` when
/// `DataType::window_of` yields `None` for the data type of the 2nd argument's column.
const ERROR_2ND_ARG_TYPE: &str = "The 2nd arg of watermark function should be a column of type timestamp with time zone, timestamp or date.";
#[derive(Debug, Clone)]
pub struct BoundWatermark {
pub(crate) input: Relation,
pub(crate) time_col: InputRef,
pub(crate) args: Vec<ExprImpl>,
impl RewriteExprsRecursive for BoundWatermark {
fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
let new_agrs = std::mem::take(&mut self.args)
.map(|expr| rewriter.rewrite_expr(expr))
self.args = new_agrs;
/// Presumably reports whether `func_name` refers to the watermark table function.
///
/// NOTE(review): the body of this function is not present in this view, so the exact matching
/// rule (for example, whether the comparison is case-sensitive) cannot be confirmed here.
pub(super) fn is_watermark_func(func_name: &str) -> bool {
impl Binder {
/// Binds a call to the watermark table function into a [`BoundWatermark`].
///
/// `args` is consumed in order: the 1st argument is bound as a relation, the 2nd as a column,
/// and every remaining argument is passed to `bind_function_arg`. `alias` is forwarded to
/// `bind_table_to_context` together with the columns taken from the binder context.
///
/// # Errors
///
/// Returns `ErrorCode::BindError` carrying `ERROR_2ND_ARG_TYPE` when `DataType::window_of`
/// yields `None` for the column's data type. Errors from the helper calls marked with `?`
/// are propagated unchanged.
///
/// NOTE(review): several lines of this method appear to be missing from this view (the `if`
/// is never closed, the iterator chains lack their adaptors/terminators, and the closing
/// braces are absent). The comments below describe only what the visible lines do.
pub(super) fn bind_watermark(
&mut self,
alias: Option<TableAlias>,
args: Vec<FunctionArg>,
) -> Result<BoundWatermark> {
// Turn the argument list into an iterator so positional arguments can be pulled one by one.
let mut args = args.into_iter();
// 1st argument: the input relation plus its table name. `args.next()` is passed as-is, so a
// missing argument is left for the helper to handle.
let (base, table_name) = self.bind_relation_by_function_arg(args.next(), ERROR_1ST_ARG)?;
// 2nd argument: the time column.
let time_col = self.bind_column_by_function_args(args.next(), ERROR_2ND_ARG_EXPR)?;
// Reject the column when `DataType::window_of` has no result for its data type.
if DataType::window_of(&time_col.data_type).is_none() {
return Err(ErrorCode::BindError(ERROR_2ND_ARG_TYPE.to_owned()).into());
// Move the columns out of the binder context, leaving the default value behind.
let base_columns = std::mem::take(&mut self.context.columns);
// Reduce each column to an `(is_hidden, field)` pair for re-binding below.
let columns = base_columns
.map(|c| (c.is_hidden, c.field))
// Only the second element of the resolved name is kept; the first is discarded.
let (_, table_name) = Self::resolve_schema_qualified_name(&self.db_name, table_name)?;
// Register the collected columns in the context under `table_name` and the optional `alias`.
self.bind_table_to_context(columns, table_name, alias)?;
// Bind whatever arguments remain after the first two.
let exprs: Vec<_> = args
.map(|arg| self.bind_function_arg(arg))
Ok(BoundWatermark {
input: base,
// `time_col` is dereferenced to move the `InputRef` out of the value returned by the helper.
time_col: *time_col,
args: exprs,