risingwave_expr_impl/scalar/
tumble.rs1use num_traits::Zero;
16use risingwave_common::types::{Date, Interval, Timestamp, Timestamptz};
17use risingwave_expr::{ExprError, Result, function};
18
19#[inline(always)]
20fn interval_to_micro_second(t: Interval) -> Result<i64> {
21 let checked_interval_to_micro_second = || {
22 (t.months() as i64)
23 .checked_mul(Interval::USECS_PER_MONTH)?
24 .checked_add(
25 (t.days() as i64)
26 .checked_mul(Interval::USECS_PER_DAY)?
27 .checked_add(t.usecs())?,
28 )
29 };
30
31 checked_interval_to_micro_second().ok_or(ExprError::NumericOutOfRange)
32}
33
34#[function("tumble_start(date, interval) -> timestamp")]
35pub fn tumble_start_date(timestamp: Date, window_size: Interval) -> Result<Timestamp> {
36 tumble_start_date_time(timestamp.into(), window_size)
37}
38
39#[function("tumble_start(timestamp, interval) -> timestamp")]
40pub fn tumble_start_date_time(timestamp: Timestamp, window_size: Interval) -> Result<Timestamp> {
41 let timestamp_micro_second = timestamp.0.and_utc().timestamp_micros();
42 let window_start_micro_second = get_window_start(timestamp_micro_second, window_size)?;
43 Ok(Timestamp::from_timestamp_uncheck(
44 window_start_micro_second / 1_000_000,
45 (window_start_micro_second % 1_000_000 * 1000) as u32,
46 ))
47}
48
49#[function("tumble_start(timestamptz, interval) -> timestamptz")]
50pub fn tumble_start_timestamptz(tz: Timestamptz, window_size: Interval) -> Result<Timestamptz> {
51 get_window_start(tz.timestamp_micros(), window_size).map(Timestamptz::from_micros)
52}
53
54#[inline(always)]
56fn get_window_start(timestamp_micro_second: i64, window_size: Interval) -> Result<i64> {
57 get_window_start_with_offset(timestamp_micro_second, window_size, Interval::zero())
58}
59
60#[function("tumble_start(date, interval, interval) -> timestamp")]
61pub fn tumble_start_offset_date(
62 timestamp_date: Date,
63 window_size: Interval,
64 offset: Interval,
65) -> Result<Timestamp> {
66 tumble_start_offset_date_time(timestamp_date.into(), window_size, offset)
67}
68
69#[function("tumble_start(timestamp, interval, interval) -> timestamp")]
70pub fn tumble_start_offset_date_time(
71 time: Timestamp,
72 window_size: Interval,
73 offset: Interval,
74) -> Result<Timestamp> {
75 let timestamp_micro_second = time.0.and_utc().timestamp_micros();
76 let window_start_micro_second =
77 get_window_start_with_offset(timestamp_micro_second, window_size, offset)?;
78
79 Ok(Timestamp::from_timestamp_uncheck(
80 window_start_micro_second / 1_000_000,
81 (window_start_micro_second % 1_000_000 * 1000) as u32,
82 ))
83}
84
85#[inline(always)]
86fn get_window_start_with_offset(
87 timestamp_micro_second: i64,
88 window_size: Interval,
89 offset: Interval,
90) -> Result<i64> {
91 let window_size_micro_second = interval_to_micro_second(window_size)?;
92 let offset_micro_second = interval_to_micro_second(offset)?;
93
94 let remainder = timestamp_micro_second
96 .checked_sub(offset_micro_second)
97 .ok_or(ExprError::NumericOutOfRange)?
98 .checked_rem(window_size_micro_second)
99 .ok_or(ExprError::DivisionByZero)?;
100 if remainder < 0 {
101 timestamp_micro_second
102 .checked_sub(remainder + window_size_micro_second)
103 .ok_or(ExprError::NumericOutOfRange)
104 } else {
105 timestamp_micro_second
106 .checked_sub(remainder)
107 .ok_or(ExprError::NumericOutOfRange)
108 }
109}
110
111#[function("tumble_start(timestamptz, interval, interval) -> timestamptz")]
112pub fn tumble_start_offset_timestamptz(
113 tz: Timestamptz,
114 window_size: Interval,
115 offset: Interval,
116) -> Result<Timestamptz> {
117 get_window_start_with_offset(tz.timestamp_micros(), window_size, offset)
118 .map(Timestamptz::from_micros)
119}
120
#[cfg(test)]
mod tests {
    use chrono::{Datelike, Timelike};
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_common::types::{Date, Interval};

