risingwave_expr/window_function/
range.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;
use std::ops::Deref;
use std::sync::Arc;

use anyhow::Context;
use educe::Educe;
use futures_util::FutureExt;
use risingwave_common::bail;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{
    DataType, Datum, IsNegative, ScalarImpl, ScalarRefImpl, Sentinelled, ToOwnedDatum, ToText,
};
use risingwave_common::util::sort_util::{Direction, OrderType};
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_pb::expr::window_frame::{PbBoundType, PbRangeFrameBound, PbRangeFrameBounds};

use super::FrameBound::{
    self, CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding,
};
use super::FrameBoundsImpl;
use crate::expr::{
    build_func, BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression,
    LiteralExpression,
};
use crate::Result;

/// Bounds of a `RANGE` window frame, e.g. `RANGE BETWEEN 10 PRECEDING AND CURRENT ROW`.
///
/// `PRECEDING`/`FOLLOWING` offsets are applied to the value of the order column, so the
/// frame of a row is determined by order values rather than by row positions.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct RangeFrameBounds {
    /// Data type of the order column. Offset arithmetic (`order value + offset` and
    /// `order value - offset`) is built with this as the input and result type.
    pub order_data_type: DataType,
    /// Ordering of the order column. Its direction decides whether a `PRECEDING` or
    /// `FOLLOWING` bound adds or subtracts the offset.
    pub order_type: OrderType,
    /// Data type of the offset values; used when decoding them from protobuf and when
    /// building the offset literal.
    pub offset_data_type: DataType,
    /// The frame start bound.
    pub start: RangeFrameBound,
    /// The frame end bound.
    pub end: RangeFrameBound,
}

impl RangeFrameBounds {
    pub(super) fn from_protobuf(bounds: &PbRangeFrameBounds) -> Result<Self> {
        let order_data_type = DataType::from(bounds.get_order_data_type()?);
        let order_type = OrderType::from_protobuf(bounds.get_order_type()?);
        let offset_data_type = DataType::from(bounds.get_offset_data_type()?);
        let start = FrameBound::<RangeFrameOffset>::from_protobuf(
            bounds.get_start()?,
            &order_data_type,
            &offset_data_type,
        )?;
        let end = FrameBound::<RangeFrameOffset>::from_protobuf(
            bounds.get_end()?,
            &order_data_type,
            &offset_data_type,
        )?;
        Ok(Self {
            order_data_type,
            order_type,
            offset_data_type,
            start,
            end,
        })
    }

    pub(super) fn to_protobuf(&self) -> PbRangeFrameBounds {
        PbRangeFrameBounds {
            start: Some(self.start.to_protobuf()),
            end: Some(self.end.to_protobuf()),
            order_data_type: Some(self.order_data_type.to_protobuf()),
            order_type: Some(self.order_type.to_protobuf()),
            offset_data_type: Some(self.offset_data_type.to_protobuf()),
        }
    }
}

impl Display for RangeFrameBounds {
    /// Render the bounds as `RANGE BETWEEN <start> AND <end>`, with offsets shown as text.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let start = self.start.for_display();
        let end = self.end.for_display();
        write!(f, "RANGE BETWEEN {} AND {}", start, end)
    }
}

impl FrameBoundsImpl for RangeFrameBounds {
    /// Validate the frame bounds and every `PRECEDING`/`FOLLOWING` offset in them:
    ///
    /// - numeric offsets must not be negative;
    /// - `interval` offsets must have no negative field, and when the order column is
    ///   `timestamptz` they must not carry non-zero `month` or `day` fields.
    fn validate(&self) -> Result<()> {
        /// Reject a strictly negative numeric offset.
        fn ensure_non_negative(offset: impl IsNegative + Display) -> Result<()> {
            if offset.is_negative() {
                bail!(
                    "frame bound offset should be non-negative, but {} is given",
                    offset
                );
            }
            Ok(())
        }

        // For `timestamptz`, only offsets without `month` and `day` fields are supported.
        let order_is_timestamptz = matches!(self.order_data_type, DataType::Timestamptz);

        FrameBound::validate_bounds(&self.start, &self.end, |offset| {
            let scalar = offset.as_scalar_ref_impl();
            match scalar {
                // TODO(rc): use decl macro?
                ScalarRefImpl::Int16(v) => ensure_non_negative(v)?,
                ScalarRefImpl::Int32(v) => ensure_non_negative(v)?,
                ScalarRefImpl::Int64(v) => ensure_non_negative(v)?,
                ScalarRefImpl::Float32(v) => ensure_non_negative(v)?,
                ScalarRefImpl::Float64(v) => ensure_non_negative(v)?,
                ScalarRefImpl::Decimal(v) => ensure_non_negative(v)?,
                ScalarRefImpl::Interval(v) => {
                    if !v.is_never_negative() {
                        bail!(
                            "for frame bound offset of type `interval`, each field should be non-negative, but {} is given",
                            v
                        );
                    }
                    let has_month_or_day = v.months() != 0 || v.days() != 0;
                    if order_is_timestamptz && has_month_or_day {
                        bail!(
                            "for frame order column of type `timestamptz`, offset should not have non-zero `month` and `day`",
                        );
                    }
                }
                _ => unreachable!("other order column data types are not supported and should be banned in frontend"),
            }
            Ok(())
        })
    }
}

