risingwave_expr/window_function/
call.rs1use std::fmt::Display;
16
17use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding};
18use enum_as_inner::EnumAsInner;
19use parse_display::Display;
20use risingwave_common::types::DataType;
21use risingwave_common::{bail, must_match};
22use risingwave_pb::expr::window_frame::{PbBounds, PbExclusion};
23use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction};
24
25use super::{
26 RangeFrameBounds, RowsFrameBound, RowsFrameBounds, SessionFrameBounds, WindowFuncKind,
27};
28use crate::Result;
29use crate::aggregate::AggArgs;
30
31#[derive(Debug, Clone)]
32pub struct WindowFuncCall {
33 pub kind: WindowFuncKind,
34 pub return_type: DataType,
35 pub args: AggArgs,
36 pub ignore_nulls: bool,
37 pub frame: Frame,
38}
39
40impl WindowFuncCall {
41 pub fn from_protobuf(call: &PbWindowFunction) -> Result<Self> {
42 let call = WindowFuncCall {
43 kind: WindowFuncKind::from_protobuf(call.get_type()?)?,
44 return_type: DataType::from(call.get_return_type()?),
45 args: AggArgs::from_protobuf(call.get_args())?,
46 ignore_nulls: call.get_ignore_nulls(),
47 frame: Frame::from_protobuf(call.get_frame()?)?,
48 };
49 Ok(call)
50 }
51}
52
53#[derive(Debug, Clone, Eq, PartialEq, Hash)]
54pub struct Frame {
55 pub bounds: FrameBounds,
56 pub exclusion: FrameExclusion,
57}
58
59impl Display for Frame {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(f, "{}", self.bounds)?;
62 if self.exclusion != FrameExclusion::default() {
63 write!(f, " {}", self.exclusion)?;
64 }
65 Ok(())
66 }
67}
68
69impl Frame {
70 pub fn rows(start: RowsFrameBound, end: RowsFrameBound) -> Self {
71 Self {
72 bounds: FrameBounds::Rows(RowsFrameBounds { start, end }),
73 exclusion: FrameExclusion::default(),
74 }
75 }
76
77 pub fn rows_with_exclusion(
78 start: RowsFrameBound,
79 end: RowsFrameBound,
80 exclusion: FrameExclusion,
81 ) -> Self {
82 Self {
83 bounds: FrameBounds::Rows(RowsFrameBounds { start, end }),
84 exclusion,
85 }
86 }
87}
88
89impl Frame {
90 pub fn from_protobuf(frame: &PbWindowFrame) -> Result<Self> {
91 use risingwave_pb::expr::window_frame::PbType;
92 let bounds = match frame.get_type()? {
93 PbType::Unspecified => bail!("unspecified type of `WindowFrame`"),
94 PbType::RowsLegacy => {
95 #[expect(deprecated)]
96 {
97 let start = FrameBound::<usize>::from_protobuf_legacy(frame.get_start()?)?;
98 let end = FrameBound::<usize>::from_protobuf_legacy(frame.get_end()?)?;
99 FrameBounds::Rows(RowsFrameBounds { start, end })
100 }
101 }
102 PbType::Rows => {
103 let bounds = must_match!(frame.get_bounds()?, PbBounds::Rows(bounds) => bounds);
104 FrameBounds::Rows(RowsFrameBounds::from_protobuf(bounds)?)
105 }
106 PbType::Range => {
107 let bounds = must_match!(frame.get_bounds()?, PbBounds::Range(bounds) => bounds);
108 FrameBounds::Range(RangeFrameBounds::from_protobuf(bounds)?)
109 }
110 PbType::Session => {
111 let bounds = must_match!(frame.get_bounds()?, PbBounds::Session(bounds) => bounds);
112 FrameBounds::Session(SessionFrameBounds::from_protobuf(bounds)?)
113 }
114 };
115 let exclusion = FrameExclusion::from_protobuf(frame.get_exclusion()?)?;
116 Ok(Self { bounds, exclusion })
117 }
118
119 pub fn to_protobuf(&self) -> PbWindowFrame {
120 use risingwave_pb::expr::window_frame::PbType;
121 let exclusion = self.exclusion.to_protobuf() as _;
122 #[expect(deprecated)] match &self.bounds {
124 FrameBounds::Rows(bounds) => PbWindowFrame {
125 r#type: PbType::Rows as _,
126 start: None, end: None, exclusion,
129 bounds: Some(PbBounds::Rows(bounds.to_protobuf())),
130 },
131 FrameBounds::Range(bounds) => PbWindowFrame {
132 r#type: PbType::Range as _,
133 start: None, end: None, exclusion,
136 bounds: Some(PbBounds::Range(bounds.to_protobuf())),
137 },
138 FrameBounds::Session(bounds) => PbWindowFrame {
139 r#type: PbType::Session as _,
140 start: None, end: None, exclusion,
143 bounds: Some(PbBounds::Session(bounds.to_protobuf())),
144 },
145 }
146 }
147}
148
149#[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)]
150#[display("{0}")]
151pub enum FrameBounds {
152 Rows(RowsFrameBounds),
153 Range(RangeFrameBounds),
155 Session(SessionFrameBounds),
156}
157
158impl FrameBounds {
159 pub fn validate(&self) -> Result<()> {
160 match self {
161 Self::Rows(bounds) => bounds.validate(),
162 Self::Range(bounds) => bounds.validate(),
163 Self::Session(bounds) => bounds.validate(),
164 }
165 }
166
167 pub fn start_is_unbounded(&self) -> bool {
168 match self {
169 Self::Rows(RowsFrameBounds { start, .. }) => start.is_unbounded_preceding(),
170 Self::Range(RangeFrameBounds { start, .. }) => start.is_unbounded_preceding(),
171 Self::Session(_) => false,
172 }
173 }
174
175 pub fn end_is_unbounded(&self) -> bool {
176 match self {
177 Self::Rows(RowsFrameBounds { end, .. }) => end.is_unbounded_following(),
178 Self::Range(RangeFrameBounds { end, .. }) => end.is_unbounded_following(),
179 Self::Session(_) => false,
180 }
181 }
182
183 pub fn is_unbounded(&self) -> bool {
184 self.start_is_unbounded() || self.end_is_unbounded()
185 }
186}
187
188pub trait FrameBoundsImpl {
189 fn validate(&self) -> Result<()>;
190}
191
192#[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)]
193#[display(style = "TITLE CASE")]
194pub enum FrameBound<T> {
195 UnboundedPreceding,
196 #[display("{0} PRECEDING")]
197 Preceding(T),
198 CurrentRow,
199 #[display("{0} FOLLOWING")]
200 Following(T),
201 UnboundedFollowing,
202}
203
204impl<T> FrameBound<T> {
205 fn offset_value(&self) -> Option<&T> {
206 match self {
207 UnboundedPreceding | UnboundedFollowing | CurrentRow => None,
208 Preceding(offset) | Following(offset) => Some(offset),
209 }
210 }
211
212 pub(super) fn validate_bounds(
213 start: &Self,
214 end: &Self,
215 offset_checker: impl Fn(&T) -> Result<()>,
216 ) -> Result<()> {
217 match (start, end) {
218 (_, UnboundedPreceding) => bail!("frame end cannot be UNBOUNDED PRECEDING"),
219 (UnboundedFollowing, _) => {
220 bail!("frame start cannot be UNBOUNDED FOLLOWING")
221 }
222 (Following(_), CurrentRow) | (Following(_), Preceding(_)) => {
223 bail!("frame starting from following row cannot have preceding rows")
224 }
225 (CurrentRow, Preceding(_)) => {
226 bail!("frame starting from current row cannot have preceding rows")
227 }
228 _ => {}
229 }
230
231 for bound in [start, end] {
232 if let Some(offset) = bound.offset_value() {
233 offset_checker(offset)?;
234 }
235 }
236
237 Ok(())
238 }
239
240 pub fn map<U>(self, f: impl Fn(T) -> U) -> FrameBound<U> {
241 match self {
242 UnboundedPreceding => UnboundedPreceding,
243 Preceding(offset) => Preceding(f(offset)),
244 CurrentRow => CurrentRow,
245 Following(offset) => Following(f(offset)),
246 UnboundedFollowing => UnboundedFollowing,
247 }
248 }
249}
250
251impl<T> FrameBound<T>
252where
253 T: Copy,
254{
255 pub(super) fn reverse(self) -> FrameBound<T> {
256 match self {
257 UnboundedPreceding => UnboundedFollowing,
258 Preceding(offset) => Following(offset),
259 CurrentRow => CurrentRow,
260 Following(offset) => Preceding(offset),
261 UnboundedFollowing => UnboundedPreceding,
262 }
263 }
264}
265
266#[derive(Display, Debug, Copy, Clone, Eq, PartialEq, Hash, Default, EnumAsInner)]
267#[display("EXCLUDE {}", style = "TITLE CASE")]
268pub enum FrameExclusion {
269 CurrentRow,
270 #[default]
273 NoOthers,
274}
275
276impl FrameExclusion {
277 fn from_protobuf(exclusion: PbExclusion) -> Result<Self> {
278 let excl = match exclusion {
279 PbExclusion::Unspecified => bail!("unspecified type of `FrameExclusion`"),
280 PbExclusion::CurrentRow => Self::CurrentRow,
281 PbExclusion::NoOthers => Self::NoOthers,
282 };
283 Ok(excl)
284 }
285
286 fn to_protobuf(self) -> PbExclusion {
287 match self {
288 Self::CurrentRow => PbExclusion::CurrentRow,
289 Self::NoOthers => PbExclusion::NoOthers,
290 }
291 }
292}