Skip to main content

risingwave_frontend/binder/expr/function/
window.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::types::{DataType, ScalarImpl, data_types};
17use risingwave_common::{bail_not_implemented, must_match};
18use risingwave_expr::aggregate::{AggType, PbAggKind};
19use risingwave_expr::window_function::{
20    Frame, FrameBound, FrameBounds, FrameExclusion, RangeFrameBounds, RangeFrameOffset,
21    RowsFrameBounds, SessionFrameBounds, SessionFrameGap, WindowFuncKind,
22};
23use risingwave_sqlparser::ast::{
24    self, Window, WindowFrameBound, WindowFrameBounds, WindowFrameExclusion, WindowFrameUnits,
25};
26
27use crate::Binder;
28use crate::binder::Clause;
29use crate::error::{ErrorCode, Result};
30use crate::expr::{Expr, ExprImpl, OrderBy, WindowFunction};
31
32impl Binder {
33    fn ensure_window_function_allowed(&self) -> Result<()> {
34        if let Some(clause) = self.context.clause {
35            match clause {
36                Clause::Where
37                | Clause::Values
38                | Clause::GroupBy
39                | Clause::Having
40                | Clause::Filter
41                | Clause::GeneratedColumn
42                | Clause::From
43                | Clause::Insert
44                | Clause::JoinOn => {
45                    return Err(ErrorCode::InvalidInputSyntax(format!(
46                        "window functions are not allowed in {}",
47                        clause
48                    ))
49                    .into());
50                }
51            }
52        }
53        Ok(())
54    }
55
56    /// Bind window function calls according to PostgreSQL syntax.
57    /// See <https://www.postgresql.org/docs/current/sql-expressions.html#SYNTAX-WINDOW-FUNCTIONS> for syntax detail.
58    pub(super) fn bind_window_function(
59        &mut self,
60        kind: WindowFuncKind,
61        args: Vec<ExprImpl>,
62        ignore_nulls: bool,
63        filter: Option<&ast::Expr>,
64        window: &Window,
65    ) -> Result<ExprImpl> {
66        self.ensure_window_function_allowed()?;
67
68        // Resolve the window definition to a concrete window specification
69        let window = match window {
70            Window::Spec(spec) => spec.clone(),
71            Window::Name(window_name) => {
72                // Look up the named window definition in the bind context
73                let window_name_str = window_name.real_value();
74
75                if let Some(named_window_spec) = self.context.named_windows.get(&window_name_str) {
76                    named_window_spec.clone()
77                } else {
78                    return Err(ErrorCode::InvalidInputSyntax(format!(
79                        "Window '{}' is not defined. Please ensure the window is defined in the WINDOW clause.",
80                        window_name_str
81                    )).into());
82                }
83            }
84        };
85
86        if ignore_nulls {
87            match &kind {
88                WindowFuncKind::Aggregate(AggType::Builtin(
89                    PbAggKind::FirstValue | PbAggKind::LastValue,
90                )) => {
91                    // pass
92                }
93                WindowFuncKind::Lag | WindowFuncKind::Lead => {
94                    bail_not_implemented!("`IGNORE NULLS` is not supported for `{}` yet", kind);
95                }
96                _ => {
97                    return Err(ErrorCode::InvalidInputSyntax(format!(
98                        "`IGNORE NULLS` is not allowed for `{}`",
99                        kind
100                    ))
101                    .into());
102                }
103            }
104        }
105
106        if filter.is_some() {
107            bail_not_implemented!("`FILTER` is not supported yet");
108        }
109
110        let partition_by = window
111            .partition_by
112            .iter()
113            .map(|arg| self.bind_expr_inner(arg))
114            .try_collect()?;
115        let order_by = OrderBy::new(
116            window
117                .order_by
118                .iter()
119                .map(|order_by_expr| self.bind_order_by_expr(order_by_expr))
120                .collect::<Result<_>>()?,
121        );
122        let frame = if let Some(frame) = &window.window_frame {
123            let exclusion = if let Some(exclusion) = frame.exclusion {
124                match exclusion {
125                    WindowFrameExclusion::CurrentRow => FrameExclusion::CurrentRow,
126                    WindowFrameExclusion::Group | WindowFrameExclusion::Ties => {
127                        bail_not_implemented!(
128                            issue = 9124,
129                            "window frame exclusion `{}` is not supported yet",
130                            exclusion
131                        );
132                    }
133                    WindowFrameExclusion::NoOthers => FrameExclusion::NoOthers,
134                }
135            } else {
136                FrameExclusion::NoOthers
137            };
138            let bounds = match &frame.units {
139                WindowFrameUnits::Rows => {
140                    let (start, end) = must_match!(&frame.bounds, WindowFrameBounds::Bounds { start, end } => (start, end));
141                    let (start, end) = self.bind_window_frame_usize_bounds(start, end.as_ref())?;
142                    FrameBounds::Rows(RowsFrameBounds { start, end })
143                }
144                unit @ (WindowFrameUnits::Range | WindowFrameUnits::Session) => {
145                    // for `RANGE | SESSION` frame, there should be exactly one `ORDER BY` column
146                    let order_by_expr = Itertools::exactly_one(order_by.sort_exprs.iter())
147                        .map_err(|_| {
148                            ErrorCode::InvalidInputSyntax(format!(
149                                "there should be exactly one ordering column for `{}` frame",
150                                unit
151                            ))
152                        })?;
153                    let order_data_type = order_by_expr.expr.return_type();
154                    let order_type = order_by_expr.order_type;
155
156                    let offset_data_type = match &order_data_type {
157                        // for numeric ordering columns, `offset`/`gap` should be the same type
158                        // NOTE: actually in PG it can be a larger type, but we don't support this here
159                        t @ data_types::range_frame_numeric!() => t.clone(),
160                        // for datetime ordering columns, `offset`/`gap` should be interval
161                        t @ data_types::range_frame_datetime!() => {
162                            if matches!(t, DataType::Date | DataType::Time) {
163                                bail_not_implemented!(
164                                    "`{}` frame with offset of type `{}` is not implemented yet, please manually cast the `ORDER BY` column to `timestamp`",
165                                    unit,
166                                    t
167                                );
168                            }
169                            DataType::Interval
170                        }
171                        // other types are not supported
172                        t => {
173                            return Err(ErrorCode::NotSupported(
174                                format!(
175                                    "`{}` frame with offset of type `{}` is not supported",
176                                    unit, t
177                                ),
178                                "Please re-consider the `ORDER BY` column".to_owned(),
179                            )
180                            .into());
181                        }
182                    };
183
184                    if unit == &WindowFrameUnits::Range {
185                        let (start, end) = must_match!(&frame.bounds, WindowFrameBounds::Bounds { start, end } => (start, end));
186                        let (start, end) = self.bind_window_frame_scalar_impl_bounds(
187                            start,
188                            end.as_ref(),
189                            &offset_data_type,
190                        )?;
191                        FrameBounds::Range(RangeFrameBounds {
192                            order_data_type,
193                            order_type,
194                            offset_data_type,
195                            start: start.map(RangeFrameOffset::new),
196                            end: end.map(RangeFrameOffset::new),
197                        })
198                    } else {
199                        let gap = must_match!(&frame.bounds, WindowFrameBounds::Gap(gap) => gap);
200                        let gap_value =
201                            self.bind_window_frame_bound_offset(gap, &offset_data_type)?;
202                        FrameBounds::Session(SessionFrameBounds {
203                            order_data_type,
204                            order_type,
205                            gap_data_type: offset_data_type,
206                            gap: SessionFrameGap::new(gap_value),
207                        })
208                    }
209                }
210                WindowFrameUnits::Groups => {
211                    bail_not_implemented!(
212                        issue = 9124,
213                        "window frame in `GROUPS` mode is not supported yet",
214                    );
215                }
216            };
217
218            // Validate the frame bounds, may return `ExprError` to user if the bounds given are not valid.
219            bounds.validate()?;
220
221            Some(Frame { bounds, exclusion })
222        } else {
223            None
224        };
225        Ok(WindowFunction::new(kind, args, ignore_nulls, partition_by, order_by, frame)?.into())
226    }
227
228    fn bind_window_frame_usize_bounds(
229        &mut self,
230        start: &WindowFrameBound,
231        end: Option<&WindowFrameBound>,
232    ) -> Result<(FrameBound<usize>, FrameBound<usize>)> {
233        let mut convert_offset = |offset: &ast::Expr| -> Result<usize> {
234            let offset = self
235                .bind_window_frame_bound_offset(offset, &DataType::Int64)?
236                .into_int64();
237            if offset < 0 {
238                return Err(ErrorCode::InvalidInputSyntax(
239                    "offset in window frame bounds must be non-negative".to_owned(),
240                )
241                .into());
242            }
243            Ok(offset as usize)
244        };
245        let mut convert_bound = |bound: &WindowFrameBound| -> Result<FrameBound<usize>> {
246            Ok(match bound {
247                WindowFrameBound::CurrentRow => FrameBound::CurrentRow,
248                WindowFrameBound::Preceding(None) => FrameBound::UnboundedPreceding,
249                WindowFrameBound::Preceding(Some(offset)) => {
250                    FrameBound::Preceding(convert_offset(offset.as_ref())?)
251                }
252                WindowFrameBound::Following(None) => FrameBound::UnboundedFollowing,
253                WindowFrameBound::Following(Some(offset)) => {
254                    FrameBound::Following(convert_offset(offset)?)
255                }
256            })
257        };
258        let start = convert_bound(start)?;
259        let end = if let Some(end_bound) = end {
260            convert_bound(end_bound)?
261        } else {
262            FrameBound::CurrentRow
263        };
264        Ok((start, end))
265    }
266
267    fn bind_window_frame_scalar_impl_bounds(
268        &mut self,
269        start: &WindowFrameBound,
270        end: Option<&WindowFrameBound>,
271        offset_data_type: &DataType,
272    ) -> Result<(FrameBound<ScalarImpl>, FrameBound<ScalarImpl>)> {
273        let mut convert_bound = |bound: &WindowFrameBound| -> Result<FrameBound<_>> {
274            Ok(match bound {
275                WindowFrameBound::CurrentRow => FrameBound::CurrentRow,
276                WindowFrameBound::Preceding(None) => FrameBound::UnboundedPreceding,
277                WindowFrameBound::Preceding(Some(offset)) => FrameBound::Preceding(
278                    self.bind_window_frame_bound_offset(offset, offset_data_type)?,
279                ),
280                WindowFrameBound::Following(None) => FrameBound::UnboundedFollowing,
281                WindowFrameBound::Following(Some(offset)) => FrameBound::Following(
282                    self.bind_window_frame_bound_offset(offset, offset_data_type)?,
283                ),
284            })
285        };
286        let start = convert_bound(start)?;
287        let end = if let Some(end_bound) = end {
288            convert_bound(end_bound)?
289        } else {
290            FrameBound::CurrentRow
291        };
292        Ok((start, end))
293    }
294
295    fn bind_window_frame_bound_offset(
296        &mut self,
297        offset: &ast::Expr,
298        cast_to: &DataType,
299    ) -> Result<ScalarImpl> {
300        let mut offset = self.bind_expr(offset)?;
301        if !offset.is_const() {
302            return Err(ErrorCode::InvalidInputSyntax(
303                "offset/gap in window frame bounds must be constant".to_owned(),
304            )
305            .into());
306        }
307        if offset.cast_implicit_mut(cast_to).is_err() {
308            return Err(ErrorCode::InvalidInputSyntax(format!(
309                "offset/gap in window frame bounds must be castable to {}",
310                cast_to
311            ))
312            .into());
313        }
314        let offset = offset.fold_const()?;
315        let Some(offset) = offset else {
316            return Err(ErrorCode::InvalidInputSyntax(
317                "offset/gap in window frame bounds must not be NULL".to_owned(),
318            )
319            .into());
320        };
321        Ok(offset)
322    }
323}