impl RangeFrameBounds {
    /// Calculate where the frame of a CURRENT ROW with the given order value starts.
    ///
    /// ## Examples
    ///
    /// With either of these frames:
    ///
    /// ```sql
    /// ORDER BY x ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    /// ORDER BY x DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    /// ```
    ///
    /// the frame always starts at the first-most row, whatever the order value is. This is
    /// represented by [`Sentinelled::Smallest`].
    ///
    /// With this frame:
    ///
    /// ```sql
    /// ORDER BY x ASC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW
    /// ```
    ///
    /// a CURRENT ROW with order value `100` has its frame start at the **FIRST** row whose order
    /// value is `90`.
    ///
    /// With this frame:
    ///
    /// ```sql
    /// ORDER BY x DESC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW
    /// ```
    ///
    /// a CURRENT ROW with order value `100` has its frame start at the **FIRST** row whose order
    /// value is `110`.
    pub fn frame_start_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled<Datum> {
        let start = self.start.for_calc();
        start.bound_of(order_value, self.order_type)
    }

    /// Calculate where the frame of a CURRENT ROW with the given order value ends. This mirrors
    /// `frame_start_of`, with every direction flipped.
    pub fn frame_end_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled<Datum> {
        let end = self.end.for_calc();
        end.bound_of(order_value, self.order_type)
    }

    /// Find the order value of the CURRENT ROW of the earliest frame that contains the given
    /// order value.
    ///
    /// ## Examples
    ///
    /// With either of these frames:
    ///
    /// ```sql
    /// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
    /// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
    /// ```
    ///
    /// the earliest such CURRENT ROW is always the first-most row, represented by
    /// [`Sentinelled::Smallest`].
    ///
    /// With this frame:
    ///
    /// ```sql
    /// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING
    /// ```
    ///
    /// for order value `100`, the earliest such CURRENT ROW has order value `90`.
    ///
    /// With this frame:
    ///
    /// ```sql
    /// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING
    /// ```
    ///
    /// for order value `100`, the earliest such CURRENT ROW has order value `110`.
    pub fn first_curr_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled<Datum> {
        // Walking back from the frame end by the end offset yields the earliest CURRENT ROW.
        let reversed_end = self.end.for_calc().reverse();
        reversed_end.bound_of(order_value, self.order_type)
    }

    /// Find the order value of the CURRENT ROW of the latest frame that contains the given
    /// order value. This mirrors `first_curr_of`, with every direction flipped.
    pub fn last_curr_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled<Datum> {
        // Walking forward from the frame start by the start offset yields the latest CURRENT ROW.
        let reversed_start = self.start.for_calc().reverse();
        reversed_start.bound_of(order_value, self.order_type)
    }
}

/// A bound of a `RANGE` frame, whose `PRECEDING`/`FOLLOWING` offsets are
/// [`RangeFrameOffset`] values.
pub type RangeFrameBound = FrameBound<RangeFrameOffset>;

