risingwave_frontend/binder/expr/function/
window.rs1use itertools::Itertools;
16use risingwave_common::types::{DataType, ScalarImpl, data_types};
17use risingwave_common::{bail_not_implemented, must_match};
18use risingwave_expr::aggregate::{AggType, PbAggKind};
19use risingwave_expr::window_function::{
20 Frame, FrameBound, FrameBounds, FrameExclusion, RangeFrameBounds, RangeFrameOffset,
21 RowsFrameBounds, SessionFrameBounds, SessionFrameGap, WindowFuncKind,
22};
23use risingwave_sqlparser::ast::{
24 self, WindowFrameBound, WindowFrameBounds, WindowFrameExclusion, WindowFrameUnits, WindowSpec,
25};
26
27use crate::Binder;
28use crate::binder::Clause;
29use crate::error::{ErrorCode, Result};
30use crate::expr::{Expr, ExprImpl, OrderBy, WindowFunction};
31
32impl Binder {
33 fn ensure_window_function_allowed(&self) -> Result<()> {
34 if let Some(clause) = self.context.clause {
35 match clause {
36 Clause::Where
37 | Clause::Values
38 | Clause::GroupBy
39 | Clause::Having
40 | Clause::Filter
41 | Clause::GeneratedColumn
42 | Clause::From
43 | Clause::Insert
44 | Clause::JoinOn => {
45 return Err(ErrorCode::InvalidInputSyntax(format!(
46 "window functions are not allowed in {}",
47 clause
48 ))
49 .into());
50 }
51 }
52 }
53 Ok(())
54 }
55
56 pub(super) fn bind_window_function(
59 &mut self,
60 kind: WindowFuncKind,
61 args: Vec<ExprImpl>,
62 ignore_nulls: bool,
63 filter: Option<Box<ast::Expr>>,
64 WindowSpec {
65 partition_by,
66 order_by,
67 window_frame,
68 }: WindowSpec,
69 ) -> Result<ExprImpl> {
70 self.ensure_window_function_allowed()?;
71
72 if ignore_nulls {
73 match &kind {
74 WindowFuncKind::Aggregate(AggType::Builtin(
75 PbAggKind::FirstValue | PbAggKind::LastValue,
76 )) => {
77 }
79 WindowFuncKind::Lag | WindowFuncKind::Lead => {
80 bail_not_implemented!("`IGNORE NULLS` is not supported for `{}` yet", kind);
81 }
82 _ => {
83 return Err(ErrorCode::InvalidInputSyntax(format!(
84 "`IGNORE NULLS` is not allowed for `{}`",
85 kind
86 ))
87 .into());
88 }
89 }
90 }
91
92 if filter.is_some() {
93 bail_not_implemented!("`FILTER` is not supported yet");
94 }
95
96 let partition_by = partition_by
97 .into_iter()
98 .map(|arg| self.bind_expr_inner(arg))
99 .try_collect()?;
100 let order_by = OrderBy::new(
101 order_by
102 .into_iter()
103 .map(|order_by_expr| self.bind_order_by_expr(order_by_expr))
104 .collect::<Result<_>>()?,
105 );
106 let frame = if let Some(frame) = window_frame {
107 let exclusion = if let Some(exclusion) = frame.exclusion {
108 match exclusion {
109 WindowFrameExclusion::CurrentRow => FrameExclusion::CurrentRow,
110 WindowFrameExclusion::Group | WindowFrameExclusion::Ties => {
111 bail_not_implemented!(
112 issue = 9124,
113 "window frame exclusion `{}` is not supported yet",
114 exclusion
115 );
116 }
117 WindowFrameExclusion::NoOthers => FrameExclusion::NoOthers,
118 }
119 } else {
120 FrameExclusion::NoOthers
121 };
122 let bounds = match frame.units {
123 WindowFrameUnits::Rows => {
124 let (start, end) = must_match!(frame.bounds, WindowFrameBounds::Bounds { start, end } => (start, end));
125 let (start, end) = self.bind_window_frame_usize_bounds(start, end)?;
126 FrameBounds::Rows(RowsFrameBounds { start, end })
127 }
128 unit @ (WindowFrameUnits::Range | WindowFrameUnits::Session) => {
129 let order_by_expr = order_by
130 .sort_exprs
131 .iter()
132 .exactly_one()
134 .map_err(|_| {
135 ErrorCode::InvalidInputSyntax(format!(
136 "there should be exactly one ordering column for `{}` frame",
137 unit
138 ))
139 })?;
140 let order_data_type = order_by_expr.expr.return_type();
141 let order_type = order_by_expr.order_type;
142
143 let offset_data_type = match &order_data_type {
144 t @ data_types::range_frame_numeric!() => t.clone(),
147 t @ data_types::range_frame_datetime!() => {
149 if matches!(t, DataType::Date | DataType::Time) {
150 bail_not_implemented!(
151 "`{}` frame with offset of type `{}` is not implemented yet, please manually cast the `ORDER BY` column to `timestamp`",
152 unit,
153 t
154 );
155 }
156 DataType::Interval
157 }
158 t => {
160 return Err(ErrorCode::NotSupported(
161 format!(
162 "`{}` frame with offset of type `{}` is not supported",
163 unit, t
164 ),
165 "Please re-consider the `ORDER BY` column".to_owned(),
166 )
167 .into());
168 }
169 };
170
171 if unit == WindowFrameUnits::Range {
172 let (start, end) = must_match!(frame.bounds, WindowFrameBounds::Bounds { start, end } => (start, end));
173 let (start, end) = self.bind_window_frame_scalar_impl_bounds(
174 start,
175 end,
176 &offset_data_type,
177 )?;
178 FrameBounds::Range(RangeFrameBounds {
179 order_data_type,
180 order_type,
181 offset_data_type,
182 start: start.map(RangeFrameOffset::new),
183 end: end.map(RangeFrameOffset::new),
184 })
185 } else {
186 let gap = must_match!(frame.bounds, WindowFrameBounds::Gap(gap) => gap);
187 let gap_value =
188 self.bind_window_frame_bound_offset(*gap, offset_data_type.clone())?;
189 FrameBounds::Session(SessionFrameBounds {
190 order_data_type,
191 order_type,
192 gap_data_type: offset_data_type,
193 gap: SessionFrameGap::new(gap_value),
194 })
195 }
196 }
197 WindowFrameUnits::Groups => {
198 bail_not_implemented!(
199 issue = 9124,
200 "window frame in `GROUPS` mode is not supported yet",
201 );
202 }
203 };
204
205 bounds.validate()?;
207
208 Some(Frame { bounds, exclusion })
209 } else {
210 None
211 };
212 Ok(WindowFunction::new(kind, args, ignore_nulls, partition_by, order_by, frame)?.into())
213 }
214
215 fn bind_window_frame_usize_bounds(
216 &mut self,
217 start: WindowFrameBound,
218 end: Option<WindowFrameBound>,
219 ) -> Result<(FrameBound<usize>, FrameBound<usize>)> {
220 let mut convert_offset = |offset: Box<ast::Expr>| -> Result<usize> {
221 let offset = self
222 .bind_window_frame_bound_offset(*offset, DataType::Int64)?
223 .into_int64();
224 if offset < 0 {
225 return Err(ErrorCode::InvalidInputSyntax(
226 "offset in window frame bounds must be non-negative".to_owned(),
227 )
228 .into());
229 }
230 Ok(offset as usize)
231 };
232 let mut convert_bound = |bound| -> Result<FrameBound<usize>> {
233 Ok(match bound {
234 WindowFrameBound::CurrentRow => FrameBound::CurrentRow,
235 WindowFrameBound::Preceding(None) => FrameBound::UnboundedPreceding,
236 WindowFrameBound::Preceding(Some(offset)) => {
237 FrameBound::Preceding(convert_offset(offset)?)
238 }
239 WindowFrameBound::Following(None) => FrameBound::UnboundedFollowing,
240 WindowFrameBound::Following(Some(offset)) => {
241 FrameBound::Following(convert_offset(offset)?)
242 }
243 })
244 };
245 let start = convert_bound(start)?;
246 let end = if let Some(end_bound) = end {
247 convert_bound(end_bound)?
248 } else {
249 FrameBound::CurrentRow
250 };
251 Ok((start, end))
252 }
253
254 fn bind_window_frame_scalar_impl_bounds(
255 &mut self,
256 start: WindowFrameBound,
257 end: Option<WindowFrameBound>,
258 offset_data_type: &DataType,
259 ) -> Result<(FrameBound<ScalarImpl>, FrameBound<ScalarImpl>)> {
260 let mut convert_bound = |bound| -> Result<FrameBound<_>> {
261 Ok(match bound {
262 WindowFrameBound::CurrentRow => FrameBound::CurrentRow,
263 WindowFrameBound::Preceding(None) => FrameBound::UnboundedPreceding,
264 WindowFrameBound::Preceding(Some(offset)) => FrameBound::Preceding(
265 self.bind_window_frame_bound_offset(*offset, offset_data_type.clone())?,
266 ),
267 WindowFrameBound::Following(None) => FrameBound::UnboundedFollowing,
268 WindowFrameBound::Following(Some(offset)) => FrameBound::Following(
269 self.bind_window_frame_bound_offset(*offset, offset_data_type.clone())?,
270 ),
271 })
272 };
273 let start = convert_bound(start)?;
274 let end = if let Some(end_bound) = end {
275 convert_bound(end_bound)?
276 } else {
277 FrameBound::CurrentRow
278 };
279 Ok((start, end))
280 }
281
282 fn bind_window_frame_bound_offset(
283 &mut self,
284 offset: ast::Expr,
285 cast_to: DataType,
286 ) -> Result<ScalarImpl> {
287 let mut offset = self.bind_expr(offset)?;
288 if !offset.is_const() {
289 return Err(ErrorCode::InvalidInputSyntax(
290 "offset/gap in window frame bounds must be constant".to_owned(),
291 )
292 .into());
293 }
294 if offset.cast_implicit_mut(cast_to.clone()).is_err() {
295 return Err(ErrorCode::InvalidInputSyntax(format!(
296 "offset/gap in window frame bounds must be castable to {}",
297 cast_to
298 ))
299 .into());
300 }
301 let offset = offset.fold_const()?;
302 let Some(offset) = offset else {
303 return Err(ErrorCode::InvalidInputSyntax(
304 "offset/gap in window frame bounds must not be NULL".to_owned(),
305 )
306 .into());
307 };
308 Ok(offset)
309 }
310}