risingwave_expr_impl/scalar/
tumble.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use num_traits::Zero;
16use risingwave_common::types::{Date, Interval, Timestamp, Timestamptz};
17use risingwave_expr::{ExprError, Result, function};
18
19#[inline(always)]
20fn interval_to_micro_second(t: Interval) -> Result<i64> {
21    let checked_interval_to_micro_second = || {
22        (t.months() as i64)
23            .checked_mul(Interval::USECS_PER_MONTH)?
24            .checked_add(
25                (t.days() as i64)
26                    .checked_mul(Interval::USECS_PER_DAY)?
27                    .checked_add(t.usecs())?,
28            )
29    };
30
31    checked_interval_to_micro_second().ok_or(ExprError::NumericOutOfRange)
32}
33
34#[function("tumble_start(date, interval) -> timestamp")]
35pub fn tumble_start_date(timestamp: Date, window_size: Interval) -> Result<Timestamp> {
36    tumble_start_date_time(timestamp.into(), window_size)
37}
38
39#[function("tumble_start(timestamp, interval) -> timestamp")]
40pub fn tumble_start_date_time(timestamp: Timestamp, window_size: Interval) -> Result<Timestamp> {
41    let timestamp_micro_second = timestamp.0.and_utc().timestamp_micros();
42    let window_start_micro_second = get_window_start(timestamp_micro_second, window_size)?;
43    Ok(Timestamp::from_timestamp_uncheck(
44        window_start_micro_second / 1_000_000,
45        (window_start_micro_second % 1_000_000 * 1000) as u32,
46    ))
47}
48
49#[function("tumble_start(timestamptz, interval) -> timestamptz")]
50pub fn tumble_start_timestamptz(tz: Timestamptz, window_size: Interval) -> Result<Timestamptz> {
51    get_window_start(tz.timestamp_micros(), window_size).map(Timestamptz::from_micros)
52}
53
54/// The common part of PostgreSQL function `timestamp_bin` and `timestamptz_bin`.
55#[inline(always)]
56fn get_window_start(timestamp_micro_second: i64, window_size: Interval) -> Result<i64> {
57    get_window_start_with_offset(timestamp_micro_second, window_size, Interval::zero())
58}
59
60#[function("tumble_start(date, interval, interval) -> timestamp")]
61pub fn tumble_start_offset_date(
62    timestamp_date: Date,
63    window_size: Interval,
64    offset: Interval,
65) -> Result<Timestamp> {
66    tumble_start_offset_date_time(timestamp_date.into(), window_size, offset)
67}
68
69#[function("tumble_start(timestamp, interval, interval) -> timestamp")]
70pub fn tumble_start_offset_date_time(
71    time: Timestamp,
72    window_size: Interval,
73    offset: Interval,
74) -> Result<Timestamp> {
75    let timestamp_micro_second = time.0.and_utc().timestamp_micros();
76    let window_start_micro_second =
77        get_window_start_with_offset(timestamp_micro_second, window_size, offset)?;
78
79    Ok(Timestamp::from_timestamp_uncheck(
80        window_start_micro_second / 1_000_000,
81        (window_start_micro_second % 1_000_000 * 1000) as u32,
82    ))
83}
84
85#[inline(always)]
86fn get_window_start_with_offset(
87    timestamp_micro_second: i64,
88    window_size: Interval,
89    offset: Interval,
90) -> Result<i64> {
91    let window_size_micro_second = interval_to_micro_second(window_size)?;
92    let offset_micro_second = interval_to_micro_second(offset)?;
93
94    // Inspired by https://issues.apache.org/jira/browse/FLINK-26334
95    let remainder = timestamp_micro_second
96        .checked_sub(offset_micro_second)
97        .ok_or(ExprError::NumericOutOfRange)?
98        .checked_rem(window_size_micro_second)
99        .ok_or(ExprError::DivisionByZero)?;
100    if remainder < 0 {
101        timestamp_micro_second
102            .checked_sub(remainder + window_size_micro_second)
103            .ok_or(ExprError::NumericOutOfRange)
104    } else {
105        timestamp_micro_second
106            .checked_sub(remainder)
107            .ok_or(ExprError::NumericOutOfRange)
108    }
109}
110
111#[function("tumble_start(timestamptz, interval, interval) -> timestamptz")]
112pub fn tumble_start_offset_timestamptz(
113    tz: Timestamptz,
114    window_size: Interval,
115    offset: Interval,
116) -> Result<Timestamptz> {
117    get_window_start_with_offset(tz.timestamp_micros(), window_size, offset)
118        .map(Timestamptz::from_micros)
119}
120
#[cfg(test)]
mod tests {
    use chrono::{Datelike, Timelike};
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_common::types::{Date, Interval};