impl RangeFrameBound {
    fn from_protobuf(
        bound: &PbRangeFrameBound,
        order_data_type: &DataType,
        offset_data_type: &DataType,
    ) -> Result<Self> {
        let bound = match bound.get_type()? {
            PbBoundType::Unspecified => bail!("unspecified type of `RangeFrameBound`"),
            PbBoundType::UnboundedPreceding => Self::UnboundedPreceding,
            PbBoundType::CurrentRow => Self::CurrentRow,
            PbBoundType::UnboundedFollowing => Self::UnboundedFollowing,
            bound_type @ (PbBoundType::Preceding | PbBoundType::Following) => {
                let offset_value = Datum::from_protobuf(bound.get_offset()?, offset_data_type)
                    .context("offset `Datum` is not decodable")?
                    .context("offset of `RangeFrameBound` must be non-NULL")?;
                let mut offset = RangeFrameOffset::new(offset_value);
                offset.prepare(order_data_type, offset_data_type)?;
                if bound_type == PbBoundType::Preceding {
                    Self::Preceding(offset)
                } else {
                    Self::Following(offset)
                }
            }
        };
        Ok(bound)
    }

    fn to_protobuf(&self) -> PbRangeFrameBound {
        let (r#type, offset) = match self {
            Self::UnboundedPreceding => (PbBoundType::UnboundedPreceding, None),
            Self::Preceding(offset) => (
                PbBoundType::Preceding,
                Some(Some(offset.as_scalar_ref_impl()).to_protobuf()),
            ),
            Self::CurrentRow => (PbBoundType::CurrentRow, None),
            Self::Following(offset) => (
                PbBoundType::Following,
                Some(Some(offset.as_scalar_ref_impl()).to_protobuf()),
            ),
            Self::UnboundedFollowing => (PbBoundType::UnboundedFollowing, None),
        };
        PbRangeFrameBound {
            r#type: r#type as _,
            offset,
        }
    }
}

impl RangeFrameBound {
    fn for_display(&self) -> FrameBound<String> {
        match self {
            UnboundedPreceding => UnboundedPreceding,
            Preceding(offset) => Preceding(offset.as_scalar_ref_impl().to_text()),
            CurrentRow => CurrentRow,
            Following(offset) => Following(offset.as_scalar_ref_impl().to_text()),
            UnboundedFollowing => UnboundedFollowing,
        }
    }

    fn for_calc(&self) -> FrameBound<RangeFrameOffsetRef<'_>> {
        match self {
            UnboundedPreceding => UnboundedPreceding,
            Preceding(offset) => Preceding(RangeFrameOffsetRef {
                add_expr: offset.add_expr.as_ref().unwrap().as_ref(),
                sub_expr: offset.sub_expr.as_ref().unwrap().as_ref(),
            }),
            CurrentRow => CurrentRow,
            Following(offset) => Following(RangeFrameOffsetRef {
                add_expr: offset.add_expr.as_ref().unwrap().as_ref(),
                sub_expr: offset.sub_expr.as_ref().unwrap().as_ref(),
            }),
            UnboundedFollowing => UnboundedFollowing,
        }
    }
}

/// The wrapper type for [`ScalarImpl`] range frame offset, containing
/// two expressions to help adding and subtracting the offset.
///
/// A value created by [`RangeFrameOffset::new`] has no expressions yet (both are `None`).
/// They are built by `prepare` from the order column and offset data types. Equality and
/// hashing consider only the raw offset value, never the built expressions.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct RangeFrameOffset {
    /// The original offset value.
    offset: ScalarImpl,
    /// Built expression for `$0 + offset`; `None` until prepared.
    #[educe(PartialEq(ignore), Hash(ignore))]
    add_expr: Option<Arc<BoxedExpression>>,
    /// Built expression for `$0 - offset`; `None` until prepared.
    #[educe(PartialEq(ignore), Hash(ignore))]
    sub_expr: Option<Arc<BoxedExpression>>,
}

impl RangeFrameOffset {
    pub fn new(offset: ScalarImpl) -> Self {
        Self {
            offset,
            add_expr: None,
            sub_expr: None,
        }
    }

    fn prepare(&mut self, order_data_type: &DataType, offset_data_type: &DataType) -> Result<()> {
        use risingwave_pb::expr::expr_node::PbType as PbExprType;

        let input_expr = InputRefExpression::new(order_data_type.clone(), 0);
        let offset_expr =
            LiteralExpression::new(offset_data_type.clone(), Some(self.offset.clone()));
        self.add_expr = Some(Arc::new(build_func(
            PbExprType::Add,
            order_data_type.clone(),
            vec![input_expr.clone().boxed(), offset_expr.clone().boxed()],
        )?));
        self.sub_expr = Some(Arc::new(build_func(
            PbExprType::Subtract,
            order_data_type.clone(),
            vec![input_expr.boxed(), offset_expr.boxed()],
        )?));
        Ok(())
    }

