risingwave_frontend/expr/
window_function.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::bail_not_implemented;
17use risingwave_common::types::DataType;
18use risingwave_expr::aggregate::AggType;
19use risingwave_expr::window_function::{Frame, WindowFuncKind};
20
21use super::{Expr, ExprImpl, OrderBy, RwResult};
22use crate::error::{ErrorCode, RwError};
23use crate::expr::infer_type;
24
25/// A window function performs a calculation across a set of table rows that are somehow related to
26/// the current row, according to the window spec `OVER (PARTITION BY .. ORDER BY ..)`.
27/// One output row is calculated for each row in the input table.
28///
29/// Window functions are permitted only in the `SELECT` list and the `ORDER BY` clause of the query.
30/// They are forbidden elsewhere, such as in `GROUP BY`, `HAVING` and `WHERE` clauses.
31#[derive(Clone, Eq, PartialEq, Hash)]
32pub struct WindowFunction {
33    pub kind: WindowFuncKind,
34    pub return_type: DataType,
35    pub args: Vec<ExprImpl>,
36    pub ignore_nulls: bool,
37    pub partition_by: Vec<ExprImpl>,
38    pub order_by: OrderBy,
39    pub frame: Option<Frame>,
40}
41
42impl WindowFunction {
43    /// Create a `WindowFunction` expr with the return type inferred from `func_type` and types of
44    /// `inputs`.
45    pub fn new(
46        kind: WindowFuncKind,
47        mut args: Vec<ExprImpl>,
48        ignore_nulls: bool,
49        partition_by: Vec<ExprImpl>,
50        order_by: OrderBy,
51        frame: Option<Frame>,
52    ) -> RwResult<Self> {
53        let return_type = Self::infer_return_type(&kind, &mut args)?;
54        Ok(Self {
55            kind,
56            return_type,
57            args,
58            ignore_nulls,
59            partition_by,
60            order_by,
61            frame,
62        })
63    }
64
65    fn infer_return_type(kind: &WindowFuncKind, args: &mut [ExprImpl]) -> RwResult<DataType> {
66        use WindowFuncKind::*;
67        match (kind, args) {
68            (RowNumber, []) => Ok(DataType::Int64),
69            (Rank, []) => Ok(DataType::Int64),
70            (DenseRank, []) => Ok(DataType::Int64),
71
72            (Lag | Lead, [value]) => Ok(value.return_type()),
73            (Lag | Lead, [value, offset]) => {
74                if !offset.return_type().is_int() {
75                    return Err(ErrorCode::InvalidInputSyntax(format!(
76                        "the `offset` of `{kind}` function should be integer"
77                    ))
78                    .into());
79                }
80                if !offset.is_const() {
81                    bail_not_implemented!(
82                        "non-const `offset` of `{kind}` function is not supported yet"
83                    );
84                }
85                Ok(value.return_type())
86            }
87            (Lag | Lead, [_value, _offset, _default]) => {
88                bail_not_implemented!(
89                    "`{kind}` window function with `default` argument is not supported yet"
90                );
91            }
92
93            (Aggregate(agg_type), args) => Ok(match agg_type {
94                AggType::Builtin(kind) => infer_type((*kind).into(), args)?,
95                AggType::UserDefined(udf) => udf.return_type.as_ref().unwrap().into(),
96                AggType::WrapScalar(expr) => expr.return_type.as_ref().unwrap().into(),
97            }),
98
99            (_, args) => {
100                let args = args
101                    .iter()
102                    .map(|e| format!("{}", e.return_type()))
103                    .join(", ");
104                Err(RwError::from(ErrorCode::InvalidInputSyntax(format!(
105                    "Invalid window function: {kind}({args})"
106                ))))
107            }
108        }
109    }
110}
111
112impl std::fmt::Debug for WindowFunction {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        if f.alternate() {
115            let mut builder = f.debug_struct("WindowFunction");
116            builder
117                .field("kind", &self.kind)
118                .field("return_type", &self.return_type)
119                .field("args", &self.args)
120                .field("ignore_nulls", &self.ignore_nulls)
121                .field("partition_by", &self.partition_by)
122                .field("order_by", &format_args!("{}", self.order_by));
123            if let Some(frame) = &self.frame {
124                builder.field("frame", &format_args!("{}", frame));
125            } else {
126                builder.field("frame", &"None".to_owned());
127            }
128            builder.finish()
129        } else {
130            write!(f, "{}() OVER(", self.kind)?;
131
132            let mut delim = "";
133            if !self.partition_by.is_empty() {
134                delim = " ";
135                write!(
136                    f,
137                    "PARTITION BY {:?}",
138                    self.partition_by.iter().format(", ")
139                )?;
140            }
141            if !self.order_by.sort_exprs.is_empty() {
142                write!(f, "{delim}{}", self.order_by)?;
143            }
144            if let Some(frame) = &self.frame {
145                write!(f, "{delim}{}", frame)?;
146            }
147            f.write_str(")")?;
148
149            Ok(())
150        }
151    }
152}
153
154impl Expr for WindowFunction {
155    fn return_type(&self) -> DataType {
156        self.return_type.clone()
157    }
158
159    fn to_expr_proto(&self) -> risingwave_pb::expr::ExprNode {
160        unreachable!("Window function should not be converted to ExprNode")
161    }
162}