risingwave_expr/window_function/
range.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16use std::ops::Deref;
17use std::sync::Arc;
18
19use anyhow::Context;
20use educe::Educe;
21use futures_util::FutureExt;
22use risingwave_common::bail;
23use risingwave_common::row::OwnedRow;
24use risingwave_common::types::{
25    DataType, Datum, IsNegative, ScalarImpl, ScalarRefImpl, Sentinelled, ToOwnedDatum, ToText,
26};
27use risingwave_common::util::sort_util::{Direction, OrderType};
28use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
29use risingwave_pb::expr::window_frame::{PbBoundType, PbRangeFrameBound, PbRangeFrameBounds};
30
31use super::FrameBound::{
32    self, CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding,
33};
34use super::FrameBoundsImpl;
35use crate::Result;
36use crate::expr::{
37    BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression,
38    build_func,
39};
40
41#[derive(Debug, Clone, Eq, PartialEq, Hash)]
42pub struct RangeFrameBounds {
43    pub order_data_type: DataType,
44    pub order_type: OrderType,
45    pub offset_data_type: DataType,
46    pub start: RangeFrameBound,
47    pub end: RangeFrameBound,
48}
49
50impl RangeFrameBounds {
51    pub(super) fn from_protobuf(bounds: &PbRangeFrameBounds) -> Result<Self> {
52        let order_data_type = DataType::from(bounds.get_order_data_type()?);
53        let order_type = OrderType::from_protobuf(bounds.get_order_type()?);
54        let offset_data_type = DataType::from(bounds.get_offset_data_type()?);
55        let start = FrameBound::<RangeFrameOffset>::from_protobuf(
56            bounds.get_start()?,
57            &order_data_type,
58            &offset_data_type,
59        )?;
60        let end = FrameBound::<RangeFrameOffset>::from_protobuf(
61            bounds.get_end()?,
62            &order_data_type,
63            &offset_data_type,
64        )?;
65        Ok(Self {
66            order_data_type,
67            order_type,
68            offset_data_type,
69            start,
70            end,
71        })
72    }
73
74    pub(super) fn to_protobuf(&self) -> PbRangeFrameBounds {
75        PbRangeFrameBounds {
76            start: Some(self.start.to_protobuf()),
77            end: Some(self.end.to_protobuf()),
78            order_data_type: Some(self.order_data_type.to_protobuf()),
79            order_type: Some(self.order_type.to_protobuf()),
80            offset_data_type: Some(self.offset_data_type.to_protobuf()),
81        }
82    }
83}
84
85impl Display for RangeFrameBounds {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        write!(
88            f,
89            "RANGE BETWEEN {} AND {}",
90            self.start.for_display(),
91            self.end.for_display()
92        )?;
93        Ok(())
94    }
95}
96
97impl FrameBoundsImpl for RangeFrameBounds {
98    fn validate(&self) -> Result<()> {
99        fn validate_non_negative(val: impl IsNegative + Display) -> Result<()> {
100            if val.is_negative() {
101                bail!(
102                    "frame bound offset should be non-negative, but {} is given",
103                    val
104                );
105            }
106            Ok(())
107        }
108
109        FrameBound::validate_bounds(&self.start, &self.end, |offset| {
110            match offset.as_scalar_ref_impl() {
111                // TODO(rc): use decl macro?
112                ScalarRefImpl::Int16(val) => validate_non_negative(val)?,
113                ScalarRefImpl::Int32(val) => validate_non_negative(val)?,
114                ScalarRefImpl::Int64(val) => validate_non_negative(val)?,
115                ScalarRefImpl::Float32(val) => validate_non_negative(val)?,
116                ScalarRefImpl::Float64(val) => validate_non_negative(val)?,
117                ScalarRefImpl::Decimal(val) => validate_non_negative(val)?,
118                ScalarRefImpl::Interval(val) => {
119                    if !val.is_never_negative() {
120                        bail!(
121                            "for frame bound offset of type `interval`, each field should be non-negative, but {} is given",
122                            val
123                        );
124                    }
125                    if matches!(self.order_data_type, DataType::Timestamptz) {
126                        // for `timestamptz`, we only support offset without `month` and `day` fields
127                        if val.months() != 0 || val.days() != 0 {
128                            bail!(
129                                "for frame order column of type `timestamptz`, offset should not have non-zero `month` and `day`",
130                            );
131                        }
132                    }
133                }
134                _ => unreachable!(
135                    "other order column data types are not supported and should be banned in frontend"
136                ),
137            }
138            Ok(())
139        })
140    }
141}
142
143impl RangeFrameBounds {
144    /// Get the frame start for a given order column value.
145    ///
146    /// ## Examples
147    ///
148    /// For the following frames:
149    ///
150    /// ```sql
151    /// ORDER BY x ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
152    /// ORDER BY x DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
153    /// ```
154    ///
155    /// For any CURRENT ROW with any order value, the frame start is always the first-most row, which is
156    /// represented by [`Sentinelled::Smallest`].
157    ///
158    /// For the following frame:
159    ///
160    /// ```sql
161    /// ORDER BY x ASC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW
162    /// ```
163    ///
164    /// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `90`.
165    ///
166    /// For the following frame:
167    ///
168    /// ```sql
169    /// ORDER BY x DESC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW
170    /// ```
171    ///
172    /// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `110`.
173    pub fn frame_start_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled<Datum> {
174        self.start.for_calc().bound_of(order_value, self.order_type)
175    }
176
177    /// Get the frame end for a given order column value. It's very similar to `frame_start_of`, just with
178    /// everything on the other direction.
179    pub fn frame_end_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled<Datum> {
180        self.end.for_calc().bound_of(order_value, self.order_type)
181    }
182
183    /// Get the order value of the CURRENT ROW of the first frame that includes the given order value.
184    ///
185    /// ## Examples
186    ///
187    /// For the following frames:
188    ///
189    /// ```sql
190    /// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
191    /// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
192    /// ```
193    ///
194    /// For any given order value, the first CURRENT ROW is always the first-most row, which is
195    /// represented by [`Sentinelled::Smallest`].
196    ///
197    /// For the following frame:
198    ///
199    /// ```sql
200    /// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING
201    /// ```
202    ///
203    /// For a given order value `100`, the first CURRENT ROW should have order value `90`.
204    ///
205    /// For the following frame:
206    ///
207    /// ```sql
208    /// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING
209    /// ```
210    ///
211    /// For a given order value `100`, the first CURRENT ROW should have order value `110`.
212    pub fn first_curr_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled<Datum> {
213        self.end
214            .for_calc()
215            .reverse()
216            .bound_of(order_value, self.order_type)
217    }
218
219    /// Get the order value of the CURRENT ROW of the last frame that includes the given order value.
220    /// It's very similar to `first_curr_of`, just with everything on the other direction.
221    pub fn last_curr_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled<Datum> {
222        self.start
223            .for_calc()
224            .reverse()
225            .bound_of(order_value, self.order_type)
226    }
227}
228
229pub type RangeFrameBound = FrameBound<RangeFrameOffset>;
230
231impl RangeFrameBound {
232    fn from_protobuf(
233        bound: &PbRangeFrameBound,
234        order_data_type: &DataType,
235        offset_data_type: &DataType,
236    ) -> Result<Self> {
237        let bound = match bound.get_type()? {
238            PbBoundType::Unspecified => bail!("unspecified type of `RangeFrameBound`"),
239            PbBoundType::UnboundedPreceding => Self::UnboundedPreceding,
240            PbBoundType::CurrentRow => Self::CurrentRow,
241            PbBoundType::UnboundedFollowing => Self::UnboundedFollowing,
242            bound_type @ (PbBoundType::Preceding | PbBoundType::Following) => {
243                let offset_value = Datum::from_protobuf(bound.get_offset()?, offset_data_type)
244                    .context("offset `Datum` is not decodable")?
245                    .context("offset of `RangeFrameBound` must be non-NULL")?;
246                let mut offset = RangeFrameOffset::new(offset_value);
247                offset.prepare(order_data_type, offset_data_type)?;
248                if bound_type == PbBoundType::Preceding {
249                    Self::Preceding(offset)
250                } else {
251                    Self::Following(offset)
252                }
253            }
254        };
255        Ok(bound)
256    }
257
258    fn to_protobuf(&self) -> PbRangeFrameBound {
259        let (r#type, offset) = match self {
260            Self::UnboundedPreceding => (PbBoundType::UnboundedPreceding, None),
261            Self::Preceding(offset) => (
262                PbBoundType::Preceding,
263                Some(Some(offset.as_scalar_ref_impl()).to_protobuf()),
264            ),
265            Self::CurrentRow => (PbBoundType::CurrentRow, None),
266            Self::Following(offset) => (
267                PbBoundType::Following,
268                Some(Some(offset.as_scalar_ref_impl()).to_protobuf()),
269            ),
270            Self::UnboundedFollowing => (PbBoundType::UnboundedFollowing, None),
271        };
272        PbRangeFrameBound {
273            r#type: r#type as _,
274            offset,
275        }
276    }
277}
278
279impl RangeFrameBound {
280    fn for_display(&self) -> FrameBound<String> {
281        match self {
282            UnboundedPreceding => UnboundedPreceding,
283            Preceding(offset) => Preceding(offset.as_scalar_ref_impl().to_text()),
284            CurrentRow => CurrentRow,
285            Following(offset) => Following(offset.as_scalar_ref_impl().to_text()),
286            UnboundedFollowing => UnboundedFollowing,
287        }
288    }
289
290    fn for_calc(&self) -> FrameBound<RangeFrameOffsetRef<'_>> {
291        match self {
292            UnboundedPreceding => UnboundedPreceding,
293            Preceding(offset) => Preceding(RangeFrameOffsetRef {
294                add_expr: offset.add_expr.as_ref().unwrap().as_ref(),
295                sub_expr: offset.sub_expr.as_ref().unwrap().as_ref(),
296            }),
297            CurrentRow => CurrentRow,
298            Following(offset) => Following(RangeFrameOffsetRef {
299                add_expr: offset.add_expr.as_ref().unwrap().as_ref(),
300                sub_expr: offset.sub_expr.as_ref().unwrap().as_ref(),
301            }),
302            UnboundedFollowing => UnboundedFollowing,
303        }
304    }
305}
306
307/// The wrapper type for [`ScalarImpl`] range frame offset, containing
308/// two expressions to help adding and subtracting the offset.
309#[derive(Debug, Clone, Educe)]
310#[educe(PartialEq, Eq, Hash)]
311pub struct RangeFrameOffset {
312    /// The original offset value.
313    offset: ScalarImpl,
314    /// Built expression for `$0 + offset`.
315    #[educe(PartialEq(ignore), Hash(ignore))]
316    add_expr: Option<Arc<BoxedExpression>>,
317    /// Built expression for `$0 - offset`.
318    #[educe(PartialEq(ignore), Hash(ignore))]
319    sub_expr: Option<Arc<BoxedExpression>>,
320}
321
322impl RangeFrameOffset {
323    pub fn new(offset: ScalarImpl) -> Self {
324        Self {
325            offset,
326            add_expr: None,
327            sub_expr: None,
328        }
329    }
330
331    fn prepare(&mut self, order_data_type: &DataType, offset_data_type: &DataType) -> Result<()> {
332        use risingwave_pb::expr::expr_node::PbType as PbExprType;
333
334        let input_expr = InputRefExpression::new(order_data_type.clone(), 0);
335        let offset_expr =
336            LiteralExpression::new(offset_data_type.clone(), Some(self.offset.clone()));
337        self.add_expr = Some(Arc::new(build_func(
338            PbExprType::Add,
339            order_data_type.clone(),
340            vec![input_expr.clone().boxed(), offset_expr.clone().boxed()],
341        )?));
342        self.sub_expr = Some(Arc::new(build_func(
343            PbExprType::Subtract,
344            order_data_type.clone(),
345            vec![input_expr.boxed(), offset_expr.boxed()],
346        )?));
347        Ok(())
348    }
349
350    pub fn new_for_test(
351        offset: ScalarImpl,
352        order_data_type: &DataType,
353        offset_data_type: &DataType,
354    ) -> Self {
355        let mut offset = Self::new(offset);
356        offset.prepare(order_data_type, offset_data_type).unwrap();
357        offset
358    }
359}
360
361impl Deref for RangeFrameOffset {
362    type Target = ScalarImpl;
363
364    fn deref(&self) -> &Self::Target {
365        &self.offset
366    }
367}
368
369#[derive(Debug, Educe)]
370#[educe(Clone, Copy)]
371struct RangeFrameOffsetRef<'a> {
372    /// Built expression for `$0 + offset`.
373    add_expr: &'a dyn Expression,
374    /// Built expression for `$0 - offset`.
375    sub_expr: &'a dyn Expression,
376}
377
378impl FrameBound<RangeFrameOffsetRef<'_>> {
379    fn bound_of(self, order_value: impl ToOwnedDatum, order_type: OrderType) -> Sentinelled<Datum> {
380        let expr = match (self, order_type.direction()) {
381            (UnboundedPreceding, _) => return Sentinelled::Smallest,
382            (UnboundedFollowing, _) => return Sentinelled::Largest,
383            (CurrentRow, _) => return Sentinelled::Normal(order_value.to_owned_datum()),
384            (Preceding(offset), Direction::Ascending)
385            | (Following(offset), Direction::Descending) => {
386                // should SUBTRACT the offset
387                offset.sub_expr
388            }
389            (Following(offset), Direction::Ascending)
390            | (Preceding(offset), Direction::Descending) => {
391                // should ADD the offset
392                offset.add_expr
393            }
394        };
395        let row = OwnedRow::new(vec![order_value.to_owned_datum()]);
396        Sentinelled::Normal(
397            expr.eval_row(&row)
398                .now_or_never()
399                .expect("frame bound calculation should finish immediately")
400                .expect("just simple calculation, should succeed"), // TODO(rc): handle overflow
401        )
402    }
403}