    use super::tumble_start_offset_date_time;
    use crate::scalar::tumble::{
        get_window_start, interval_to_micro_second, tumble_start_date_time,
    };

    /// 22:22:22 with a 30-minute window must be floored to 22:00:00.
    #[test]
    fn test_tumble_start_date_time() {
        let dt = Date::from_ymd_uncheck(2022, 2, 22).and_hms_uncheck(22, 22, 22);
        let interval = Interval::from_minutes(30);
        let w = tumble_start_date_time(dt, interval).unwrap().0;
        assert_eq!(w.year(), 2022);
        assert_eq!(w.month(), 2);
        assert_eq!(w.day(), 22);
        assert_eq!(w.hour(), 22);
        assert_eq!(w.minute(), 0);
        assert_eq!(w.second(), 0);
    }

    /// Offsets that differ by a whole number of windows must select the same
    /// window start.
    #[test]
    fn test_tumble_start_offset_date_time() {
        let dt = Date::from_ymd_uncheck(2022, 2, 22).and_hms_uncheck(22, 22, 22);
        let window_size = 30;
        for offset in 0..window_size {
            for coefficient in 0..5 {
                let w = tumble_start_offset_date_time(
                    dt,
                    Interval::from_minutes(window_size),
                    Interval::from_minutes(coefficient * window_size + offset),
                )
                .unwrap()
                .0;
                assert_eq!(w.year(), 2022);
                assert_eq!(w.month(), 2);
                assert_eq!(w.day(), 22);
                if offset > 22 {
                    // The boundary at 22:`offset` is after 22:22:22, so the
                    // window started half an hour earlier.
                    assert_eq!(w.hour(), 21);
                    assert_eq!(w.minute(), 30 + offset as u32);
                } else {
                    assert_eq!(w.hour(), 22);
                    assert_eq!(w.minute(), offset as u32);
                }

                assert_eq!(w.second(), 0);
            }
        }
    }

    /// Shows that the negative-remainder branch is required: the naive formula
    /// `ts - (ts + size) % size` yields a "window start" after the timestamp
    /// for some negative timestamps, while `get_window_start` never does.
    #[test]
    fn test_remainder_necessary() {
        let mut wrong_cnt = 0;
        for i in -30..30 {
            let timestamp_micro_second = Interval::from_minutes(i).usecs();
            let window_size = Interval::from_minutes(5);
            let window_start = get_window_start(timestamp_micro_second, window_size).unwrap();

            let window_size_micro_second = interval_to_micro_second(window_size).unwrap();
            let default_window_start = timestamp_micro_second
                - (timestamp_micro_second + window_size_micro_second) % window_size_micro_second;

            if timestamp_micro_second < default_window_start {
                // which is wrong
                wrong_cnt += 1;
            }

            assert!(timestamp_micro_second >= window_start);
        }
        assert_ne!(wrong_cnt, 0);
    }

    /// Overflowing arithmetic must surface as an error rather than a panic.
    #[test]
    fn test_window_start_overflow() {
        get_window_start(i64::MIN, Interval::from_millis(20)).unwrap_err();
        interval_to_micro_second(Interval::from_month_day_usec(1, 1, i64::MAX)).unwrap_err();
    }
}