risingwave_frontend/binder/expr/function/
window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::types::{DataType, ScalarImpl, data_types};
17use risingwave_common::{bail_not_implemented, must_match};
18use risingwave_expr::aggregate::{AggType, PbAggKind};
19use risingwave_expr::window_function::{
20    Frame, FrameBound, FrameBounds, FrameExclusion, RangeFrameBounds, RangeFrameOffset,
21    RowsFrameBounds, SessionFrameBounds, SessionFrameGap, WindowFuncKind,
22};
23use risingwave_sqlparser::ast::{
24    self, WindowFrameBound, WindowFrameBounds, WindowFrameExclusion, WindowFrameUnits, WindowSpec,
25};
26
27use crate::Binder;
28use crate::binder::Clause;
29use crate::error::{ErrorCode, Result};
30use crate::expr::{Expr, ExprImpl, OrderBy, WindowFunction};
31
32impl Binder {
33    fn ensure_window_function_allowed(&self) -> Result<()> {
34        if let Some(clause) = self.context.clause {
35            match clause {
36                Clause::Where
37                | Clause::Values
38                | Clause::GroupBy
39                | Clause::Having
40                | Clause::Filter
41                | Clause::GeneratedColumn
42                | Clause::From
43                | Clause::Insert
44                | Clause::JoinOn => {
45                    return Err(ErrorCode::InvalidInputSyntax(format!(
46                        "window functions are not allowed in {}",
47                        clause
48                    ))
49                    .into());
50                }
51            }
52        }
53        Ok(())
54    }
55
56    /// Bind window function calls according to PostgreSQL syntax.
57    /// See <https://www.postgresql.org/docs/current/sql-expressions.html#SYNTAX-WINDOW-FUNCTIONS> for syntax detail.
58    pub(super) fn bind_window_function(
59        &mut self,
60        kind: WindowFuncKind,
61        args: Vec<ExprImpl>,
62        ignore_nulls: bool,
63        filter: Option<Box<ast::Expr>>,
64        WindowSpec {
65            partition_by,
66            order_by,
67            window_frame,
68        }: WindowSpec,
69    ) -> Result<ExprImpl> {
70        self.ensure_window_function_allowed()?;
71
72        if ignore_nulls {
73            match &kind {
74                WindowFuncKind::Aggregate(AggType::Builtin(
75                    PbAggKind::FirstValue | PbAggKind::LastValue,
76                )) => {
77                    // pass
78                }
79                WindowFuncKind::Lag | WindowFuncKind::Lead => {
80                    bail_not_implemented!("`IGNORE NULLS` is not supported for `{}` yet", kind);
81                }
82                _ => {
83                    return Err(ErrorCode::InvalidInputSyntax(format!(
84                        "`IGNORE NULLS` is not allowed for `{}`",
85                        kind
86                    ))
87                    .into());
88                }
89            }
90        }
91
92        if filter.is_some() {
93            bail_not_implemented!("`FILTER` is not supported yet");
94        }
95
96        let partition_by = partition_by
97            .into_iter()
98            .map(|arg| self.bind_expr_inner(arg))
99            .try_collect()?;
100        let order_by = OrderBy::new(
101            order_by
102                .into_iter()
103                .map(|order_by_expr| self.bind_order_by_expr(order_by_expr))
104                .collect::<Result<_>>()?,
105        );
106        let frame = if let Some(frame) = window_frame {
107            let exclusion = if let Some(exclusion) = frame.exclusion {
108                match exclusion {
109                    WindowFrameExclusion::CurrentRow => FrameExclusion::CurrentRow,
110                    WindowFrameExclusion::Group | WindowFrameExclusion::Ties => {
111                        bail_not_implemented!(
112                            issue = 9124,
113                            "window frame exclusion `{}` is not supported yet",
114                            exclusion
115                        );
116                    }
117                    WindowFrameExclusion::NoOthers => FrameExclusion::NoOthers,
118                }
119            } else {
120                FrameExclusion::NoOthers
121            };
122            let bounds = match frame.units {
123                WindowFrameUnits::Rows => {
124                    let (start, end) = must_match!(frame.bounds, WindowFrameBounds::Bounds { start, end } => (start, end));
125                    let (start, end) = self.bind_window_frame_usize_bounds(start, end)?;
126                    FrameBounds::Rows(RowsFrameBounds { start, end })
127                }
128                unit @ (WindowFrameUnits::Range | WindowFrameUnits::Session) => {
129                    let order_by_expr = order_by
130                        .sort_exprs
131                        .iter()
132                        // for `RANGE | SESSION` frame, there should be exactly one `ORDER BY` column
133                        .exactly_one()
134                        .map_err(|_| {
135                            ErrorCode::InvalidInputSyntax(format!(
136                                "there should be exactly one ordering column for `{}` frame",
137                                unit
138                            ))
139                        })?;
140                    let order_data_type = order_by_expr.expr.return_type();
141                    let order_type = order_by_expr.order_type;
142
143                    let offset_data_type = match &order_data_type {
144                        // for numeric ordering columns, `offset`/`gap` should be the same type
145                        // NOTE: actually in PG it can be a larger type, but we don't support this here
146                        t @ data_types::range_frame_numeric!() => t.clone(),
147                        // for datetime ordering columns, `offset`/`gap` should be interval
148                        t @ data_types::range_frame_datetime!() => {
149                            if matches!(t, DataType::Date | DataType::Time) {
150                                bail_not_implemented!(
151                                    "`{}` frame with offset of type `{}` is not implemented yet, please manually cast the `ORDER BY` column to `timestamp`",
152                                    unit,
153                                    t
154                                );
155                            }
156                            DataType::Interval
157                        }
158                        // other types are not supported
159                        t => {
160                            return Err(ErrorCode::NotSupported(
161                                format!(
162                                    "`{}` frame with offset of type `{}` is not supported",
163                                    unit, t
164                                ),
165                                "Please re-consider the `ORDER BY` column".to_owned(),
166                            )
167                            .into());
168                        }
169                    };
170
171                    if unit == WindowFrameUnits::Range {
172                        let (start, end) = must_match!(frame.bounds, WindowFrameBounds::Bounds { start, end } => (start, end));
173                        let (start, end) = self.bind_window_frame_scalar_impl_bounds(
174                            start,
175                            end,
176                            &offset_data_type,
177                        )?;
178                        FrameBounds::Range(RangeFrameBounds {
179                            order_data_type,
180                            order_type,
181                            offset_data_type,
182                            start: start.map(RangeFrameOffset::new),
183                            end: end.map(RangeFrameOffset::new),
184                        })
185                    } else {
186                        let gap = must_match!(frame.bounds, WindowFrameBounds::Gap(gap) => gap);
187                        let gap_value =
188                            self.bind_window_frame_bound_offset(*gap, offset_data_type.clone())?;
189                        FrameBounds::Session(SessionFrameBounds {
190                            order_data_type,
191                            order_type,
192                            gap_data_type: offset_data_type,
193                            gap: SessionFrameGap::new(gap_value),
194                        })
195                    }
196                }
197                WindowFrameUnits::Groups => {
198                    bail_not_implemented!(
199                        issue = 9124,
200                        "window frame in `GROUPS` mode is not supported yet",
201                    );
202                }
203            };
204
205            // Validate the frame bounds, may return `ExprError` to user if the bounds given are not valid.
206            bounds.validate()?;
207
208            Some(Frame { bounds, exclusion })
209        } else {
210            None
211        };
212        Ok(WindowFunction::new(kind, args, ignore_nulls, partition_by, order_by, frame)?.into())
213    }
214
215    fn bind_window_frame_usize_bounds(
216        &mut self,
217        start: WindowFrameBound,
218        end: Option<WindowFrameBound>,
219    ) -> Result<(FrameBound<usize>, FrameBound<usize>)> {
220        let mut convert_offset = |offset: Box<ast::Expr>| -> Result<usize> {
221            let offset = self
222                .bind_window_frame_bound_offset(*offset, DataType::Int64)?
223                .into_int64();
224            if offset < 0 {
225                return Err(ErrorCode::InvalidInputSyntax(
226                    "offset in window frame bounds must be non-negative".to_owned(),
227                )
228                .into());
229            }
230            Ok(offset as usize)
231        };
232        let mut convert_bound = |bound| -> Result<FrameBound<usize>> {
233            Ok(match bound {
234                WindowFrameBound::CurrentRow => FrameBound::CurrentRow,
235                WindowFrameBound::Preceding(None) => FrameBound::UnboundedPreceding,
236                WindowFrameBound::Preceding(Some(offset)) => {
237                    FrameBound::Preceding(convert_offset(offset)?)
238                }
239                WindowFrameBound::Following(None) => FrameBound::UnboundedFollowing,
240                WindowFrameBound::Following(Some(offset)) => {
241                    FrameBound::Following(convert_offset(offset)?)
242                }
243            })
244        };
245        let start = convert_bound(start)?;
246        let end = if let Some(end_bound) = end {
247            convert_bound(end_bound)?
248        } else {
249            FrameBound::CurrentRow
250        };
251        Ok((start, end))
252    }
253
254    fn bind_window_frame_scalar_impl_bounds(
255        &mut self,
256        start: WindowFrameBound,
257        end: Option<WindowFrameBound>,
258        offset_data_type: &DataType,
259    ) -> Result<(FrameBound<ScalarImpl>, FrameBound<ScalarImpl>)> {
260        let mut convert_bound = |bound| -> Result<FrameBound<_>> {
261            Ok(match bound {
262                WindowFrameBound::CurrentRow => FrameBound::CurrentRow,
263                WindowFrameBound::Preceding(None) => FrameBound::UnboundedPreceding,
264                WindowFrameBound::Preceding(Some(offset)) => FrameBound::Preceding(
265                    self.bind_window_frame_bound_offset(*offset, offset_data_type.clone())?,
266                ),
267                WindowFrameBound::Following(None) => FrameBound::UnboundedFollowing,
268                WindowFrameBound::Following(Some(offset)) => FrameBound::Following(
269                    self.bind_window_frame_bound_offset(*offset, offset_data_type.clone())?,
270                ),
271            })
272        };
273        let start = convert_bound(start)?;
274        let end = if let Some(end_bound) = end {
275            convert_bound(end_bound)?
276        } else {
277            FrameBound::CurrentRow
278        };
279        Ok((start, end))
280    }
281
282    fn bind_window_frame_bound_offset(
283        &mut self,
284        offset: ast::Expr,
285        cast_to: DataType,
286    ) -> Result<ScalarImpl> {
287        let mut offset = self.bind_expr(offset)?;
288        if !offset.is_const() {
289            return Err(ErrorCode::InvalidInputSyntax(
290                "offset/gap in window frame bounds must be constant".to_owned(),
291            )
292            .into());
293        }
294        if offset.cast_implicit_mut(cast_to.clone()).is_err() {
295            return Err(ErrorCode::InvalidInputSyntax(format!(
296                "offset/gap in window frame bounds must be castable to {}",
297                cast_to
298            ))
299            .into());
300        }
301        let offset = offset.fold_const()?;
302        let Some(offset) = offset else {
303            return Err(ErrorCode::InvalidInputSyntax(
304                "offset/gap in window frame bounds must not be NULL".to_owned(),
305            )
306            .into());
307        };
308        Ok(offset)
309    }
310}