risingwave_expr_impl/scalar/
timestamptz.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use chrono::LocalResult;
16use chrono_tz::Tz;
17use num_traits::CheckedNeg;
18use risingwave_common::types::{
19    CheckedAdd, F64, Interval, IntoOrdered, Timestamp, Timestamptz, write_date_time_tz,
20};
21use risingwave_expr::{ExprError, Result, function};
22use thiserror_ext::AsReport;
23
24/// Just a wrapper to reuse the `map_err` logic.
25#[inline(always)]
26pub fn time_zone_err(inner_err: String) -> ExprError {
27    ExprError::InvalidParam {
28        name: "time_zone",
29        reason: inner_err.into(),
30    }
31}
32
33#[function("sec_to_timestamptz(float8) -> timestamptz")]
34pub fn f64_sec_to_timestamptz(elem: F64) -> Result<Timestamptz> {
35    // TODO(#4515): handle +/- infinity
36    let micros = (elem.0 * 1e6)
37        .into_ordered()
38        .try_into()
39        .map_err(|_| ExprError::NumericOutOfRange)?;
40    Ok(Timestamptz::from_micros(micros))
41}
42
43#[function("at_time_zone(timestamptz, varchar) -> timestamp")]
44pub fn timestamptz_at_time_zone(input: Timestamptz, time_zone: &str) -> Result<Timestamp> {
45    let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
46    Ok(timestamptz_at_time_zone_internal(input, time_zone))
47}
48
49pub fn timestamptz_at_time_zone_internal(input: Timestamptz, time_zone: Tz) -> Timestamp {
50    let instant_local = input.to_datetime_in_zone(time_zone);
51    let naive = instant_local.naive_local();
52    Timestamp(naive)
53}
54
55#[function("at_time_zone(timestamp, varchar) -> timestamptz")]
56pub fn timestamp_at_time_zone(input: Timestamp, time_zone: &str) -> Result<Timestamptz> {
57    let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
58    timestamp_at_time_zone_internal(input, time_zone)
59}
60
61pub fn timestamp_at_time_zone_internal(input: Timestamp, time_zone: Tz) -> Result<Timestamptz> {
62    // https://www.postgresql.org/docs/current/datetime-invalid-input.html
63    let instant_local = match input.0.and_local_timezone(time_zone) {
64        LocalResult::Single(t) => t,
65        // invalid time during daylight forward, use UTC offset before the transition
66        // we minus 3 hours in naive time first, do the timezone conversion, and add 3 hours back in the UTC timeline.
67        // This assumes jump forwards are less than 3 hours and there is a single change within this 3-hour window.
68        // see <https://github.com/risingwavelabs/risingwave/pull/15670#discussion_r1524211006>
69        LocalResult::None => {
70            (input.0 - chrono::Duration::hours(3))
71                .and_local_timezone(time_zone)
72                .single()
73                .ok_or_else(|| ExprError::InvalidParam {
74                    name: "local timestamp",
75                    reason: format!(
76                        "fail to interpret local timestamp \"{}\" in time zone \"{}\"",
77                        input, time_zone
78                    )
79                    .into(),
80                })?
81                + chrono::Duration::hours(3)
82        }
83        // ambiguous time during daylight backward, use UTC offset after the transition
84        LocalResult::Ambiguous(_, latest) => latest,
85    };
86    let usec = instant_local.timestamp_micros();
87    Ok(Timestamptz::from_micros(usec))
88}
89
90#[function("cast_with_time_zone(timestamptz, varchar) -> varchar")]
91pub fn timestamptz_to_string(
92    elem: Timestamptz,
93    time_zone: &str,
94    writer: &mut impl std::fmt::Write,
95) -> Result<()> {
96    let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
97    let instant_local = elem.to_datetime_in_zone(time_zone);
98    write_date_time_tz(instant_local, writer).map_err(|e| ExprError::Internal(e.into()))
99}
100
101// Tries to interpret the string with a timezone, and if failing, tries to interpret the string as a
102// timestamp and then adjusts it with the session timezone.
103#[function("cast_with_time_zone(varchar, varchar) -> timestamptz")]
104pub fn str_to_timestamptz(elem: &str, time_zone: &str) -> Result<Timestamptz> {
105    elem.parse().or_else(|_| {
106        timestamp_at_time_zone(
107            elem.parse::<Timestamp>()
108                .map_err(|err| ExprError::Parse(err.to_report_string().into()))?,
109            time_zone,
110        )
111    })
112}
113
114/// This operation is zone agnostic.
115#[function("subtract(timestamptz, timestamptz) -> interval")]
116pub fn timestamptz_timestamptz_sub(l: Timestamptz, r: Timestamptz) -> Result<Interval> {
117    let usecs = l
118        .timestamp_micros()
119        .checked_sub(r.timestamp_micros())
120        .ok_or(ExprError::NumericOverflow)?;
121    let interval = Interval::from_month_day_usec(0, 0, usecs);
122    // https://github.com/postgres/postgres/blob/REL_15_3/src/backend/utils/adt/timestamp.c#L2697
123    let interval = interval.justify_hour().ok_or(ExprError::NumericOverflow)?;
124    Ok(interval)
125}
126
127#[function("subtract_with_time_zone(timestamptz, interval, varchar) -> timestamptz")]
128pub fn timestamptz_interval_sub(
129    input: Timestamptz,
130    interval: Interval,
131    time_zone: &str,
132) -> Result<Timestamptz> {
133    timestamptz_interval_add(
134        input,
135        interval.checked_neg().ok_or(ExprError::NumericOverflow)?,
136        time_zone,
137    )
138}
139
140#[function("add_with_time_zone(timestamptz, interval, varchar) -> timestamptz")]
141pub fn timestamptz_interval_add(
142    input: Timestamptz,
143    interval: Interval,
144    time_zone: &str,
145) -> Result<Timestamptz> {
146    let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
147    timestamptz_interval_add_internal(input, interval, time_zone)
148}
149
150pub fn timestamptz_interval_add_internal(
151    input: Timestamptz,
152    interval: Interval,
153    time_zone: Tz,
154) -> Result<Timestamptz> {
155    use num_traits::Zero as _;
156
157    // A month may have 28-31 days, a day may have 23 or 25 hours during Daylight Saving switch.
158    // So their interpretation depends on the local time of a specific zone.
159    let qualitative = interval.truncate_day();
160    // Units smaller than `day` are zone agnostic.
161    let quantitative = interval - qualitative;
162
163    let mut t = input;
164    if !qualitative.is_zero() {
165        // Only convert into and from naive local when necessary because it is lossy.
166        // See `e2e_test/batch/functions/issue_12072.slt.part` for the difference.
167        let naive = timestamptz_at_time_zone_internal(t, time_zone);
168        let naive = naive
169            .checked_add(qualitative)
170            .ok_or(ExprError::NumericOverflow)?;
171        t = timestamp_at_time_zone_internal(naive, time_zone)?;
172    }
173    let t = timestamptz_interval_quantitative(t, quantitative, i64::checked_add)?;
174    Ok(t)
175}
176
177// Retained mostly for backward compatibility with old query plans. The signature is also useful for
178// binder type inference.
179#[function("add(timestamptz, interval) -> timestamptz")]
180pub fn timestamptz_interval_add_legacy(l: Timestamptz, r: Interval) -> Result<Timestamptz> {
181    timestamptz_interval_quantitative(l, r, i64::checked_add)
182}
183
184#[function("subtract(timestamptz, interval) -> timestamptz")]
185pub fn timestamptz_interval_sub_legacy(l: Timestamptz, r: Interval) -> Result<Timestamptz> {
186    timestamptz_interval_quantitative(l, r, i64::checked_sub)
187}
188
189#[function("add(interval, timestamptz) -> timestamptz")]
190pub fn interval_timestamptz_add_legacy(l: Interval, r: Timestamptz) -> Result<Timestamptz> {
191    timestamptz_interval_add_legacy(r, l)
192}
193
194#[inline(always)]
195fn timestamptz_interval_quantitative(
196    l: Timestamptz,
197    r: Interval,
198    f: fn(i64, i64) -> Option<i64>,
199) -> Result<Timestamptz> {
200    // Without session TimeZone, we cannot add month/day in local time. See #5826.
201    if r.months() != 0 || r.days() != 0 {
202        return Err(ExprError::UnsupportedFunction(
203            "timestamp with time zone +/- interval of days".into(),
204        ));
205    }
206    let delta_usecs = r.usecs();
207    let usecs = f(l.timestamp_micros(), delta_usecs).ok_or(ExprError::NumericOutOfRange)?;
208    Ok(Timestamptz::from_micros(usecs))
209}
210
211#[cfg(test)]
212mod tests {
213    use risingwave_common::util::iter_util::ZipEqFast;
214
215    use super::*;
216
217    #[test]
218    fn test_time_zone_conversion() {
219        let zones = ["US/Pacific", "ASIA/SINGAPORE", "europe/zurich"];
220        #[rustfmt::skip]
221        let test_cases = [
222            // winter
223            ["2022-01-01 00:00:00Z", "2021-12-31 16:00:00", "2022-01-01 08:00:00", "2022-01-01 01:00:00"],
224            // summer
225            ["2022-07-01 00:00:00Z", "2022-06-30 17:00:00", "2022-07-01 08:00:00", "2022-07-01 02:00:00"],
226            // before and after PST -> PDT, where [02:00, 03:00) are invalid
227            ["2022-03-13 09:59:00Z", "2022-03-13 01:59:00", "2022-03-13 17:59:00", "2022-03-13 10:59:00"],
228            ["2022-03-13 10:00:00Z", "2022-03-13 03:00:00", "2022-03-13 18:00:00", "2022-03-13 11:00:00"],
229            // before and after CET -> CEST, where [02:00. 03:00) are invalid
230            ["2022-03-27 00:59:00Z", "2022-03-26 17:59:00", "2022-03-27 08:59:00", "2022-03-27 01:59:00"],
231            ["2022-03-27 01:00:00Z", "2022-03-26 18:00:00", "2022-03-27 09:00:00", "2022-03-27 03:00:00"],
232            // before and after CEST -> CET, where [02:00, 03:00) are ambiguous
233            ["2022-10-29 23:59:00Z", "2022-10-29 16:59:00", "2022-10-30 07:59:00", "2022-10-30 01:59:00"],
234            ["2022-10-30 02:00:00Z", "2022-10-29 19:00:00", "2022-10-30 10:00:00", "2022-10-30 03:00:00"],
235            // before and after PDT -> PST, where [01:00, 02:00) are ambiguous
236            ["2022-11-06 07:59:00Z", "2022-11-06 00:59:00", "2022-11-06 15:59:00", "2022-11-06 08:59:00"],
237            ["2022-11-06 10:00:00Z", "2022-11-06 02:00:00", "2022-11-06 18:00:00", "2022-11-06 11:00:00"],
238        ];
239        for case in test_cases {
240            let usecs = str_to_timestamptz(case[0], "UTC").unwrap();
241            case.iter()
242                .skip(1)
243                .zip_eq_fast(zones)
244                .for_each(|(local, zone)| {
245                    let local = local.parse().unwrap();
246
247                    let actual = timestamptz_at_time_zone(usecs, zone).unwrap();
248                    assert_eq!(local, actual);
249
250                    let actual = timestamp_at_time_zone(local, zone).unwrap();
251                    assert_eq!(usecs, actual);
252                });
253        }
254    }
255
256    #[test]
257    #[rustfmt::skip]
258    fn test_time_zone_conversion_daylight_forward() {
259        // [02:00. 03:00) are invalid
260        test("2022-03-13 02:00:00", "US/Pacific", "2022-03-13 10:00:00+00:00");
261        test("2022-03-13 03:00:00", "US/Pacific", "2022-03-13 10:00:00+00:00");
262        // [02:00. 03:00) are invalid
263        test("2022-03-27 02:00:00", "europe/zurich", "2022-03-27 01:00:00+00:00");
264        test("2022-03-27 03:00:00", "europe/zurich", "2022-03-27 01:00:00+00:00");
265        // [02:00. 02:30) are invalid
266        test("2023-10-01 02:00:00", "Australia/Lord_Howe", "2023-09-30 15:30:00+00:00");
267        test("2023-10-01 02:30:00", "Australia/Lord_Howe", "2023-09-30 15:30:00+00:00");
268        // FIXME: the jump should be        1981-12-31 23:29:59 to 1982-01-01 00:00:00,
269        //        but the actual jump is    1981-12-31 15:59:59 to 1981-12-31 16:30:00
270        // an arbitrary one-off change in Singapore jumping from 1981-12-31 23:29:59 to 1982-01-01 00:00:00
271        // test("1981-12-31 23:30:00", "Asia/Singapore", "1981-12-31 16:00:00+00:00");
272        // test("1982-01-01 00:00:00", "Asia/Singapore", "1981-12-31 16:00:00+00:00");
273
274        #[track_caller]
275        fn test(local: &str, zone: &str, instant: &str) {
276            let actual = timestamp_at_time_zone(local.parse().unwrap(), zone).unwrap().to_string();
277            assert_eq!(actual, instant);
278        }
279    }
280
281    #[test]
282    fn test_time_zone_conversion_daylight_backward() {
283        #[rustfmt::skip]
284        let test_cases = [
285            ("2022-10-30 00:00:00Z", "2022-10-30 02:00:00", "europe/zurich", false),
286            ("2022-10-30 00:59:00Z", "2022-10-30 02:59:00", "europe/zurich", false),
287            ("2022-10-30 01:00:00Z", "2022-10-30 02:00:00", "europe/zurich", true),
288            ("2022-10-30 01:59:00Z", "2022-10-30 02:59:00", "europe/zurich", true),
289            ("2022-11-06 08:00:00Z", "2022-11-06 01:00:00", "US/Pacific", false),
290            ("2022-11-06 08:59:00Z", "2022-11-06 01:59:00", "US/Pacific", false),
291            ("2022-11-06 09:00:00Z", "2022-11-06 01:00:00", "US/Pacific", true),
292            ("2022-11-06 09:59:00Z", "2022-11-06 01:59:00", "US/Pacific", true),
293        ];
294        for (instant, local, zone, preferred) in test_cases {
295            let usecs = str_to_timestamptz(instant, "UTC").unwrap();
296            let local = local.parse().unwrap();
297
298            let actual = timestamptz_at_time_zone(usecs, zone).unwrap();
299            assert_eq!(local, actual);
300
301            if preferred {
302                let actual = timestamp_at_time_zone(local, zone).unwrap();
303                assert_eq!(usecs, actual)
304            }
305        }
306    }
307
308    #[test]
309    fn test_timestamptz_to_and_from_string() {
310        let str1 = "0001-11-15 15:35:40.999999+08:00";
311        let timestamptz1 = str_to_timestamptz(str1, "UTC").unwrap();
312        assert_eq!(timestamptz1.timestamp_micros(), -62108094259000001);
313
314        let mut writer = String::new();
315        timestamptz_to_string(timestamptz1, "UTC", &mut writer).unwrap();
316        assert_eq!(writer, "0001-11-15 07:35:40.999999+00:00");
317
318        let str2 = "1969-12-31 23:59:59.999999+00:00";
319        let timestamptz2 = str_to_timestamptz(str2, "UTC").unwrap();
320        assert_eq!(timestamptz2.timestamp_micros(), -1);
321
322        let mut writer = String::new();
323        timestamptz_to_string(timestamptz2, "UTC", &mut writer).unwrap();
324        assert_eq!(writer, str2);
325
326        // Parse a timestamptz from a str without timezone
327        let str3 = "2022-01-01 00:00:00+08:00";
328        let timestamptz3 = str_to_timestamptz(str3, "UTC").unwrap();
329
330        let timestamp_from_no_tz =
331            str_to_timestamptz("2022-01-01 00:00:00", "Asia/Singapore").unwrap();
332        assert_eq!(timestamptz3, timestamp_from_no_tz);
333    }
334}