1use std::fmt::Display;
16use std::ops::Deref;
17use std::sync::Arc;
18
19use anyhow::Context;
20use educe::Educe;
21use futures_util::FutureExt;
22use risingwave_common::bail;
23use risingwave_common::row::OwnedRow;
24use risingwave_common::types::{
25 DataType, Datum, IsNegative, ScalarImpl, ScalarRefImpl, Sentinelled, ToOwnedDatum, ToText,
26};
27use risingwave_common::util::sort_util::{Direction, OrderType};
28use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
29use risingwave_pb::expr::window_frame::{PbBoundType, PbRangeFrameBound, PbRangeFrameBounds};
30
31use super::FrameBound::{
32 self, CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding,
33};
34use super::FrameBoundsImpl;
35use crate::Result;
36use crate::expr::{
37 BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression,
38 build_func,
39};
40
/// Bounds of a `RANGE` window frame (displayed as `RANGE BETWEEN <start> AND <end>`),
/// together with the type information needed to evaluate offset bounds.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct RangeFrameBounds {
    /// Data type of the order column the frame is defined over.
    pub order_data_type: DataType,
    /// Ordering of the order column; its direction decides whether an offset is
    /// added to or subtracted from the order value when computing a bound.
    pub order_type: OrderType,
    /// Data type of the offset values carried by `Preceding`/`Following` bounds.
    pub offset_data_type: DataType,
    /// Start bound of the frame.
    pub start: RangeFrameBound,
    /// End bound of the frame.
    pub end: RangeFrameBound,
}
49
50impl RangeFrameBounds {
51 pub(super) fn from_protobuf(bounds: &PbRangeFrameBounds) -> Result<Self> {
52 let order_data_type = DataType::from(bounds.get_order_data_type()?);
53 let order_type = OrderType::from_protobuf(bounds.get_order_type()?);
54 let offset_data_type = DataType::from(bounds.get_offset_data_type()?);
55 let start = FrameBound::<RangeFrameOffset>::from_protobuf(
56 bounds.get_start()?,
57 &order_data_type,
58 &offset_data_type,
59 )?;
60 let end = FrameBound::<RangeFrameOffset>::from_protobuf(
61 bounds.get_end()?,
62 &order_data_type,
63 &offset_data_type,
64 )?;
65 Ok(Self {
66 order_data_type,
67 order_type,
68 offset_data_type,
69 start,
70 end,
71 })
72 }
73
74 pub(super) fn to_protobuf(&self) -> PbRangeFrameBounds {
75 PbRangeFrameBounds {
76 start: Some(self.start.to_protobuf()),
77 end: Some(self.end.to_protobuf()),
78 order_data_type: Some(self.order_data_type.to_protobuf()),
79 order_type: Some(self.order_type.to_protobuf()),
80 offset_data_type: Some(self.offset_data_type.to_protobuf()),
81 }
82 }
83}
84
85impl Display for RangeFrameBounds {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 write!(
88 f,
89 "RANGE BETWEEN {} AND {}",
90 self.start.for_display(),
91 self.end.for_display()
92 )?;
93 Ok(())
94 }
95}
96
97impl FrameBoundsImpl for RangeFrameBounds {
98 fn validate(&self) -> Result<()> {
99 fn validate_non_negative(val: impl IsNegative + Display) -> Result<()> {
100 if val.is_negative() {
101 bail!(
102 "frame bound offset should be non-negative, but {} is given",
103 val
104 );
105 }
106 Ok(())
107 }
108
109 FrameBound::validate_bounds(&self.start, &self.end, |offset| {
110 match offset.as_scalar_ref_impl() {
111 ScalarRefImpl::Int16(val) => validate_non_negative(val)?,
113 ScalarRefImpl::Int32(val) => validate_non_negative(val)?,
114 ScalarRefImpl::Int64(val) => validate_non_negative(val)?,
115 ScalarRefImpl::Float32(val) => validate_non_negative(val)?,
116 ScalarRefImpl::Float64(val) => validate_non_negative(val)?,
117 ScalarRefImpl::Decimal(val) => validate_non_negative(val)?,
118 ScalarRefImpl::Interval(val) => {
119 if !val.is_never_negative() {
120 bail!(
121 "for frame bound offset of type `interval`, each field should be non-negative, but {} is given",
122 val
123 );
124 }
125 if matches!(self.order_data_type, DataType::Timestamptz) {
126 if val.months() != 0 || val.days() != 0 {
128 bail!(
129 "for frame order column of type `timestamptz`, offset should not have non-zero `month` and `day`",
130 );
131 }
132 }
133 }
134 _ => unreachable!(
135 "other order column data types are not supported and should be banned in frontend"
136 ),
137 }
138 Ok(())
139 })
140 }
141}
142
143impl RangeFrameBounds {
144 pub fn frame_start_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled<Datum> {
174 self.start.for_calc().bound_of(order_value, self.order_type)
175 }
176
177 pub fn frame_end_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled<Datum> {
180 self.end.for_calc().bound_of(order_value, self.order_type)
181 }
182
183 pub fn first_curr_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled<Datum> {
213 self.end
214 .for_calc()
215 .reverse()
216 .bound_of(order_value, self.order_type)
217 }
218
219 pub fn last_curr_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled<Datum> {
222 self.start
223 .for_calc()
224 .reverse()
225 .bound_of(order_value, self.order_type)
226 }
227}
228
/// A single bound of a `RANGE` frame: a [`FrameBound`] whose offset is a [`RangeFrameOffset`].
pub type RangeFrameBound = FrameBound<RangeFrameOffset>;
230
231impl RangeFrameBound {
232 fn from_protobuf(
233 bound: &PbRangeFrameBound,
234 order_data_type: &DataType,
235 offset_data_type: &DataType,
236 ) -> Result<Self> {
237 let bound = match bound.get_type()? {
238 PbBoundType::Unspecified => bail!("unspecified type of `RangeFrameBound`"),
239 PbBoundType::UnboundedPreceding => Self::UnboundedPreceding,
240 PbBoundType::CurrentRow => Self::CurrentRow,
241 PbBoundType::UnboundedFollowing => Self::UnboundedFollowing,
242 bound_type @ (PbBoundType::Preceding | PbBoundType::Following) => {
243 let offset_value = Datum::from_protobuf(bound.get_offset()?, offset_data_type)
244 .context("offset `Datum` is not decodable")?
245 .context("offset of `RangeFrameBound` must be non-NULL")?;
246 let mut offset = RangeFrameOffset::new(offset_value);
247 offset.prepare(order_data_type, offset_data_type)?;
248 if bound_type == PbBoundType::Preceding {
249 Self::Preceding(offset)
250 } else {
251 Self::Following(offset)
252 }
253 }
254 };
255 Ok(bound)
256 }
257
258 fn to_protobuf(&self) -> PbRangeFrameBound {
259 let (r#type, offset) = match self {
260 Self::UnboundedPreceding => (PbBoundType::UnboundedPreceding, None),
261 Self::Preceding(offset) => (
262 PbBoundType::Preceding,
263 Some(Some(offset.as_scalar_ref_impl()).to_protobuf()),
264 ),
265 Self::CurrentRow => (PbBoundType::CurrentRow, None),
266 Self::Following(offset) => (
267 PbBoundType::Following,
268 Some(Some(offset.as_scalar_ref_impl()).to_protobuf()),
269 ),
270 Self::UnboundedFollowing => (PbBoundType::UnboundedFollowing, None),
271 };
272 PbRangeFrameBound {
273 r#type: r#type as _,
274 offset,
275 }
276 }
277}
278
279impl RangeFrameBound {
280 fn for_display(&self) -> FrameBound<String> {
281 match self {
282 UnboundedPreceding => UnboundedPreceding,
283 Preceding(offset) => Preceding(offset.as_scalar_ref_impl().to_text()),
284 CurrentRow => CurrentRow,
285 Following(offset) => Following(offset.as_scalar_ref_impl().to_text()),
286 UnboundedFollowing => UnboundedFollowing,
287 }
288 }
289
290 fn for_calc(&self) -> FrameBound<RangeFrameOffsetRef<'_>> {
291 match self {
292 UnboundedPreceding => UnboundedPreceding,
293 Preceding(offset) => Preceding(RangeFrameOffsetRef {
294 add_expr: offset.add_expr.as_ref().unwrap().as_ref(),
295 sub_expr: offset.sub_expr.as_ref().unwrap().as_ref(),
296 }),
297 CurrentRow => CurrentRow,
298 Following(offset) => Following(RangeFrameOffsetRef {
299 add_expr: offset.add_expr.as_ref().unwrap().as_ref(),
300 sub_expr: offset.sub_expr.as_ref().unwrap().as_ref(),
301 }),
302 UnboundedFollowing => UnboundedFollowing,
303 }
304 }
305}
306
/// Wrapper around the [`ScalarImpl`] offset of a `RANGE` frame bound, which also
/// holds two prebuilt expressions for adding the offset to, and subtracting it
/// from, an order value.
///
/// Equality and hashing consider only the offset value; the expressions are ignored.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct RangeFrameOffset {
    /// The original offset value.
    offset: ScalarImpl,
    /// Built expression for `$0 + offset`; `None` until `prepare` has run.
    #[educe(PartialEq(ignore), Hash(ignore))]
    add_expr: Option<Arc<BoxedExpression>>,
    /// Built expression for `$0 - offset`; `None` until `prepare` has run.
    #[educe(PartialEq(ignore), Hash(ignore))]
    sub_expr: Option<Arc<BoxedExpression>>,
}
321
322impl RangeFrameOffset {
323 pub fn new(offset: ScalarImpl) -> Self {
324 Self {
325 offset,
326 add_expr: None,
327 sub_expr: None,
328 }
329 }
330
331 fn prepare(&mut self, order_data_type: &DataType, offset_data_type: &DataType) -> Result<()> {
332 use risingwave_pb::expr::expr_node::PbType as PbExprType;
333
334 let input_expr = InputRefExpression::new(order_data_type.clone(), 0);
335 let offset_expr =
336 LiteralExpression::new(offset_data_type.clone(), Some(self.offset.clone()));
337 self.add_expr = Some(Arc::new(build_func(
338 PbExprType::Add,
339 order_data_type.clone(),
340 vec![input_expr.clone().boxed(), offset_expr.clone().boxed()],
341 )?));
342 self.sub_expr = Some(Arc::new(build_func(
343 PbExprType::Subtract,
344 order_data_type.clone(),
345 vec![input_expr.boxed(), offset_expr.boxed()],
346 )?));
347 Ok(())
348 }
349
350 pub fn new_for_test(
351 offset: ScalarImpl,
352 order_data_type: &DataType,
353 offset_data_type: &DataType,
354 ) -> Self {
355 let mut offset = Self::new(offset);
356 offset.prepare(order_data_type, offset_data_type).unwrap();
357 offset
358 }
359}
360
361impl Deref for RangeFrameOffset {
362 type Target = ScalarImpl;
363
364 fn deref(&self) -> &Self::Target {
365 &self.offset
366 }
367}
368
/// Borrowed view of a [`RangeFrameOffset`] that holds only the two prebuilt
/// expressions, used while calculating frame bounds.
#[derive(Debug, Educe)]
#[educe(Clone, Copy)]
struct RangeFrameOffsetRef<'a> {
    /// Built expression for `$0 + offset`.
    add_expr: &'a dyn Expression,
    /// Built expression for `$0 - offset`.
    sub_expr: &'a dyn Expression,
}
377
378impl FrameBound<RangeFrameOffsetRef<'_>> {
379 fn bound_of(self, order_value: impl ToOwnedDatum, order_type: OrderType) -> Sentinelled<Datum> {
380 let expr = match (self, order_type.direction()) {
381 (UnboundedPreceding, _) => return Sentinelled::Smallest,
382 (UnboundedFollowing, _) => return Sentinelled::Largest,
383 (CurrentRow, _) => return Sentinelled::Normal(order_value.to_owned_datum()),
384 (Preceding(offset), Direction::Ascending)
385 | (Following(offset), Direction::Descending) => {
386 offset.sub_expr
388 }
389 (Following(offset), Direction::Ascending)
390 | (Preceding(offset), Direction::Descending) => {
391 offset.add_expr
393 }
394 };
395 let row = OwnedRow::new(vec![order_value.to_owned_datum()]);
396 Sentinelled::Normal(
397 expr.eval_row(&row)
398 .now_or_never()
399 .expect("frame bound calculation should finish immediately")
400 .expect("just simple calculation, should succeed"), )
402 }
403}