risingwave_frontend/binder/expr/function/
window.rs1use itertools::Itertools;
16use risingwave_common::types::{DataType, ScalarImpl, data_types};
17use risingwave_common::{bail_not_implemented, must_match};
18use risingwave_expr::aggregate::{AggType, PbAggKind};
19use risingwave_expr::window_function::{
20 Frame, FrameBound, FrameBounds, FrameExclusion, RangeFrameBounds, RangeFrameOffset,
21 RowsFrameBounds, SessionFrameBounds, SessionFrameGap, WindowFuncKind,
22};
23use risingwave_sqlparser::ast::{
24 self, Window, WindowFrameBound, WindowFrameBounds, WindowFrameExclusion, WindowFrameUnits,
25};
26
27use crate::Binder;
28use crate::binder::Clause;
29use crate::error::{ErrorCode, Result};
30use crate::expr::{Expr, ExprImpl, OrderBy, WindowFunction};
31
32impl Binder {
33 fn ensure_window_function_allowed(&self) -> Result<()> {
34 if let Some(clause) = self.context.clause {
35 match clause {
36 Clause::Where
37 | Clause::Values
38 | Clause::GroupBy
39 | Clause::Having
40 | Clause::Filter
41 | Clause::GeneratedColumn
42 | Clause::From
43 | Clause::Insert
44 | Clause::JoinOn => {
45 return Err(ErrorCode::InvalidInputSyntax(format!(
46 "window functions are not allowed in {}",
47 clause
48 ))
49 .into());
50 }
51 }
52 }
53 Ok(())
54 }
55
56 pub(super) fn bind_window_function(
59 &mut self,
60 kind: WindowFuncKind,
61 args: Vec<ExprImpl>,
62 ignore_nulls: bool,
63 filter: Option<&ast::Expr>,
64 window: &Window,
65 ) -> Result<ExprImpl> {
66 self.ensure_window_function_allowed()?;
67
68 let window = match window {
70 Window::Spec(spec) => spec.clone(),
71 Window::Name(window_name) => {
72 let window_name_str = window_name.real_value();
74
75 if let Some(named_window_spec) = self.context.named_windows.get(&window_name_str) {
76 named_window_spec.clone()
77 } else {
78 return Err(ErrorCode::InvalidInputSyntax(format!(
79 "Window '{}' is not defined. Please ensure the window is defined in the WINDOW clause.",
80 window_name_str
81 )).into());
82 }
83 }
84 };
85
86 if ignore_nulls {
87 match &kind {
88 WindowFuncKind::Aggregate(AggType::Builtin(
89 PbAggKind::FirstValue | PbAggKind::LastValue,
90 )) => {
91 }
93 WindowFuncKind::Lag | WindowFuncKind::Lead => {
94 bail_not_implemented!("`IGNORE NULLS` is not supported for `{}` yet", kind);
95 }
96 _ => {
97 return Err(ErrorCode::InvalidInputSyntax(format!(
98 "`IGNORE NULLS` is not allowed for `{}`",
99 kind
100 ))
101 .into());
102 }
103 }
104 }
105
106 if filter.is_some() {
107 bail_not_implemented!("`FILTER` is not supported yet");
108 }
109
110 let partition_by = window
111 .partition_by
112 .iter()
113 .map(|arg| self.bind_expr_inner(arg))
114 .try_collect()?;
115 let order_by = OrderBy::new(
116 window
117 .order_by
118 .iter()
119 .map(|order_by_expr| self.bind_order_by_expr(order_by_expr))
120 .collect::<Result<_>>()?,
121 );
122 let frame = if let Some(frame) = &window.window_frame {
123 let exclusion = if let Some(exclusion) = frame.exclusion {
124 match exclusion {
125 WindowFrameExclusion::CurrentRow => FrameExclusion::CurrentRow,
126 WindowFrameExclusion::Group | WindowFrameExclusion::Ties => {
127 bail_not_implemented!(
128 issue = 9124,
129 "window frame exclusion `{}` is not supported yet",
130 exclusion
131 );
132 }
133 WindowFrameExclusion::NoOthers => FrameExclusion::NoOthers,
134 }
135 } else {
136 FrameExclusion::NoOthers
137 };
138 let bounds = match &frame.units {
139 WindowFrameUnits::Rows => {
140 let (start, end) = must_match!(&frame.bounds, WindowFrameBounds::Bounds { start, end } => (start, end));
141 let (start, end) = self.bind_window_frame_usize_bounds(start, end.as_ref())?;
142 FrameBounds::Rows(RowsFrameBounds { start, end })
143 }
144 unit @ (WindowFrameUnits::Range | WindowFrameUnits::Session) => {
145 let order_by_expr = Itertools::exactly_one(order_by.sort_exprs.iter())
147 .map_err(|_| {
148 ErrorCode::InvalidInputSyntax(format!(
149 "there should be exactly one ordering column for `{}` frame",
150 unit
151 ))
152 })?;
153 let order_data_type = order_by_expr.expr.return_type();
154 let order_type = order_by_expr.order_type;
155
156 let offset_data_type = match &order_data_type {
157 t @ data_types::range_frame_numeric!() => t.clone(),
160 t @ data_types::range_frame_datetime!() => {
162 if matches!(t, DataType::Date | DataType::Time) {
163 bail_not_implemented!(
164 "`{}` frame with offset of type `{}` is not implemented yet, please manually cast the `ORDER BY` column to `timestamp`",
165 unit,
166 t
167 );
168 }
169 DataType::Interval
170 }
171 t => {
173 return Err(ErrorCode::NotSupported(
174 format!(
175 "`{}` frame with offset of type `{}` is not supported",
176 unit, t
177 ),
178 "Please re-consider the `ORDER BY` column".to_owned(),
179 )
180 .into());
181 }
182 };
183
184 if unit == &WindowFrameUnits::Range {
185 let (start, end) = must_match!(&frame.bounds, WindowFrameBounds::Bounds { start, end } => (start, end));
186 let (start, end) = self.bind_window_frame_scalar_impl_bounds(
187 start,
188 end.as_ref(),
189 &offset_data_type,
190 )?;
191 FrameBounds::Range(RangeFrameBounds {
192 order_data_type,
193 order_type,
194 offset_data_type,
195 start: start.map(RangeFrameOffset::new),
196 end: end.map(RangeFrameOffset::new),
197 })
198 } else {
199 let gap = must_match!(&frame.bounds, WindowFrameBounds::Gap(gap) => gap);
200 let gap_value =
201 self.bind_window_frame_bound_offset(gap, &offset_data_type)?;
202 FrameBounds::Session(SessionFrameBounds {
203 order_data_type,
204 order_type,
205 gap_data_type: offset_data_type,
206 gap: SessionFrameGap::new(gap_value),
207 })
208 }
209 }
210 WindowFrameUnits::Groups => {
211 bail_not_implemented!(
212 issue = 9124,
213 "window frame in `GROUPS` mode is not supported yet",
214 );
215 }
216 };
217
218 bounds.validate()?;
220
221 Some(Frame { bounds, exclusion })
222 } else {
223 None
224 };
225 Ok(WindowFunction::new(kind, args, ignore_nulls, partition_by, order_by, frame)?.into())
226 }
227
228 fn bind_window_frame_usize_bounds(
229 &mut self,
230 start: &WindowFrameBound,
231 end: Option<&WindowFrameBound>,
232 ) -> Result<(FrameBound<usize>, FrameBound<usize>)> {
233 let mut convert_offset = |offset: &ast::Expr| -> Result<usize> {
234 let offset = self
235 .bind_window_frame_bound_offset(offset, &DataType::Int64)?
236 .into_int64();
237 if offset < 0 {
238 return Err(ErrorCode::InvalidInputSyntax(
239 "offset in window frame bounds must be non-negative".to_owned(),
240 )
241 .into());
242 }
243 Ok(offset as usize)
244 };
245 let mut convert_bound = |bound: &WindowFrameBound| -> Result<FrameBound<usize>> {
246 Ok(match bound {
247 WindowFrameBound::CurrentRow => FrameBound::CurrentRow,
248 WindowFrameBound::Preceding(None) => FrameBound::UnboundedPreceding,
249 WindowFrameBound::Preceding(Some(offset)) => {
250 FrameBound::Preceding(convert_offset(offset.as_ref())?)
251 }
252 WindowFrameBound::Following(None) => FrameBound::UnboundedFollowing,
253 WindowFrameBound::Following(Some(offset)) => {
254 FrameBound::Following(convert_offset(offset)?)
255 }
256 })
257 };
258 let start = convert_bound(start)?;
259 let end = if let Some(end_bound) = end {
260 convert_bound(end_bound)?
261 } else {
262 FrameBound::CurrentRow
263 };
264 Ok((start, end))
265 }
266
267 fn bind_window_frame_scalar_impl_bounds(
268 &mut self,
269 start: &WindowFrameBound,
270 end: Option<&WindowFrameBound>,
271 offset_data_type: &DataType,
272 ) -> Result<(FrameBound<ScalarImpl>, FrameBound<ScalarImpl>)> {
273 let mut convert_bound = |bound: &WindowFrameBound| -> Result<FrameBound<_>> {
274 Ok(match bound {
275 WindowFrameBound::CurrentRow => FrameBound::CurrentRow,
276 WindowFrameBound::Preceding(None) => FrameBound::UnboundedPreceding,
277 WindowFrameBound::Preceding(Some(offset)) => FrameBound::Preceding(
278 self.bind_window_frame_bound_offset(offset, offset_data_type)?,
279 ),
280 WindowFrameBound::Following(None) => FrameBound::UnboundedFollowing,
281 WindowFrameBound::Following(Some(offset)) => FrameBound::Following(
282 self.bind_window_frame_bound_offset(offset, offset_data_type)?,
283 ),
284 })
285 };
286 let start = convert_bound(start)?;
287 let end = if let Some(end_bound) = end {
288 convert_bound(end_bound)?
289 } else {
290 FrameBound::CurrentRow
291 };
292 Ok((start, end))
293 }
294
295 fn bind_window_frame_bound_offset(
296 &mut self,
297 offset: &ast::Expr,
298 cast_to: &DataType,
299 ) -> Result<ScalarImpl> {
300 let mut offset = self.bind_expr(offset)?;
301 if !offset.is_const() {
302 return Err(ErrorCode::InvalidInputSyntax(
303 "offset/gap in window frame bounds must be constant".to_owned(),
304 )
305 .into());
306 }
307 if offset.cast_implicit_mut(cast_to).is_err() {
308 return Err(ErrorCode::InvalidInputSyntax(format!(
309 "offset/gap in window frame bounds must be castable to {}",
310 cast_to
311 ))
312 .into());
313 }
314 let offset = offset.fold_const()?;
315 let Some(offset) = offset else {
316 return Err(ErrorCode::InvalidInputSyntax(
317 "offset/gap in window frame bounds must not be NULL".to_owned(),
318 )
319 .into());
320 };
321 Ok(offset)
322 }
323}