    pub fn new_for_test(
        offset: ScalarImpl,
        order_data_type: &DataType,
        offset_data_type: &DataType,
    ) -> Self {
        let mut offset = Self::new(offset);
        offset.prepare(order_data_type, offset_data_type).unwrap();
        offset
    }
}

impl Deref for RangeFrameOffset {
    type Target = ScalarImpl;

    /// Expose the raw offset value, ignoring the built expressions.
    fn deref(&self) -> &Self::Target {
        let Self { offset, .. } = self;
        offset
    }
}

/// Borrowed view of the two prepared expressions of a [`RangeFrameOffset`], used when
/// calculating concrete frame bounds. It only holds references, so it is `Copy`.
#[derive(Debug, Educe)]
#[educe(Clone, Copy)]
struct RangeFrameOffsetRef<'a> {
    /// Built expression for `$0 + offset`.
    add_expr: &'a dyn Expression,
    /// Built expression for `$0 - offset`.
    sub_expr: &'a dyn Expression,
}

impl FrameBound<RangeFrameOffsetRef<'_>> {
    /// Calculate the concrete bound for `order_value` under the given ordering.
    ///
    /// - `UNBOUNDED PRECEDING` / `UNBOUNDED FOLLOWING` map to [`Sentinelled::Smallest`] /
    ///   [`Sentinelled::Largest`] (the first-most / last-most row).
    /// - `CURRENT ROW` maps to the order value itself.
    /// - An offset bound maps to `order_value ± offset`. The sign is chosen so that `PRECEDING`
    ///   moves towards the first-most row and `FOLLOWING` towards the last-most row in the
    ///   given direction.
    ///
    /// If evaluating `order_value ± offset` fails, the bound saturates instead of panicking.
    /// Such a failure is presumably a result outside the range of the order column type, e.g.
    /// an integer overflow. In that case the exact bound lies beyond every representable order
    /// value, so:
    /// - a `PRECEDING` bound becomes [`Sentinelled::Smallest`];
    /// - a `FOLLOWING` bound becomes [`Sentinelled::Largest`].
    ///
    /// This relies on offsets being non-negative, as enforced by `validate`. A `PRECEDING`
    /// calculation can then only run off the first-most end, and a `FOLLOWING` one only off
    /// the last-most end.
    ///
    /// # Panics
    ///
    /// Panics if the expression evaluation does not complete synchronously.
    fn bound_of(self, order_value: impl ToOwnedDatum, order_type: OrderType) -> Sentinelled<Datum> {
        let (expr, saturated) = match (self, order_type.direction()) {
            (UnboundedPreceding, _) => return Sentinelled::Smallest,
            (UnboundedFollowing, _) => return Sentinelled::Largest,
            (CurrentRow, _) => return Sentinelled::Normal(order_value.to_owned_datum()),
            // PRECEDING moves towards the first-most row: SUBTRACT for ASC, ADD for DESC.
            (Preceding(offset), Direction::Ascending) => (offset.sub_expr, Sentinelled::Smallest),
            (Preceding(offset), Direction::Descending) => (offset.add_expr, Sentinelled::Smallest),
            // FOLLOWING moves towards the last-most row: ADD for ASC, SUBTRACT for DESC.
            (Following(offset), Direction::Ascending) => (offset.add_expr, Sentinelled::Largest),
            (Following(offset), Direction::Descending) => (offset.sub_expr, Sentinelled::Largest),
        };
        let row = OwnedRow::new(vec![order_value.to_owned_datum()]);
        let result = expr
            .eval_row(&row)
            .now_or_never()
            .expect("frame bound calculation should finish immediately");
        match result {
            Ok(bound) => Sentinelled::Normal(bound),
            // NOTE(review): any evaluation error is treated as an out-of-range result here. For
            // plain `$0 ± literal` arithmetic that should be the only failure mode, but confirm
            // against the function implementations if new order column types are supported.
            Err(_) => saturated,
        }
    }
}