// risingwave_frontend/expr/window_function.rs
use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::types::DataType;
use risingwave_expr::aggregate::AggType;
use risingwave_expr::window_function::{Frame, WindowFuncKind};
use super::{Expr, ExprImpl, OrderBy, RwResult};
use crate::error::{ErrorCode, RwError};
use crate::expr::infer_type;
/// Frontend expression node describing a window function call.
///
/// The compact `Debug` output of this type has the shape
/// `kind() OVER(PARTITION BY .. <order_by> <frame>)`, which mirrors the fields
/// stored here. Instances are built through [`WindowFunction::new`], which
/// derives `return_type` from `kind` and `args`.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct WindowFunction {
/// Which window function is being called.
pub kind: WindowFuncKind,
/// Argument expressions passed to the function.
pub args: Vec<ExprImpl>,
/// Result type of the call, as inferred from `kind` and `args` in `new`.
pub return_type: DataType,
/// Expressions listed in the `PARTITION BY` part of the window.
pub partition_by: Vec<ExprImpl>,
/// Ordering applied within the window.
pub order_by: OrderBy,
/// Optional window frame; presumably `None` when the call did not specify
/// one (verify against the binder).
pub frame: Option<Frame>,
}
impl WindowFunction {
    /// Builds a `WindowFunction` from its parts, inferring the return type.
    ///
    /// `args` is handed to [`Self::infer_return_type`] as a mutable slice
    /// before being stored, so the stored arguments are whatever that step
    /// leaves behind.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::infer_return_type`], i.e. when the
    /// combination of `kind` and argument list is invalid or not supported.
    pub fn new(
        kind: WindowFuncKind,
        partition_by: Vec<ExprImpl>,
        order_by: OrderBy,
        mut args: Vec<ExprImpl>,
        frame: Option<Frame>,
    ) -> RwResult<Self> {
        Self::infer_return_type(&kind, &mut args).map(|return_type| Self {
            kind,
            args,
            return_type,
            partition_by,
            order_by,
            frame,
        })
    }

    /// Determines the result type of `kind` applied to `args`.
    ///
    /// * `RowNumber`, `Rank` and `DenseRank` take no arguments and yield
    ///   `Int64`.
    /// * `Lag` / `Lead` yield the type of their first argument. An optional
    ///   second `offset` argument must be of an integer type and constant;
    ///   a third `default` argument is rejected as not implemented.
    /// * Aggregates delegate to `infer_type` (builtin) or use the return
    ///   type recorded on the UDF / wrapped scalar expression.
    /// * Anything else is reported as an invalid window function.
    fn infer_return_type(kind: &WindowFuncKind, args: &mut [ExprImpl]) -> RwResult<DataType> {
        use WindowFuncKind::*;
        match (kind, args) {
            // Ranking-style functions: no arguments, always a 64-bit integer.
            (RowNumber | Rank | DenseRank, []) => Ok(DataType::Int64),

            (Lag | Lead, [input]) => Ok(input.return_type()),
            (Lag | Lead, [input, offset]) => {
                let offset_is_int = offset.return_type().is_int();
                if !offset_is_int {
                    return Err(RwError::from(ErrorCode::InvalidInputSyntax(format!(
                        "the `offset` of `{kind}` function should be integer"
                    ))));
                }
                if !offset.is_const() {
                    bail_not_implemented!(
                        "non-const `offset` of `{kind}` function is not supported yet"
                    );
                }
                Ok(input.return_type())
            }
            (Lag | Lead, [_value, _offset, _default]) => {
                bail_not_implemented!(
                    "`{kind}` window function with `default` argument is not supported yet"
                );
            }

            (Aggregate(agg_type), args) => {
                let inferred: DataType = match agg_type {
                    AggType::Builtin(agg_kind) => infer_type((*agg_kind).into(), args)?,
                    AggType::UserDefined(udf) => udf.return_type.as_ref().unwrap().into(),
                    AggType::WrapScalar(expr) => expr.return_type.as_ref().unwrap().into(),
                };
                Ok(inferred)
            }

            // Unsupported kind/arity combination: report the argument types.
            (_, args) => {
                let args = args
                    .iter()
                    .map(|arg| arg.return_type().to_string())
                    .join(", ");
                Err(ErrorCode::InvalidInputSyntax(format!(
                    "Invalid window function: {kind}({args})"
                ))
                .into())
            }
        }
    }
}
impl std::fmt::Debug for WindowFunction {
    /// Formats the window function for diagnostics.
    ///
    /// * Alternate form (`{:#?}`): a struct-style dump of all fields, with
    ///   `order_by` and `frame` rendered through their `Display` impls.
    /// * Compact form (`{:?}`): `kind() OVER(...)`, where the parenthesised
    ///   part lists the non-empty `PARTITION BY` keys, the ordering and the
    ///   frame, separated by single spaces. Arguments are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if f.alternate() {
            let mut builder = f.debug_struct("WindowFunction");
            builder
                .field("kind", &self.kind)
                .field("return_type", &self.return_type)
                .field("args", &self.args)
                .field("partition_by", &self.partition_by)
                .field("order_by", &format_args!("{}", self.order_by));
            if let Some(frame) = &self.frame {
                builder.field("frame", &format_args!("{}", frame));
            } else {
                // A `&str` has the same (quoted) `Debug` output as a `String`,
                // so no allocation is needed here.
                builder.field("frame", &"None");
            }
            builder.finish()
        } else {
            write!(f, "{}() OVER(", self.kind)?;

            // Separator to emit before the next clause; stays empty until the
            // first clause has been written so there is no leading space.
            let mut delim = "";
            if !self.partition_by.is_empty() {
                write!(
                    f,
                    "PARTITION BY {:?}",
                    self.partition_by.iter().format(", ")
                )?;
                delim = " ";
            }
            if !self.order_by.sort_exprs.is_empty() {
                write!(f, "{delim}{}", self.order_by)?;
                // Without this, an ordering followed by a frame would be
                // glued together whenever there are no partition keys.
                delim = " ";
            }
            if let Some(frame) = &self.frame {
                write!(f, "{delim}{}", frame)?;
            }
            f.write_str(")")
        }
    }
}
impl Expr for WindowFunction {
    /// Returns a clone of the return type stored on this window function.
    fn return_type(&self) -> DataType {
        let Self { return_type, .. } = self;
        return_type.clone()
    }

    /// Always panics: this implementation treats conversion of a window
    /// function into an `ExprNode` as unreachable.
    ///
    /// # Panics
    ///
    /// Unconditionally, via `unreachable!`.
    fn to_expr_proto(&self) -> risingwave_pb::expr::ExprNode {
        unreachable!("Window function should not be converted to ExprNode")
    }
}