risingwave_expr/window_function/
call.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::fmt::Display;
16
17use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding};
18use enum_as_inner::EnumAsInner;
19use parse_display::Display;
20use risingwave_common::types::DataType;
21use risingwave_common::{bail, must_match};
22use risingwave_pb::expr::window_frame::{PbBounds, PbExclusion};
23use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction};
24
25use super::{
26    RangeFrameBounds, RowsFrameBound, RowsFrameBounds, SessionFrameBounds, WindowFuncKind,
27};
28use crate::Result;
29use crate::aggregate::AggArgs;
30
31#[derive(Debug, Clone)]
32pub struct WindowFuncCall {
33    pub kind: WindowFuncKind,
34    pub return_type: DataType,
35    pub args: AggArgs,
36    pub ignore_nulls: bool,
37    pub frame: Frame,
38}
39
40impl WindowFuncCall {
41    pub fn from_protobuf(call: &PbWindowFunction) -> Result<Self> {
42        let call = WindowFuncCall {
43            kind: WindowFuncKind::from_protobuf(call.get_type()?)?,
44            return_type: DataType::from(call.get_return_type()?),
45            args: AggArgs::from_protobuf(call.get_args())?,
46            ignore_nulls: call.get_ignore_nulls(),
47            frame: Frame::from_protobuf(call.get_frame()?)?,
48        };
49        Ok(call)
50    }
51}
52
53#[derive(Debug, Clone, Eq, PartialEq, Hash)]
54pub struct Frame {
55    pub bounds: FrameBounds,
56    pub exclusion: FrameExclusion,
57}
58
59impl Display for Frame {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        write!(f, "{}", self.bounds)?;
62        if self.exclusion != FrameExclusion::default() {
63            write!(f, " {}", self.exclusion)?;
64        }
65        Ok(())
66    }
67}
68
69impl Frame {
70    pub fn rows(start: RowsFrameBound, end: RowsFrameBound) -> Self {
71        Self {
72            bounds: FrameBounds::Rows(RowsFrameBounds { start, end }),
73            exclusion: FrameExclusion::default(),
74        }
75    }
76
77    pub fn rows_with_exclusion(
78        start: RowsFrameBound,
79        end: RowsFrameBound,
80        exclusion: FrameExclusion,
81    ) -> Self {
82        Self {
83            bounds: FrameBounds::Rows(RowsFrameBounds { start, end }),
84            exclusion,
85        }
86    }
87}
88
89impl Frame {
90    pub fn from_protobuf(frame: &PbWindowFrame) -> Result<Self> {
91        use risingwave_pb::expr::window_frame::PbType;
92        let bounds = match frame.get_type()? {
93            PbType::Unspecified => bail!("unspecified type of `WindowFrame`"),
94            #[expect(deprecated)]
95            PbType::RowsLegacy => {
96                #[expect(deprecated)]
97                {
98                    let start = FrameBound::<usize>::from_protobuf_legacy(frame.get_start()?)?;
99                    let end = FrameBound::<usize>::from_protobuf_legacy(frame.get_end()?)?;
100                    FrameBounds::Rows(RowsFrameBounds { start, end })
101                }
102            }
103            PbType::Rows => {
104                let bounds = must_match!(frame.get_bounds()?, PbBounds::Rows(bounds) => bounds);
105                FrameBounds::Rows(RowsFrameBounds::from_protobuf(bounds)?)
106            }
107            PbType::Range => {
108                let bounds = must_match!(frame.get_bounds()?, PbBounds::Range(bounds) => bounds);
109                FrameBounds::Range(RangeFrameBounds::from_protobuf(bounds)?)
110            }
111            PbType::Session => {
112                let bounds = must_match!(frame.get_bounds()?, PbBounds::Session(bounds) => bounds);
113                FrameBounds::Session(SessionFrameBounds::from_protobuf(bounds)?)
114            }
115        };
116        let exclusion = FrameExclusion::from_protobuf(frame.get_exclusion()?)?;
117        Ok(Self { bounds, exclusion })
118    }
119
120    pub fn to_protobuf(&self) -> PbWindowFrame {
121        use risingwave_pb::expr::window_frame::PbType;
122        let exclusion = self.exclusion.to_protobuf() as _;
123        #[expect(deprecated)] // because of `start` and `end` fields
124        match &self.bounds {
125            FrameBounds::Rows(bounds) => PbWindowFrame {
126                r#type: PbType::Rows as _,
127                start: None, // deprecated
128                end: None,   // deprecated
129                exclusion,
130                bounds: Some(PbBounds::Rows(bounds.to_protobuf())),
131            },
132            FrameBounds::Range(bounds) => PbWindowFrame {
133                r#type: PbType::Range as _,
134                start: None, // deprecated
135                end: None,   // deprecated
136                exclusion,
137                bounds: Some(PbBounds::Range(bounds.to_protobuf())),
138            },
139            FrameBounds::Session(bounds) => PbWindowFrame {
140                r#type: PbType::Session as _,
141                start: None, // deprecated
142                end: None,   // deprecated
143                exclusion,
144                bounds: Some(PbBounds::Session(bounds.to_protobuf())),
145            },
146        }
147    }
148}
149
150#[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)]
151#[display("{0}")]
152pub enum FrameBounds {
153    Rows(RowsFrameBounds),
154    // Groups(GroupsFrameBounds),
155    Range(RangeFrameBounds),
156    Session(SessionFrameBounds),
157}
158
159impl FrameBounds {
160    pub fn validate(&self) -> Result<()> {
161        match self {
162            Self::Rows(bounds) => bounds.validate(),
163            Self::Range(bounds) => bounds.validate(),
164            Self::Session(bounds) => bounds.validate(),
165        }
166    }
167
168    pub fn start_is_unbounded(&self) -> bool {
169        match self {
170            Self::Rows(RowsFrameBounds { start, .. }) => start.is_unbounded_preceding(),
171            Self::Range(RangeFrameBounds { start, .. }) => start.is_unbounded_preceding(),
172            Self::Session(_) => false,
173        }
174    }
175
176    pub fn end_is_unbounded(&self) -> bool {
177        match self {
178            Self::Rows(RowsFrameBounds { end, .. }) => end.is_unbounded_following(),
179            Self::Range(RangeFrameBounds { end, .. }) => end.is_unbounded_following(),
180            Self::Session(_) => false,
181        }
182    }
183
184    pub fn is_unbounded(&self) -> bool {
185        self.start_is_unbounded() || self.end_is_unbounded()
186    }
187}
188
189pub trait FrameBoundsImpl {
190    fn validate(&self) -> Result<()>;
191}
192
193#[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)]
194#[display(style = "TITLE CASE")]
195pub enum FrameBound<T> {
196    UnboundedPreceding,
197    #[display("{0} PRECEDING")]
198    Preceding(T),
199    CurrentRow,
200    #[display("{0} FOLLOWING")]
201    Following(T),
202    UnboundedFollowing,
203}
204
205impl<T> FrameBound<T> {
206    fn offset_value(&self) -> Option<&T> {
207        match self {
208            UnboundedPreceding | UnboundedFollowing | CurrentRow => None,
209            Preceding(offset) | Following(offset) => Some(offset),
210        }
211    }
212
213    pub(super) fn validate_bounds(
214        start: &Self,
215        end: &Self,
216        offset_checker: impl Fn(&T) -> Result<()>,
217    ) -> Result<()> {
218        match (start, end) {
219            (_, UnboundedPreceding) => bail!("frame end cannot be UNBOUNDED PRECEDING"),
220            (UnboundedFollowing, _) => {
221                bail!("frame start cannot be UNBOUNDED FOLLOWING")
222            }
223            (Following(_), CurrentRow) | (Following(_), Preceding(_)) => {
224                bail!("frame starting from following row cannot have preceding rows")
225            }
226            (CurrentRow, Preceding(_)) => {
227                bail!("frame starting from current row cannot have preceding rows")
228            }
229            _ => {}
230        }
231
232        for bound in [start, end] {
233            if let Some(offset) = bound.offset_value() {
234                offset_checker(offset)?;
235            }
236        }
237
238        Ok(())
239    }
240
241    pub fn map<U>(self, f: impl Fn(T) -> U) -> FrameBound<U> {
242        match self {
243            UnboundedPreceding => UnboundedPreceding,
244            Preceding(offset) => Preceding(f(offset)),
245            CurrentRow => CurrentRow,
246            Following(offset) => Following(f(offset)),
247            UnboundedFollowing => UnboundedFollowing,
248        }
249    }
250}
251
252impl<T> FrameBound<T>
253where
254    T: Copy,
255{
256    pub(super) fn reverse(self) -> FrameBound<T> {
257        match self {
258            UnboundedPreceding => UnboundedFollowing,
259            Preceding(offset) => Following(offset),
260            CurrentRow => CurrentRow,
261            Following(offset) => Preceding(offset),
262            UnboundedFollowing => UnboundedPreceding,
263        }
264    }
265}
266
267#[derive(Display, Debug, Copy, Clone, Eq, PartialEq, Hash, Default, EnumAsInner)]
268#[display("EXCLUDE {}", style = "TITLE CASE")]
269pub enum FrameExclusion {
270    CurrentRow,
271    // Group,
272    // Ties,
273    #[default]
274    NoOthers,
275}
276
277impl FrameExclusion {
278    fn from_protobuf(exclusion: PbExclusion) -> Result<Self> {
279        let excl = match exclusion {
280            PbExclusion::Unspecified => bail!("unspecified type of `FrameExclusion`"),
281            PbExclusion::CurrentRow => Self::CurrentRow,
282            PbExclusion::NoOthers => Self::NoOthers,
283        };
284        Ok(excl)
285    }
286
287    fn to_protobuf(self) -> PbExclusion {
288        match self {
289            Self::CurrentRow => PbExclusion::CurrentRow,
290            Self::NoOthers => PbExclusion::NoOthers,
291        }
292    }
293}