risingwave_expr/window_function/
call.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding};
18use enum_as_inner::EnumAsInner;
19use parse_display::Display;
20use risingwave_common::types::DataType;
21use risingwave_common::{bail, must_match};
22use risingwave_pb::expr::window_frame::{PbBounds, PbExclusion};
23use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction};
24
25use super::{
26    RangeFrameBounds, RowsFrameBound, RowsFrameBounds, SessionFrameBounds, WindowFuncKind,
27};
28use crate::Result;
29use crate::aggregate::AggArgs;
30
31#[derive(Debug, Clone)]
32pub struct WindowFuncCall {
33    pub kind: WindowFuncKind,
34    pub return_type: DataType,
35    pub args: AggArgs,
36    pub ignore_nulls: bool,
37    pub frame: Frame,
38}
39
40impl WindowFuncCall {
41    pub fn from_protobuf(call: &PbWindowFunction) -> Result<Self> {
42        let call = WindowFuncCall {
43            kind: WindowFuncKind::from_protobuf(call.get_type()?)?,
44            return_type: DataType::from(call.get_return_type()?),
45            args: AggArgs::from_protobuf(call.get_args())?,
46            ignore_nulls: call.get_ignore_nulls(),
47            frame: Frame::from_protobuf(call.get_frame()?)?,
48        };
49        Ok(call)
50    }
51}
52
53#[derive(Debug, Clone, Eq, PartialEq, Hash)]
54pub struct Frame {
55    pub bounds: FrameBounds,
56    pub exclusion: FrameExclusion,
57}
58
59impl Display for Frame {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        write!(f, "{}", self.bounds)?;
62        if self.exclusion != FrameExclusion::default() {
63            write!(f, " {}", self.exclusion)?;
64        }
65        Ok(())
66    }
67}
68
69impl Frame {
70    pub fn rows(start: RowsFrameBound, end: RowsFrameBound) -> Self {
71        Self {
72            bounds: FrameBounds::Rows(RowsFrameBounds { start, end }),
73            exclusion: FrameExclusion::default(),
74        }
75    }
76
77    pub fn rows_with_exclusion(
78        start: RowsFrameBound,
79        end: RowsFrameBound,
80        exclusion: FrameExclusion,
81    ) -> Self {
82        Self {
83            bounds: FrameBounds::Rows(RowsFrameBounds { start, end }),
84            exclusion,
85        }
86    }
87}
88
89impl Frame {
90    pub fn from_protobuf(frame: &PbWindowFrame) -> Result<Self> {
91        use risingwave_pb::expr::window_frame::PbType;
92        let bounds = match frame.get_type()? {
93            PbType::Unspecified => bail!("unspecified type of `WindowFrame`"),
94            PbType::RowsLegacy => {
95                #[expect(deprecated)]
96                {
97                    let start = FrameBound::<usize>::from_protobuf_legacy(frame.get_start()?)?;
98                    let end = FrameBound::<usize>::from_protobuf_legacy(frame.get_end()?)?;
99                    FrameBounds::Rows(RowsFrameBounds { start, end })
100                }
101            }
102            PbType::Rows => {
103                let bounds = must_match!(frame.get_bounds()?, PbBounds::Rows(bounds) => bounds);
104                FrameBounds::Rows(RowsFrameBounds::from_protobuf(bounds)?)
105            }
106            PbType::Range => {
107                let bounds = must_match!(frame.get_bounds()?, PbBounds::Range(bounds) => bounds);
108                FrameBounds::Range(RangeFrameBounds::from_protobuf(bounds)?)
109            }
110            PbType::Session => {
111                let bounds = must_match!(frame.get_bounds()?, PbBounds::Session(bounds) => bounds);
112                FrameBounds::Session(SessionFrameBounds::from_protobuf(bounds)?)
113            }
114        };
115        let exclusion = FrameExclusion::from_protobuf(frame.get_exclusion()?)?;
116        Ok(Self { bounds, exclusion })
117    }
118
119    pub fn to_protobuf(&self) -> PbWindowFrame {
120        use risingwave_pb::expr::window_frame::PbType;
121        let exclusion = self.exclusion.to_protobuf() as _;
122        #[expect(deprecated)] // because of `start` and `end` fields
123        match &self.bounds {
124            FrameBounds::Rows(bounds) => PbWindowFrame {
125                r#type: PbType::Rows as _,
126                start: None, // deprecated
127                end: None,   // deprecated
128                exclusion,
129                bounds: Some(PbBounds::Rows(bounds.to_protobuf())),
130            },
131            FrameBounds::Range(bounds) => PbWindowFrame {
132                r#type: PbType::Range as _,
133                start: None, // deprecated
134                end: None,   // deprecated
135                exclusion,
136                bounds: Some(PbBounds::Range(bounds.to_protobuf())),
137            },
138            FrameBounds::Session(bounds) => PbWindowFrame {
139                r#type: PbType::Session as _,
140                start: None, // deprecated
141                end: None,   // deprecated
142                exclusion,
143                bounds: Some(PbBounds::Session(bounds.to_protobuf())),
144            },
145        }
146    }
147}
148
149#[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)]
150#[display("{0}")]
151pub enum FrameBounds {
152    Rows(RowsFrameBounds),
153    // Groups(GroupsFrameBounds),
154    Range(RangeFrameBounds),
155    Session(SessionFrameBounds),
156}
157
158impl FrameBounds {
159    pub fn validate(&self) -> Result<()> {
160        match self {
161            Self::Rows(bounds) => bounds.validate(),
162            Self::Range(bounds) => bounds.validate(),
163            Self::Session(bounds) => bounds.validate(),
164        }
165    }
166
167    pub fn start_is_unbounded(&self) -> bool {
168        match self {
169            Self::Rows(RowsFrameBounds { start, .. }) => start.is_unbounded_preceding(),
170            Self::Range(RangeFrameBounds { start, .. }) => start.is_unbounded_preceding(),
171            Self::Session(_) => false,
172        }
173    }
174
175    pub fn end_is_unbounded(&self) -> bool {
176        match self {
177            Self::Rows(RowsFrameBounds { end, .. }) => end.is_unbounded_following(),
178            Self::Range(RangeFrameBounds { end, .. }) => end.is_unbounded_following(),
179            Self::Session(_) => false,
180        }
181    }
182
183    pub fn is_unbounded(&self) -> bool {
184        self.start_is_unbounded() || self.end_is_unbounded()
185    }
186}
187
188pub trait FrameBoundsImpl {
189    fn validate(&self) -> Result<()>;
190}
191
192#[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)]
193#[display(style = "TITLE CASE")]
194pub enum FrameBound<T> {
195    UnboundedPreceding,
196    #[display("{0} PRECEDING")]
197    Preceding(T),
198    CurrentRow,
199    #[display("{0} FOLLOWING")]
200    Following(T),
201    UnboundedFollowing,
202}
203
204impl<T> FrameBound<T> {
205    fn offset_value(&self) -> Option<&T> {
206        match self {
207            UnboundedPreceding | UnboundedFollowing | CurrentRow => None,
208            Preceding(offset) | Following(offset) => Some(offset),
209        }
210    }
211
212    pub(super) fn validate_bounds(
213        start: &Self,
214        end: &Self,
215        offset_checker: impl Fn(&T) -> Result<()>,
216    ) -> Result<()> {
217        match (start, end) {
218            (_, UnboundedPreceding) => bail!("frame end cannot be UNBOUNDED PRECEDING"),
219            (UnboundedFollowing, _) => {
220                bail!("frame start cannot be UNBOUNDED FOLLOWING")
221            }
222            (Following(_), CurrentRow) | (Following(_), Preceding(_)) => {
223                bail!("frame starting from following row cannot have preceding rows")
224            }
225            (CurrentRow, Preceding(_)) => {
226                bail!("frame starting from current row cannot have preceding rows")
227            }
228            _ => {}
229        }
230
231        for bound in [start, end] {
232            if let Some(offset) = bound.offset_value() {
233                offset_checker(offset)?;
234            }
235        }
236
237        Ok(())
238    }
239
240    pub fn map<U>(self, f: impl Fn(T) -> U) -> FrameBound<U> {
241        match self {
242            UnboundedPreceding => UnboundedPreceding,
243            Preceding(offset) => Preceding(f(offset)),
244            CurrentRow => CurrentRow,
245            Following(offset) => Following(f(offset)),
246            UnboundedFollowing => UnboundedFollowing,
247        }
248    }
249}
250
251impl<T> FrameBound<T>
252where
253    T: Copy,
254{
255    pub(super) fn reverse(self) -> FrameBound<T> {
256        match self {
257            UnboundedPreceding => UnboundedFollowing,
258            Preceding(offset) => Following(offset),
259            CurrentRow => CurrentRow,
260            Following(offset) => Preceding(offset),
261            UnboundedFollowing => UnboundedPreceding,
262        }
263    }
264}
265
266#[derive(Display, Debug, Copy, Clone, Eq, PartialEq, Hash, Default, EnumAsInner)]
267#[display("EXCLUDE {}", style = "TITLE CASE")]
268pub enum FrameExclusion {
269    CurrentRow,
270    // Group,
271    // Ties,
272    #[default]
273    NoOthers,
274}
275
276impl FrameExclusion {
277    fn from_protobuf(exclusion: PbExclusion) -> Result<Self> {
278        let excl = match exclusion {
279            PbExclusion::Unspecified => bail!("unspecified type of `FrameExclusion`"),
280            PbExclusion::CurrentRow => Self::CurrentRow,
281            PbExclusion::NoOthers => Self::NoOthers,
282        };
283        Ok(excl)
284    }
285
286    fn to_protobuf(self) -> PbExclusion {
287        match self {
288            Self::CurrentRow => PbExclusion::CurrentRow,
289            Self::NoOthers => PbExclusion::NoOthers,
290        }
291    }
292}