risingwave_expr/window_function/
rows.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use parse_display::Display;
16use risingwave_common::bail;
17use risingwave_pb::expr::window_frame::{
18    PbBound, PbBoundType, PbRowsFrameBound, PbRowsFrameBounds,
19};
20
21use super::FrameBound::{
22    self, CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding,
23};
24use super::FrameBoundsImpl;
25use crate::Result;
26
/// The pair of bounds delimiting a `ROWS` window frame.
///
/// Its `Display` output has the form `ROWS BETWEEN {start} AND {end}`.
#[derive(Display, Debug, Clone, Eq, PartialEq, Hash)]
#[display("ROWS BETWEEN {start} AND {end}")]
pub struct RowsFrameBounds {
    /// The start bound, rendered after `BETWEEN`.
    pub start: RowsFrameBound,
    /// The end bound, rendered after `AND`.
    pub end: RowsFrameBound,
}
33
34impl RowsFrameBounds {
35    pub(super) fn from_protobuf(bounds: &PbRowsFrameBounds) -> Result<Self> {
36        let start = FrameBound::<usize>::from_protobuf(bounds.get_start()?)?;
37        let end = FrameBound::<usize>::from_protobuf(bounds.get_end()?)?;
38        Ok(Self { start, end })
39    }
40
41    pub(super) fn to_protobuf(&self) -> PbRowsFrameBounds {
42        PbRowsFrameBounds {
43            start: Some(self.start.to_protobuf()),
44            end: Some(self.end.to_protobuf()),
45        }
46    }
47}
48
49impl RowsFrameBounds {
50    /// Check if the `ROWS` frame is canonical.
51    ///
52    /// A canonical `ROWS` frame is defined as:
53    ///
54    /// - Its bounds are valid (see [`Self::validate`]).
55    /// - It contains the current row.
56    pub fn is_canonical(&self) -> bool {
57        self.validate().is_ok() && {
58            let start = self.start.to_offset();
59            let end = self.end.to_offset();
60            start.unwrap_or(0) <= 0 && end.unwrap_or(0) >= 0
61        }
62    }
63
64    /// Get the number of preceding rows.
65    pub fn n_preceding_rows(&self) -> Option<usize> {
66        match (&self.start, &self.end) {
67            (UnboundedPreceding, _) => None,
68            (Preceding(n1), Preceding(n2)) => Some(*n1.max(n2)),
69            (Preceding(n), _) => Some(*n),
70            (CurrentRow | Following(_) | UnboundedFollowing, _) => Some(0),
71        }
72    }
73
74    /// Get the number of following rows.
75    pub fn n_following_rows(&self) -> Option<usize> {
76        match (&self.start, &self.end) {
77            (_, UnboundedFollowing) => None,
78            (Following(n1), Following(n2)) => Some(*n1.max(n2)),
79            (_, Following(n)) => Some(*n),
80            (_, CurrentRow | Preceding(_) | UnboundedPreceding) => Some(0),
81        }
82    }
83}
84
85impl FrameBoundsImpl for RowsFrameBounds {
86    fn validate(&self) -> Result<()> {
87        FrameBound::validate_bounds(&self.start, &self.end, |_| Ok(()))
88    }
89}
90
/// A single bound of a `ROWS` frame: a [`FrameBound`] whose offsets are `usize` values.
pub type RowsFrameBound = FrameBound<usize>;
92
93impl RowsFrameBound {
94    pub(super) fn from_protobuf_legacy(bound: &PbBound) -> Result<Self> {
95        use risingwave_pb::expr::window_frame::bound::PbOffset;
96
97        let offset = bound.get_offset()?;
98        let bound = match offset {
99            PbOffset::Integer(offset) => Self::from_protobuf(&PbRowsFrameBound {
100                r#type: bound.get_type()? as _,
101                offset: Some(*offset),
102            })?,
103            PbOffset::Datum(_) => bail!("offset of `RowsFrameBound` must be `Integer`"),
104        };
105        Ok(bound)
106    }
107
108    fn from_protobuf(bound: &PbRowsFrameBound) -> Result<Self> {
109        let bound = match bound.get_type()? {
110            PbBoundType::Unspecified => bail!("unspecified type of `RowsFrameBound`"),
111            PbBoundType::UnboundedPreceding => Self::UnboundedPreceding,
112            PbBoundType::Preceding => Self::Preceding(*bound.get_offset()? as usize),
113            PbBoundType::CurrentRow => Self::CurrentRow,
114            PbBoundType::Following => Self::Following(*bound.get_offset()? as usize),
115            PbBoundType::UnboundedFollowing => Self::UnboundedFollowing,
116        };
117        Ok(bound)
118    }
119
120    fn to_protobuf(&self) -> PbRowsFrameBound {
121        let (r#type, offset) = match self {
122            Self::UnboundedPreceding => (PbBoundType::UnboundedPreceding, None),
123            Self::Preceding(offset) => (PbBoundType::Preceding, Some(*offset as _)),
124            Self::CurrentRow => (PbBoundType::CurrentRow, None),
125            Self::Following(offset) => (PbBoundType::Following, Some(*offset as _)),
126            Self::UnboundedFollowing => (PbBoundType::UnboundedFollowing, None),
127        };
128        PbRowsFrameBound {
129            r#type: r#type as _,
130            offset,
131        }
132    }
133}
134
135impl RowsFrameBound {
136    /// Convert the bound to sized offset from current row. `None` if the bound is unbounded.
137    pub fn to_offset(&self) -> Option<isize> {
138        match self {
139            UnboundedPreceding | UnboundedFollowing => None,
140            CurrentRow => Some(0),
141            Preceding(n) => Some(-(*n as isize)),
142            Following(n) => Some(*n as isize),
143        }
144    }
145}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// Expectations for a frame that passes validation, in the order:
    /// `(start, end, is_canonical, start offset, end offset, n_preceding_rows,
    /// n_following_rows)`.
    type ValidCase = (
        RowsFrameBound,
        RowsFrameBound,
        bool,
        Option<isize>,
        Option<isize>,
        Option<usize>,
        Option<usize>,
    );

    #[test]
    fn test_rows_frame_bounds() {
        let valid_cases: [ValidCase; 6] = [
            (Preceding(1), CurrentRow, true, Some(-1), Some(0), Some(1), Some(0)),
            (CurrentRow, Following(1), true, Some(0), Some(1), Some(0), Some(1)),
            (UnboundedPreceding, Following(10), true, None, Some(10), None, Some(10)),
            (Preceding(10), UnboundedFollowing, true, Some(-10), None, Some(10), None),
            // Frames below pass validation but do not contain the current row.
            (Preceding(1), Preceding(10), false, Some(-1), Some(-10), Some(10), Some(0)),
            (Following(10), Following(1), false, Some(10), Some(1), Some(0), Some(10)),
        ];
        for (start, end, canonical, start_offset, end_offset, n_preceding, n_following) in
            valid_cases
        {
            let bounds = RowsFrameBounds { start, end };
            assert!(bounds.validate().is_ok(), "{bounds:?}");
            assert_eq!(bounds.is_canonical(), canonical, "{bounds:?}");
            assert_eq!(bounds.start.to_offset(), start_offset, "{bounds:?}");
            assert_eq!(bounds.end.to_offset(), end_offset, "{bounds:?}");
            assert_eq!(bounds.n_preceding_rows(), n_preceding, "{bounds:?}");
            assert_eq!(bounds.n_following_rows(), n_following, "{bounds:?}");
        }

        // Frames that fail validation are never canonical.
        let invalid_cases = [
            (UnboundedFollowing, Following(10)),
            (Preceding(10), UnboundedPreceding),
        ];
        for (start, end) in invalid_cases {
            let bounds = RowsFrameBounds { start, end };
            assert!(bounds.validate().is_err(), "{bounds:?}");
            assert!(!bounds.is_canonical(), "{bounds:?}");
        }
    }
}