    use super::tumble_start_offset_date_time;
    use crate::scalar::tumble::{
        get_window_start, interval_to_micro_second, tumble_start_date_time,
    };

    /// A 30-minute window containing 22:22:22 starts at 22:00:00 on the same day.
    #[test]
    fn test_tumble_start_date_time() {
        let dt = Date::from_ymd_uncheck(2022, 2, 22).and_hms_uncheck(22, 22, 22);
        let interval = Interval::from_minutes(30);
        let w = tumble_start_date_time(dt, interval).unwrap().0;
        assert_eq!(w.year(), 2022);
        assert_eq!(w.month(), 2);
        assert_eq!(w.day(), 22);
        assert_eq!(w.hour(), 22);
        assert_eq!(w.minute(), 0);
        assert_eq!(w.second(), 0);
    }

    /// Offsets that differ by a whole number of windows must give the same window
    /// start, and the start must move with the offset within one window.
    #[test]
    fn test_tumble_start_offset_date_time() {
        let dt = Date::from_ymd_uncheck(2022, 2, 22).and_hms_uncheck(22, 22, 22);
        let window_size = 30;
        for offset in 0..window_size {
            for coefficient in 0..5 {
                let w = tumble_start_offset_date_time(
                    dt,
                    Interval::from_minutes(window_size),
                    Interval::from_minutes(coefficient * window_size + offset),
                )
                .unwrap()
                .0;
                assert_eq!(w.year(), 2022);
                assert_eq!(w.month(), 2);
                assert_eq!(w.day(), 22);
                if offset > 22 {
                    // The boundary at `22:offset` is after 22:22:22, so the window
                    // started one window length earlier.
                    assert_eq!(w.hour(), 21);
                    assert_eq!(w.minute(), 30 + offset as u32);
                } else {
                    assert_eq!(w.hour(), 22);
                    assert_eq!(w.minute(), offset as u32);
                }

                assert_eq!(w.second(), 0);
            }
        }
    }

    /// Shows that the negative-remainder adjustment in the window computation is
    /// needed: the naive formula yields a start after the timestamp for some
    /// negative inputs, while `get_window_start` never does.
    #[test]
    fn test_remainder_necessary() {
        let mut wrong_cnt = 0;
        for i in -30..30 {
            let timestamp_micro_second = Interval::from_minutes(i).usecs();
            let window_size = Interval::from_minutes(5);
            let window_start = get_window_start(timestamp_micro_second, window_size).unwrap();

            let window_size_micro_second = interval_to_micro_second(window_size).unwrap();
            let default_window_start = timestamp_micro_second
                - (timestamp_micro_second + window_size_micro_second) % window_size_micro_second;

            if timestamp_micro_second < default_window_start {
                wrong_cnt += 1;
            }

            assert!(timestamp_micro_second >= window_start)
        }
        assert_ne!(wrong_cnt, 0);
    }

    /// Overflowing computations must surface as errors rather than panics.
    #[test]
    fn test_window_start_overflow() {
        get_window_start(i64::MIN, Interval::from_millis(20)).unwrap_err();
        interval_to_micro_second(Interval::from_month_day_usec(1, 1, i64::MAX)).unwrap_err();
    }

    /// A zero-length window cannot be aligned and must be reported as an error.
    #[test]
    fn test_zero_window_size() {
        get_window_start(0, Interval::from_minutes(0)).unwrap_err();
    }
}