risingwave_frontend/binder/expr/function/
window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::types::{DataType, ScalarImpl, data_types};
17use risingwave_common::{bail_not_implemented, must_match};
18use risingwave_expr::aggregate::{AggType, PbAggKind};
19use risingwave_expr::window_function::{
20    Frame, FrameBound, FrameBounds, FrameExclusion, RangeFrameBounds, RangeFrameOffset,
21    RowsFrameBounds, SessionFrameBounds, SessionFrameGap, WindowFuncKind,
22};
23use risingwave_sqlparser::ast::{
24    self, Window, WindowFrameBound, WindowFrameBounds, WindowFrameExclusion, WindowFrameUnits,
25};
26
27use crate::Binder;
28use crate::binder::Clause;
29use crate::error::{ErrorCode, Result};
30use crate::expr::{Expr, ExprImpl, OrderBy, WindowFunction};
31
32impl Binder {
33    fn ensure_window_function_allowed(&self) -> Result<()> {
34        if let Some(clause) = self.context.clause {
35            match clause {
36                Clause::Where
37                | Clause::Values
38                | Clause::GroupBy
39                | Clause::Having
40                | Clause::Filter
41                | Clause::GeneratedColumn
42                | Clause::From
43                | Clause::Insert
44                | Clause::JoinOn => {
45                    return Err(ErrorCode::InvalidInputSyntax(format!(
46                        "window functions are not allowed in {}",
47                        clause
48                    ))
49                    .into());
50                }
51            }
52        }
53        Ok(())
54    }
55
56    /// Bind window function calls according to PostgreSQL syntax.
57    /// See <https://www.postgresql.org/docs/current/sql-expressions.html#SYNTAX-WINDOW-FUNCTIONS> for syntax detail.
58    pub(super) fn bind_window_function(
59        &mut self,
60        kind: WindowFuncKind,
61        args: Vec<ExprImpl>,
62        ignore_nulls: bool,
63        filter: Option<Box<ast::Expr>>,
64        window: Window,
65    ) -> Result<ExprImpl> {
66        self.ensure_window_function_allowed()?;
67
68        // Resolve the window definition to a concrete window specification
69        let window = match window {
70            Window::Spec(spec) => spec,
71            Window::Name(window_name) => {
72                // Look up the named window definition in the bind context
73                let window_name_str = window_name.real_value();
74
75                if let Some(named_window_spec) = self.context.named_windows.get(&window_name_str) {
76                    named_window_spec.clone()
77                } else {
78                    return Err(ErrorCode::InvalidInputSyntax(format!(
79                        "Window '{}' is not defined. Please ensure the window is defined in the WINDOW clause.",
80                        window_name_str
81                    )).into());
82                }
83            }
84        };
85
86        if ignore_nulls {
87            match &kind {
88                WindowFuncKind::Aggregate(AggType::Builtin(
89                    PbAggKind::FirstValue | PbAggKind::LastValue,
90                )) => {
91                    // pass
92                }
93                WindowFuncKind::Lag | WindowFuncKind::Lead => {
94                    bail_not_implemented!("`IGNORE NULLS` is not supported for `{}` yet", kind);
95                }
96                _ => {
97                    return Err(ErrorCode::InvalidInputSyntax(format!(
98                        "`IGNORE NULLS` is not allowed for `{}`",
99                        kind
100                    ))
101                    .into());
102                }
103            }
104        }
105
106        if filter.is_some() {
107            bail_not_implemented!("`FILTER` is not supported yet");
108        }
109
110        let partition_by = window
111            .partition_by
112            .into_iter()
113            .map(|arg| self.bind_expr_inner(arg))
114            .try_collect()?;
115        let order_by = OrderBy::new(
116            window
117                .order_by
118                .into_iter()
119                .map(|order_by_expr| self.bind_order_by_expr(order_by_expr))
120                .collect::<Result<_>>()?,
121        );
122        let frame = if let Some(frame) = window.window_frame {
123            let exclusion = if let Some(exclusion) = frame.exclusion {
124                match exclusion {
125                    WindowFrameExclusion::CurrentRow => FrameExclusion::CurrentRow,
126                    WindowFrameExclusion::Group | WindowFrameExclusion::Ties => {
127                        bail_not_implemented!(
128                            issue = 9124,
129                            "window frame exclusion `{}` is not supported yet",
130                            exclusion
131                        );
132                    }
133                    WindowFrameExclusion::NoOthers => FrameExclusion::NoOthers,
134                }
135            } else {
136                FrameExclusion::NoOthers
137            };
138            let bounds = match frame.units {
139                WindowFrameUnits::Rows => {
140                    let (start, end) = must_match!(frame.bounds, WindowFrameBounds::Bounds { start, end } => (start, end));
141                    let (start, end) = self.bind_window_frame_usize_bounds(start, end)?;
142                    FrameBounds::Rows(RowsFrameBounds { start, end })
143                }
144                unit @ (WindowFrameUnits::Range | WindowFrameUnits::Session) => {
145                    let order_by_expr = order_by
146                        .sort_exprs
147                        .iter()
148                        // for `RANGE | SESSION` frame, there should be exactly one `ORDER BY` column
149                        .exactly_one()
150                        .map_err(|_| {
151                            ErrorCode::InvalidInputSyntax(format!(
152                                "there should be exactly one ordering column for `{}` frame",
153                                unit
154                            ))
155                        })?;
156                    let order_data_type = order_by_expr.expr.return_type();
157                    let order_type = order_by_expr.order_type;
158
159                    let offset_data_type = match &order_data_type {
160                        // for numeric ordering columns, `offset`/`gap` should be the same type
161                        // NOTE: actually in PG it can be a larger type, but we don't support this here
162                        t @ data_types::range_frame_numeric!() => t.clone(),
163                        // for datetime ordering columns, `offset`/`gap` should be interval
164                        t @ data_types::range_frame_datetime!() => {
165                            if matches!(t, DataType::Date | DataType::Time) {
166                                bail_not_implemented!(
167                                    "`{}` frame with offset of type `{}` is not implemented yet, please manually cast the `ORDER BY` column to `timestamp`",
168                                    unit,
169                                    t
170                                );
171                            }
172                            DataType::Interval
173                        }
174                        // other types are not supported
175                        t => {
176                            return Err(ErrorCode::NotSupported(
177                                format!(
178                                    "`{}` frame with offset of type `{}` is not supported",
179                                    unit, t
180                                ),
181                                "Please re-consider the `ORDER BY` column".to_owned(),
182                            )
183                            .into());
184                        }
185                    };
186
187                    if unit == WindowFrameUnits::Range {
188                        let (start, end) = must_match!(frame.bounds, WindowFrameBounds::Bounds { start, end } => (start, end));
189                        let (start, end) = self.bind_window_frame_scalar_impl_bounds(
190                            start,
191                            end,
192                            &offset_data_type,
193                        )?;
194                        FrameBounds::Range(RangeFrameBounds {
195                            order_data_type,
196                            order_type,
197                            offset_data_type,
198                            start: start.map(RangeFrameOffset::new),
199                            end: end.map(RangeFrameOffset::new),
200                        })
201                    } else {
202                        let gap = must_match!(frame.bounds, WindowFrameBounds::Gap(gap) => gap);
203                        let gap_value =
204                            self.bind_window_frame_bound_offset(*gap, offset_data_type.clone())?;
205                        FrameBounds::Session(SessionFrameBounds {
206                            order_data_type,
207                            order_type,
208                            gap_data_type: offset_data_type,
209                            gap: SessionFrameGap::new(gap_value),
210                        })
211                    }
212                }
213                WindowFrameUnits::Groups => {
214                    bail_not_implemented!(
215                        issue = 9124,
216                        "window frame in `GROUPS` mode is not supported yet",
217                    );
218                }
219            };
220
221            // Validate the frame bounds, may return `ExprError` to user if the bounds given are not valid.
222            bounds.validate()?;
223
224            Some(Frame { bounds, exclusion })
225        } else {
226            None
227        };
228        Ok(WindowFunction::new(kind, args, ignore_nulls, partition_by, order_by, frame)?.into())
229    }
230
231    fn bind_window_frame_usize_bounds(
232        &mut self,
233        start: WindowFrameBound,
234        end: Option<WindowFrameBound>,
235    ) -> Result<(FrameBound<usize>, FrameBound<usize>)> {
236        let mut convert_offset = |offset: Box<ast::Expr>| -> Result<usize> {
237            let offset = self
238                .bind_window_frame_bound_offset(*offset, DataType::Int64)?
239                .into_int64();
240            if offset < 0 {
241                return Err(ErrorCode::InvalidInputSyntax(
242                    "offset in window frame bounds must be non-negative".to_owned(),
243                )
244                .into());
245            }
246            Ok(offset as usize)
247        };
248        let mut convert_bound = |bound| -> Result<FrameBound<usize>> {
249            Ok(match bound {
250                WindowFrameBound::CurrentRow => FrameBound::CurrentRow,
251                WindowFrameBound::Preceding(None) => FrameBound::UnboundedPreceding,
252                WindowFrameBound::Preceding(Some(offset)) => {
253                    FrameBound::Preceding(convert_offset(offset)?)
254                }
255                WindowFrameBound::Following(None) => FrameBound::UnboundedFollowing,
256                WindowFrameBound::Following(Some(offset)) => {
257                    FrameBound::Following(convert_offset(offset)?)
258                }
259            })
260        };
261        let start = convert_bound(start)?;
262        let end = if let Some(end_bound) = end {
263            convert_bound(end_bound)?
264        } else {
265            FrameBound::CurrentRow
266        };
267        Ok((start, end))
268    }
269
270    fn bind_window_frame_scalar_impl_bounds(
271        &mut self,
272        start: WindowFrameBound,
273        end: Option<WindowFrameBound>,
274        offset_data_type: &DataType,
275    ) -> Result<(FrameBound<ScalarImpl>, FrameBound<ScalarImpl>)> {
276        let mut convert_bound = |bound| -> Result<FrameBound<_>> {
277            Ok(match bound {
278                WindowFrameBound::CurrentRow => FrameBound::CurrentRow,
279                WindowFrameBound::Preceding(None) => FrameBound::UnboundedPreceding,
280                WindowFrameBound::Preceding(Some(offset)) => FrameBound::Preceding(
281                    self.bind_window_frame_bound_offset(*offset, offset_data_type.clone())?,
282                ),
283                WindowFrameBound::Following(None) => FrameBound::UnboundedFollowing,
284                WindowFrameBound::Following(Some(offset)) => FrameBound::Following(
285                    self.bind_window_frame_bound_offset(*offset, offset_data_type.clone())?,
286                ),
287            })
288        };
289        let start = convert_bound(start)?;
290        let end = if let Some(end_bound) = end {
291            convert_bound(end_bound)?
292        } else {
293            FrameBound::CurrentRow
294        };
295        Ok((start, end))
296    }
297
298    fn bind_window_frame_bound_offset(
299        &mut self,
300        offset: ast::Expr,
301        cast_to: DataType,
302    ) -> Result<ScalarImpl> {
303        let mut offset = self.bind_expr(offset)?;
304        if !offset.is_const() {
305            return Err(ErrorCode::InvalidInputSyntax(
306                "offset/gap in window frame bounds must be constant".to_owned(),
307            )
308            .into());
309        }
310        if offset.cast_implicit_mut(cast_to.clone()).is_err() {
311            return Err(ErrorCode::InvalidInputSyntax(format!(
312                "offset/gap in window frame bounds must be castable to {}",
313                cast_to
314            ))
315            .into());
316        }
317        let offset = offset.fold_const()?;
318        let Some(offset) = offset else {
319            return Err(ErrorCode::InvalidInputSyntax(
320                "offset/gap in window frame bounds must not be NULL".to_owned(),
321            )
322            .into());
323        };
324        Ok(offset)
325    }
326}