risingwave_frontend/binder/expr/function/
window.rs1use itertools::Itertools;
16use risingwave_common::types::{DataType, ScalarImpl, data_types};
17use risingwave_common::{bail_not_implemented, must_match};
18use risingwave_expr::aggregate::{AggType, PbAggKind};
19use risingwave_expr::window_function::{
20 Frame, FrameBound, FrameBounds, FrameExclusion, RangeFrameBounds, RangeFrameOffset,
21 RowsFrameBounds, SessionFrameBounds, SessionFrameGap, WindowFuncKind,
22};
23use risingwave_sqlparser::ast::{
24 self, Window, WindowFrameBound, WindowFrameBounds, WindowFrameExclusion, WindowFrameUnits,
25};
26
27use crate::Binder;
28use crate::binder::Clause;
29use crate::error::{ErrorCode, Result};
30use crate::expr::{Expr, ExprImpl, OrderBy, WindowFunction};
31
32impl Binder {
33 fn ensure_window_function_allowed(&self) -> Result<()> {
34 if let Some(clause) = self.context.clause {
35 match clause {
36 Clause::Where
37 | Clause::Values
38 | Clause::GroupBy
39 | Clause::Having
40 | Clause::Filter
41 | Clause::GeneratedColumn
42 | Clause::From
43 | Clause::Insert
44 | Clause::JoinOn => {
45 return Err(ErrorCode::InvalidInputSyntax(format!(
46 "window functions are not allowed in {}",
47 clause
48 ))
49 .into());
50 }
51 }
52 }
53 Ok(())
54 }
55
56 pub(super) fn bind_window_function(
59 &mut self,
60 kind: WindowFuncKind,
61 args: Vec<ExprImpl>,
62 ignore_nulls: bool,
63 filter: Option<Box<ast::Expr>>,
64 window: Window,
65 ) -> Result<ExprImpl> {
66 self.ensure_window_function_allowed()?;
67
68 let window = match window {
70 Window::Spec(spec) => spec,
71 Window::Name(window_name) => {
72 let window_name_str = window_name.real_value();
74
75 if let Some(named_window_spec) = self.context.named_windows.get(&window_name_str) {
76 named_window_spec.clone()
77 } else {
78 return Err(ErrorCode::InvalidInputSyntax(format!(
79 "Window '{}' is not defined. Please ensure the window is defined in the WINDOW clause.",
80 window_name_str
81 )).into());
82 }
83 }
84 };
85
86 if ignore_nulls {
87 match &kind {
88 WindowFuncKind::Aggregate(AggType::Builtin(
89 PbAggKind::FirstValue | PbAggKind::LastValue,
90 )) => {
91 }
93 WindowFuncKind::Lag | WindowFuncKind::Lead => {
94 bail_not_implemented!("`IGNORE NULLS` is not supported for `{}` yet", kind);
95 }
96 _ => {
97 return Err(ErrorCode::InvalidInputSyntax(format!(
98 "`IGNORE NULLS` is not allowed for `{}`",
99 kind
100 ))
101 .into());
102 }
103 }
104 }
105
106 if filter.is_some() {
107 bail_not_implemented!("`FILTER` is not supported yet");
108 }
109
110 let partition_by = window
111 .partition_by
112 .into_iter()
113 .map(|arg| self.bind_expr_inner(arg))
114 .try_collect()?;
115 let order_by = OrderBy::new(
116 window
117 .order_by
118 .into_iter()
119 .map(|order_by_expr| self.bind_order_by_expr(order_by_expr))
120 .collect::<Result<_>>()?,
121 );
122 let frame = if let Some(frame) = window.window_frame {
123 let exclusion = if let Some(exclusion) = frame.exclusion {
124 match exclusion {
125 WindowFrameExclusion::CurrentRow => FrameExclusion::CurrentRow,
126 WindowFrameExclusion::Group | WindowFrameExclusion::Ties => {
127 bail_not_implemented!(
128 issue = 9124,
129 "window frame exclusion `{}` is not supported yet",
130 exclusion
131 );
132 }
133 WindowFrameExclusion::NoOthers => FrameExclusion::NoOthers,
134 }
135 } else {
136 FrameExclusion::NoOthers
137 };
138 let bounds = match frame.units {
139 WindowFrameUnits::Rows => {
140 let (start, end) = must_match!(frame.bounds, WindowFrameBounds::Bounds { start, end } => (start, end));
141 let (start, end) = self.bind_window_frame_usize_bounds(start, end)?;
142 FrameBounds::Rows(RowsFrameBounds { start, end })
143 }
144 unit @ (WindowFrameUnits::Range | WindowFrameUnits::Session) => {
145 let order_by_expr = order_by
146 .sort_exprs
147 .iter()
148 .exactly_one()
150 .map_err(|_| {
151 ErrorCode::InvalidInputSyntax(format!(
152 "there should be exactly one ordering column for `{}` frame",
153 unit
154 ))
155 })?;
156 let order_data_type = order_by_expr.expr.return_type();
157 let order_type = order_by_expr.order_type;
158
159 let offset_data_type = match &order_data_type {
160 t @ data_types::range_frame_numeric!() => t.clone(),
163 t @ data_types::range_frame_datetime!() => {
165 if matches!(t, DataType::Date | DataType::Time) {
166 bail_not_implemented!(
167 "`{}` frame with offset of type `{}` is not implemented yet, please manually cast the `ORDER BY` column to `timestamp`",
168 unit,
169 t
170 );
171 }
172 DataType::Interval
173 }
174 t => {
176 return Err(ErrorCode::NotSupported(
177 format!(
178 "`{}` frame with offset of type `{}` is not supported",
179 unit, t
180 ),
181 "Please re-consider the `ORDER BY` column".to_owned(),
182 )
183 .into());
184 }
185 };
186
187 if unit == WindowFrameUnits::Range {
188 let (start, end) = must_match!(frame.bounds, WindowFrameBounds::Bounds { start, end } => (start, end));
189 let (start, end) = self.bind_window_frame_scalar_impl_bounds(
190 start,
191 end,
192 &offset_data_type,
193 )?;
194 FrameBounds::Range(RangeFrameBounds {
195 order_data_type,
196 order_type,
197 offset_data_type,
198 start: start.map(RangeFrameOffset::new),
199 end: end.map(RangeFrameOffset::new),
200 })
201 } else {
202 let gap = must_match!(frame.bounds, WindowFrameBounds::Gap(gap) => gap);
203 let gap_value =
204 self.bind_window_frame_bound_offset(*gap, offset_data_type.clone())?;
205 FrameBounds::Session(SessionFrameBounds {
206 order_data_type,
207 order_type,
208 gap_data_type: offset_data_type,
209 gap: SessionFrameGap::new(gap_value),
210 })
211 }
212 }
213 WindowFrameUnits::Groups => {
214 bail_not_implemented!(
215 issue = 9124,
216 "window frame in `GROUPS` mode is not supported yet",
217 );
218 }
219 };
220
221 bounds.validate()?;
223
224 Some(Frame { bounds, exclusion })
225 } else {
226 None
227 };
228 Ok(WindowFunction::new(kind, args, ignore_nulls, partition_by, order_by, frame)?.into())
229 }
230
231 fn bind_window_frame_usize_bounds(
232 &mut self,
233 start: WindowFrameBound,
234 end: Option<WindowFrameBound>,
235 ) -> Result<(FrameBound<usize>, FrameBound<usize>)> {
236 let mut convert_offset = |offset: Box<ast::Expr>| -> Result<usize> {
237 let offset = self
238 .bind_window_frame_bound_offset(*offset, DataType::Int64)?
239 .into_int64();
240 if offset < 0 {
241 return Err(ErrorCode::InvalidInputSyntax(
242 "offset in window frame bounds must be non-negative".to_owned(),
243 )
244 .into());
245 }
246 Ok(offset as usize)
247 };
248 let mut convert_bound = |bound| -> Result<FrameBound<usize>> {
249 Ok(match bound {
250 WindowFrameBound::CurrentRow => FrameBound::CurrentRow,
251 WindowFrameBound::Preceding(None) => FrameBound::UnboundedPreceding,
252 WindowFrameBound::Preceding(Some(offset)) => {
253 FrameBound::Preceding(convert_offset(offset)?)
254 }
255 WindowFrameBound::Following(None) => FrameBound::UnboundedFollowing,
256 WindowFrameBound::Following(Some(offset)) => {
257 FrameBound::Following(convert_offset(offset)?)
258 }
259 })
260 };
261 let start = convert_bound(start)?;
262 let end = if let Some(end_bound) = end {
263 convert_bound(end_bound)?
264 } else {
265 FrameBound::CurrentRow
266 };
267 Ok((start, end))
268 }
269
270 fn bind_window_frame_scalar_impl_bounds(
271 &mut self,
272 start: WindowFrameBound,
273 end: Option<WindowFrameBound>,
274 offset_data_type: &DataType,
275 ) -> Result<(FrameBound<ScalarImpl>, FrameBound<ScalarImpl>)> {
276 let mut convert_bound = |bound| -> Result<FrameBound<_>> {
277 Ok(match bound {
278 WindowFrameBound::CurrentRow => FrameBound::CurrentRow,
279 WindowFrameBound::Preceding(None) => FrameBound::UnboundedPreceding,
280 WindowFrameBound::Preceding(Some(offset)) => FrameBound::Preceding(
281 self.bind_window_frame_bound_offset(*offset, offset_data_type.clone())?,
282 ),
283 WindowFrameBound::Following(None) => FrameBound::UnboundedFollowing,
284 WindowFrameBound::Following(Some(offset)) => FrameBound::Following(
285 self.bind_window_frame_bound_offset(*offset, offset_data_type.clone())?,
286 ),
287 })
288 };
289 let start = convert_bound(start)?;
290 let end = if let Some(end_bound) = end {
291 convert_bound(end_bound)?
292 } else {
293 FrameBound::CurrentRow
294 };
295 Ok((start, end))
296 }
297
298 fn bind_window_frame_bound_offset(
299 &mut self,
300 offset: ast::Expr,
301 cast_to: DataType,
302 ) -> Result<ScalarImpl> {
303 let mut offset = self.bind_expr(offset)?;
304 if !offset.is_const() {
305 return Err(ErrorCode::InvalidInputSyntax(
306 "offset/gap in window frame bounds must be constant".to_owned(),
307 )
308 .into());
309 }
310 if offset.cast_implicit_mut(cast_to.clone()).is_err() {
311 return Err(ErrorCode::InvalidInputSyntax(format!(
312 "offset/gap in window frame bounds must be castable to {}",
313 cast_to
314 ))
315 .into());
316 }
317 let offset = offset.fold_const()?;
318 let Some(offset) = offset else {
319 return Err(ErrorCode::InvalidInputSyntax(
320 "offset/gap in window frame bounds must not be NULL".to_owned(),
321 )
322 .into());
323 };
324 Ok(offset)
325 }
326}