risingwave_expr/window_function/
rows.rs1use parse_display::Display;
16use risingwave_common::bail;
17use risingwave_pb::expr::window_frame::{
18 PbBound, PbBoundType, PbRowsFrameBound, PbRowsFrameBounds,
19};
20
21use super::FrameBound::{
22 self, CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding,
23};
24use super::FrameBoundsImpl;
25use crate::Result;
26
27#[derive(Display, Debug, Clone, Eq, PartialEq, Hash)]
28#[display("ROWS BETWEEN {start} AND {end}")]
29pub struct RowsFrameBounds {
30 pub start: RowsFrameBound,
31 pub end: RowsFrameBound,
32}
33
34impl RowsFrameBounds {
35 pub(super) fn from_protobuf(bounds: &PbRowsFrameBounds) -> Result<Self> {
36 let start = FrameBound::<usize>::from_protobuf(bounds.get_start()?)?;
37 let end = FrameBound::<usize>::from_protobuf(bounds.get_end()?)?;
38 Ok(Self { start, end })
39 }
40
41 pub(super) fn to_protobuf(&self) -> PbRowsFrameBounds {
42 PbRowsFrameBounds {
43 start: Some(self.start.to_protobuf()),
44 end: Some(self.end.to_protobuf()),
45 }
46 }
47}
48
49impl RowsFrameBounds {
50 pub fn is_canonical(&self) -> bool {
57 self.validate().is_ok() && {
58 let start = self.start.to_offset();
59 let end = self.end.to_offset();
60 start.unwrap_or(0) <= 0 && end.unwrap_or(0) >= 0
61 }
62 }
63
64 pub fn n_preceding_rows(&self) -> Option<usize> {
66 match (&self.start, &self.end) {
67 (UnboundedPreceding, _) => None,
68 (Preceding(n1), Preceding(n2)) => Some(*n1.max(n2)),
69 (Preceding(n), _) => Some(*n),
70 (CurrentRow | Following(_) | UnboundedFollowing, _) => Some(0),
71 }
72 }
73
74 pub fn n_following_rows(&self) -> Option<usize> {
76 match (&self.start, &self.end) {
77 (_, UnboundedFollowing) => None,
78 (Following(n1), Following(n2)) => Some(*n1.max(n2)),
79 (_, Following(n)) => Some(*n),
80 (_, CurrentRow | Preceding(_) | UnboundedPreceding) => Some(0),
81 }
82 }
83}
84
85impl FrameBoundsImpl for RowsFrameBounds {
86 fn validate(&self) -> Result<()> {
87 FrameBound::validate_bounds(&self.start, &self.end, |_| Ok(()))
88 }
89}
90
91pub type RowsFrameBound = FrameBound<usize>;
92
93impl RowsFrameBound {
94 pub(super) fn from_protobuf_legacy(bound: &PbBound) -> Result<Self> {
95 use risingwave_pb::expr::window_frame::bound::PbOffset;
96
97 let offset = bound.get_offset()?;
98 let bound = match offset {
99 PbOffset::Integer(offset) => Self::from_protobuf(&PbRowsFrameBound {
100 r#type: bound.get_type()? as _,
101 offset: Some(*offset),
102 })?,
103 PbOffset::Datum(_) => bail!("offset of `RowsFrameBound` must be `Integer`"),
104 };
105 Ok(bound)
106 }
107
108 fn from_protobuf(bound: &PbRowsFrameBound) -> Result<Self> {
109 let bound = match bound.get_type()? {
110 PbBoundType::Unspecified => bail!("unspecified type of `RowsFrameBound`"),
111 PbBoundType::UnboundedPreceding => Self::UnboundedPreceding,
112 PbBoundType::Preceding => Self::Preceding(*bound.get_offset()? as usize),
113 PbBoundType::CurrentRow => Self::CurrentRow,
114 PbBoundType::Following => Self::Following(*bound.get_offset()? as usize),
115 PbBoundType::UnboundedFollowing => Self::UnboundedFollowing,
116 };
117 Ok(bound)
118 }
119
120 fn to_protobuf(&self) -> PbRowsFrameBound {
121 let (r#type, offset) = match self {
122 Self::UnboundedPreceding => (PbBoundType::UnboundedPreceding, None),
123 Self::Preceding(offset) => (PbBoundType::Preceding, Some(*offset as _)),
124 Self::CurrentRow => (PbBoundType::CurrentRow, None),
125 Self::Following(offset) => (PbBoundType::Following, Some(*offset as _)),
126 Self::UnboundedFollowing => (PbBoundType::UnboundedFollowing, None),
127 };
128 PbRowsFrameBound {
129 r#type: r#type as _,
130 offset,
131 }
132 }
133}
134
135impl RowsFrameBound {
136 pub fn to_offset(&self) -> Option<isize> {
138 match self {
139 UnboundedPreceding | UnboundedFollowing => None,
140 CurrentRow => Some(0),
141 Preceding(n) => Some(-(*n as isize)),
142 Following(n) => Some(*n as isize),
143 }
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// One row of the table below: the two bounds of a frame that must pass
    /// validation, followed by the values every accessor must report for it.
    type ValidCase = (
        RowsFrameBound, // start
        RowsFrameBound, // end
        bool,           // is_canonical
        Option<isize>,  // start.to_offset()
        Option<isize>,  // end.to_offset()
        Option<usize>,  // n_preceding_rows()
        Option<usize>,  // n_following_rows()
    );

    #[test]
    fn test_rows_frame_bounds() {
        let valid_cases: [ValidCase; 6] = [
            (Preceding(1), CurrentRow, true, Some(-1), Some(0), Some(1), Some(0)),
            (CurrentRow, Following(1), true, Some(0), Some(1), Some(0), Some(1)),
            (UnboundedPreceding, Following(10), true, None, Some(10), None, Some(10)),
            (Preceding(10), UnboundedFollowing, true, Some(-10), None, Some(10), None),
            // Valid, but the current row is outside the frame, so not canonical.
            (Preceding(1), Preceding(10), false, Some(-1), Some(-10), Some(10), Some(0)),
            (Following(10), Following(1), false, Some(10), Some(1), Some(0), Some(10)),
        ];

        for (start, end, canonical, start_offset, end_offset, n_preceding, n_following) in
            valid_cases
        {
            let bounds = RowsFrameBounds { start, end };
            assert!(bounds.validate().is_ok(), "case: {bounds:?}");
            assert_eq!(bounds.is_canonical(), canonical, "case: {bounds:?}");
            assert_eq!(bounds.start.to_offset(), start_offset, "case: {bounds:?}");
            assert_eq!(bounds.end.to_offset(), end_offset, "case: {bounds:?}");
            assert_eq!(bounds.n_preceding_rows(), n_preceding, "case: {bounds:?}");
            assert_eq!(bounds.n_following_rows(), n_following, "case: {bounds:?}");
        }

        // Frames that fail validation can never be canonical.
        let invalid_cases = [
            (UnboundedFollowing, Following(10)),
            (Preceding(10), UnboundedPreceding),
        ];
        for (start, end) in invalid_cases {
            let bounds = RowsFrameBounds { start, end };
            assert!(bounds.validate().is_err(), "case: {bounds:?}");
            assert!(!bounds.is_canonical(), "case: {bounds:?}");
        }
    }
}