risingwave_frontend/expr/
window_function.rs1use itertools::Itertools;
16use risingwave_common::bail_not_implemented;
17use risingwave_common::types::DataType;
18use risingwave_expr::aggregate::AggType;
19use risingwave_expr::window_function::{Frame, WindowFuncKind};
20
21use super::{Expr, ExprImpl, OrderBy, RwResult};
22use crate::error::{ErrorCode, RwError};
23use crate::expr::infer_type;
24
25#[derive(Clone, Eq, PartialEq, Hash)]
32pub struct WindowFunction {
33 pub kind: WindowFuncKind,
34 pub return_type: DataType,
35 pub args: Vec<ExprImpl>,
36 pub ignore_nulls: bool,
37 pub partition_by: Vec<ExprImpl>,
38 pub order_by: OrderBy,
39 pub frame: Option<Frame>,
40}
41
42impl WindowFunction {
43 pub fn new(
46 kind: WindowFuncKind,
47 mut args: Vec<ExprImpl>,
48 ignore_nulls: bool,
49 partition_by: Vec<ExprImpl>,
50 order_by: OrderBy,
51 frame: Option<Frame>,
52 ) -> RwResult<Self> {
53 let return_type = Self::infer_return_type(&kind, &mut args)?;
54 Ok(Self {
55 kind,
56 return_type,
57 args,
58 ignore_nulls,
59 partition_by,
60 order_by,
61 frame,
62 })
63 }
64
65 fn infer_return_type(kind: &WindowFuncKind, args: &mut [ExprImpl]) -> RwResult<DataType> {
66 use WindowFuncKind::*;
67 match (kind, args) {
68 (RowNumber, []) => Ok(DataType::Int64),
69 (Rank, []) => Ok(DataType::Int64),
70 (DenseRank, []) => Ok(DataType::Int64),
71
72 (Lag | Lead, [value]) => Ok(value.return_type()),
73 (Lag | Lead, [value, offset]) => {
74 if !offset.return_type().is_int() {
75 return Err(ErrorCode::InvalidInputSyntax(format!(
76 "the `offset` of `{kind}` function should be integer"
77 ))
78 .into());
79 }
80 if !offset.is_const() {
81 bail_not_implemented!(
82 "non-const `offset` of `{kind}` function is not supported yet"
83 );
84 }
85 Ok(value.return_type())
86 }
87 (Lag | Lead, [_value, _offset, _default]) => {
88 bail_not_implemented!(
89 "`{kind}` window function with `default` argument is not supported yet"
90 );
91 }
92
93 (Aggregate(agg_type), args) => Ok(match agg_type {
94 AggType::Builtin(kind) => infer_type((*kind).into(), args)?,
95 AggType::UserDefined(udf) => udf.return_type.as_ref().unwrap().into(),
96 AggType::WrapScalar(expr) => expr.return_type.as_ref().unwrap().into(),
97 }),
98
99 (_, args) => {
100 let args = args
101 .iter()
102 .map(|e| format!("{}", e.return_type()))
103 .join(", ");
104 Err(RwError::from(ErrorCode::InvalidInputSyntax(format!(
105 "Invalid window function: {kind}({args})"
106 ))))
107 }
108 }
109 }
110}
111
112impl std::fmt::Debug for WindowFunction {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 if f.alternate() {
115 let mut builder = f.debug_struct("WindowFunction");
116 builder
117 .field("kind", &self.kind)
118 .field("return_type", &self.return_type)
119 .field("args", &self.args)
120 .field("ignore_nulls", &self.ignore_nulls)
121 .field("partition_by", &self.partition_by)
122 .field("order_by", &format_args!("{}", self.order_by));
123 if let Some(frame) = &self.frame {
124 builder.field("frame", &format_args!("{}", frame));
125 } else {
126 builder.field("frame", &"None".to_owned());
127 }
128 builder.finish()
129 } else {
130 write!(f, "{}() OVER(", self.kind)?;
131
132 let mut delim = "";
133 if !self.partition_by.is_empty() {
134 delim = " ";
135 write!(
136 f,
137 "PARTITION BY {:?}",
138 self.partition_by.iter().format(", ")
139 )?;
140 }
141 if !self.order_by.sort_exprs.is_empty() {
142 write!(f, "{delim}{}", self.order_by)?;
143 }
144 if let Some(frame) = &self.frame {
145 write!(f, "{delim}{}", frame)?;
146 }
147 f.write_str(")")?;
148
149 Ok(())
150 }
151 }
152}
153
154impl Expr for WindowFunction {
155 fn return_type(&self) -> DataType {
156 self.return_type.clone()
157 }
158
159 fn to_expr_proto(&self) -> risingwave_pb::expr::ExprNode {
160 unreachable!("Window function should not be converted to ExprNode")
161 }
162}