risingwave_expr_impl/scalar/
timestamptz.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Write;
16
17use chrono::LocalResult;
18use chrono_tz::Tz;
19use num_traits::CheckedNeg;
20use risingwave_common::types::{
21    CheckedAdd, F64, Interval, IntoOrdered, Timestamp, Timestamptz, write_date_time_tz,
22};
23use risingwave_expr::{ExprError, Result, function};
24use thiserror_ext::AsReport;
25
26/// Just a wrapper to reuse the `map_err` logic.
27#[inline(always)]
28pub fn time_zone_err(inner_err: String) -> ExprError {
29    ExprError::InvalidParam {
30        name: "time_zone",
31        reason: inner_err.into(),
32    }
33}
34
35#[function("sec_to_timestamptz(float8) -> timestamptz")]
36pub fn f64_sec_to_timestamptz(elem: F64) -> Result<Timestamptz> {
37    // TODO(#4515): handle +/- infinity
38    let micros = (elem.0 * 1e6)
39        .into_ordered()
40        .try_into()
41        .map_err(|_| ExprError::NumericOutOfRange)?;
42    Ok(Timestamptz::from_micros(micros))
43}
44
45#[function("at_time_zone(timestamptz, varchar) -> timestamp")]
46pub fn timestamptz_at_time_zone(input: Timestamptz, time_zone: &str) -> Result<Timestamp> {
47    let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
48    Ok(timestamptz_at_time_zone_internal(input, time_zone))
49}
50
51pub fn timestamptz_at_time_zone_internal(input: Timestamptz, time_zone: Tz) -> Timestamp {
52    let instant_local = input.to_datetime_in_zone(time_zone);
53    let naive = instant_local.naive_local();
54    Timestamp(naive)
55}
56
57#[function("at_time_zone(timestamp, varchar) -> timestamptz")]
58pub fn timestamp_at_time_zone(input: Timestamp, time_zone: &str) -> Result<Timestamptz> {
59    let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
60    timestamp_at_time_zone_internal(input, time_zone)
61}
62
63pub fn timestamp_at_time_zone_internal(input: Timestamp, time_zone: Tz) -> Result<Timestamptz> {
64    // https://www.postgresql.org/docs/current/datetime-invalid-input.html
65    let instant_local = match input.0.and_local_timezone(time_zone) {
66        LocalResult::Single(t) => t,
67        // invalid time during daylight forward, use UTC offset before the transition
68        // we minus 3 hours in naive time first, do the timezone conversion, and add 3 hours back in the UTC timeline.
69        // This assumes jump forwards are less than 3 hours and there is a single change within this 3-hour window.
70        // see <https://github.com/risingwavelabs/risingwave/pull/15670#discussion_r1524211006>
71        LocalResult::None => {
72            (input.0 - chrono::Duration::hours(3))
73                .and_local_timezone(time_zone)
74                .single()
75                .ok_or_else(|| ExprError::InvalidParam {
76                    name: "local timestamp",
77                    reason: format!(
78                        "fail to interpret local timestamp \"{}\" in time zone \"{}\"",
79                        input, time_zone
80                    )
81                    .into(),
82                })?
83                + chrono::Duration::hours(3)
84        }
85        // ambiguous time during daylight backward, use UTC offset after the transition
86        LocalResult::Ambiguous(_, latest) => latest,
87    };
88    let usec = instant_local.timestamp_micros();
89    Ok(Timestamptz::from_micros(usec))
90}
91
92#[function("cast_with_time_zone(timestamptz, varchar) -> varchar")]
93pub fn timestamptz_to_string(
94    elem: Timestamptz,
95    time_zone: &str,
96    writer: &mut impl Write,
97) -> Result<()> {
98    let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
99    let instant_local = elem.to_datetime_in_zone(time_zone);
100    write_date_time_tz(instant_local, writer).map_err(|e| ExprError::Internal(e.into()))
101}
102
103// Tries to interpret the string with a timezone, and if failing, tries to interpret the string as a
104// timestamp and then adjusts it with the session timezone.
105#[function("cast_with_time_zone(varchar, varchar) -> timestamptz")]
106pub fn str_to_timestamptz(elem: &str, time_zone: &str) -> Result<Timestamptz> {
107    elem.parse().or_else(|_| {
108        timestamp_at_time_zone(
109            elem.parse::<Timestamp>()
110                .map_err(|err| ExprError::Parse(err.to_report_string().into()))?,
111            time_zone,
112        )
113    })
114}
115
116/// This operation is zone agnostic.
117#[function("subtract(timestamptz, timestamptz) -> interval")]
118pub fn timestamptz_timestamptz_sub(l: Timestamptz, r: Timestamptz) -> Result<Interval> {
119    let usecs = l
120        .timestamp_micros()
121        .checked_sub(r.timestamp_micros())
122        .ok_or(ExprError::NumericOverflow)?;
123    let interval = Interval::from_month_day_usec(0, 0, usecs);
124    // https://github.com/postgres/postgres/blob/REL_15_3/src/backend/utils/adt/timestamp.c#L2697
125    let interval = interval.justify_hour().ok_or(ExprError::NumericOverflow)?;
126    Ok(interval)
127}
128
129#[function("subtract_with_time_zone(timestamptz, interval, varchar) -> timestamptz")]
130pub fn timestamptz_interval_sub(
131    input: Timestamptz,
132    interval: Interval,
133    time_zone: &str,
134) -> Result<Timestamptz> {
135    timestamptz_interval_add(
136        input,
137        interval.checked_neg().ok_or(ExprError::NumericOverflow)?,
138        time_zone,
139    )
140}
141
142#[function("add_with_time_zone(timestamptz, interval, varchar) -> timestamptz")]
143pub fn timestamptz_interval_add(
144    input: Timestamptz,
145    interval: Interval,
146    time_zone: &str,
147) -> Result<Timestamptz> {
148    let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
149    timestamptz_interval_add_internal(input, interval, time_zone)
150}
151
152pub fn timestamptz_interval_add_internal(
153    input: Timestamptz,
154    interval: Interval,
155    time_zone: Tz,
156) -> Result<Timestamptz> {
157    use num_traits::Zero as _;
158
159    // A month may have 28-31 days, a day may have 23 or 25 hours during Daylight Saving switch.
160    // So their interpretation depends on the local time of a specific zone.
161    let qualitative = interval.truncate_day();
162    // Units smaller than `day` are zone agnostic.
163    let quantitative = interval - qualitative;
164
165    let mut t = input;
166    if !qualitative.is_zero() {
167        // Only convert into and from naive local when necessary because it is lossy.
168        // See `e2e_test/batch/functions/issue_12072.slt.part` for the difference.
169        let naive = timestamptz_at_time_zone_internal(t, time_zone);
170        let naive = naive
171            .checked_add(qualitative)
172            .ok_or(ExprError::NumericOverflow)?;
173        t = timestamp_at_time_zone_internal(naive, time_zone)?;
174    }
175    let t = timestamptz_interval_quantitative(t, quantitative, i64::checked_add)?;
176    Ok(t)
177}
178
179// Retained mostly for backward compatibility with old query plans. The signature is also useful for
180// binder type inference.
181#[function("add(timestamptz, interval) -> timestamptz")]
182pub fn timestamptz_interval_add_legacy(l: Timestamptz, r: Interval) -> Result<Timestamptz> {
183    timestamptz_interval_quantitative(l, r, i64::checked_add)
184}
185
186#[function("subtract(timestamptz, interval) -> timestamptz")]
187pub fn timestamptz_interval_sub_legacy(l: Timestamptz, r: Interval) -> Result<Timestamptz> {
188    timestamptz_interval_quantitative(l, r, i64::checked_sub)
189}
190
191#[function("add(interval, timestamptz) -> timestamptz")]
192pub fn interval_timestamptz_add_legacy(l: Interval, r: Timestamptz) -> Result<Timestamptz> {
193    timestamptz_interval_add_legacy(r, l)
194}
195
196#[inline(always)]
197fn timestamptz_interval_quantitative(
198    l: Timestamptz,
199    r: Interval,
200    f: fn(i64, i64) -> Option<i64>,
201) -> Result<Timestamptz> {
202    // Without session TimeZone, we cannot add month/day in local time. See #5826.
203    if r.months() != 0 || r.days() != 0 {
204        return Err(ExprError::UnsupportedFunction(
205            "timestamp with time zone +/- interval of days".into(),
206        ));
207    }
208    let delta_usecs = r.usecs();
209    let usecs = f(l.timestamp_micros(), delta_usecs).ok_or(ExprError::NumericOutOfRange)?;
210    Ok(Timestamptz::from_micros(usecs))
211}
212
213#[cfg(test)]
214mod tests {
215    use risingwave_common::util::iter_util::ZipEqFast;
216
217    use super::*;
218
219    #[test]
220    fn test_time_zone_conversion() {
221        let zones = ["US/Pacific", "ASIA/SINGAPORE", "europe/zurich"];
222        #[rustfmt::skip]
223        let test_cases = [
224            // winter
225            ["2022-01-01 00:00:00Z", "2021-12-31 16:00:00", "2022-01-01 08:00:00", "2022-01-01 01:00:00"],
226            // summer
227            ["2022-07-01 00:00:00Z", "2022-06-30 17:00:00", "2022-07-01 08:00:00", "2022-07-01 02:00:00"],
228            // before and after PST -> PDT, where [02:00, 03:00) are invalid
229            ["2022-03-13 09:59:00Z", "2022-03-13 01:59:00", "2022-03-13 17:59:00", "2022-03-13 10:59:00"],
230            ["2022-03-13 10:00:00Z", "2022-03-13 03:00:00", "2022-03-13 18:00:00", "2022-03-13 11:00:00"],
231            // before and after CET -> CEST, where [02:00. 03:00) are invalid
232            ["2022-03-27 00:59:00Z", "2022-03-26 17:59:00", "2022-03-27 08:59:00", "2022-03-27 01:59:00"],
233            ["2022-03-27 01:00:00Z", "2022-03-26 18:00:00", "2022-03-27 09:00:00", "2022-03-27 03:00:00"],
234            // before and after CEST -> CET, where [02:00, 03:00) are ambiguous
235            ["2022-10-29 23:59:00Z", "2022-10-29 16:59:00", "2022-10-30 07:59:00", "2022-10-30 01:59:00"],
236            ["2022-10-30 02:00:00Z", "2022-10-29 19:00:00", "2022-10-30 10:00:00", "2022-10-30 03:00:00"],
237            // before and after PDT -> PST, where [01:00, 02:00) are ambiguous
238            ["2022-11-06 07:59:00Z", "2022-11-06 00:59:00", "2022-11-06 15:59:00", "2022-11-06 08:59:00"],
239            ["2022-11-06 10:00:00Z", "2022-11-06 02:00:00", "2022-11-06 18:00:00", "2022-11-06 11:00:00"],
240        ];
241        for case in test_cases {
242            let usecs = str_to_timestamptz(case[0], "UTC").unwrap();
243            case.iter()
244                .skip(1)
245                .zip_eq_fast(zones)
246                .for_each(|(local, zone)| {
247                    let local = local.parse().unwrap();
248
249                    let actual = timestamptz_at_time_zone(usecs, zone).unwrap();
250                    assert_eq!(local, actual);
251
252                    let actual = timestamp_at_time_zone(local, zone).unwrap();
253                    assert_eq!(usecs, actual);
254                });
255        }
256    }
257
258    #[test]
259    #[rustfmt::skip]
260    fn test_time_zone_conversion_daylight_forward() {
261        // [02:00. 03:00) are invalid
262        test("2022-03-13 02:00:00", "US/Pacific", "2022-03-13 10:00:00+00:00");
263        test("2022-03-13 03:00:00", "US/Pacific", "2022-03-13 10:00:00+00:00");
264        // [02:00. 03:00) are invalid
265        test("2022-03-27 02:00:00", "europe/zurich", "2022-03-27 01:00:00+00:00");
266        test("2022-03-27 03:00:00", "europe/zurich", "2022-03-27 01:00:00+00:00");
267        // [02:00. 02:30) are invalid
268        test("2023-10-01 02:00:00", "Australia/Lord_Howe", "2023-09-30 15:30:00+00:00");
269        test("2023-10-01 02:30:00", "Australia/Lord_Howe", "2023-09-30 15:30:00+00:00");
270        // FIXME: the jump should be        1981-12-31 23:29:59 to 1982-01-01 00:00:00,
271        //        but the actual jump is    1981-12-31 15:59:59 to 1981-12-31 16:30:00
272        // an arbitrary one-off change in Singapore jumping from 1981-12-31 23:29:59 to 1982-01-01 00:00:00
273        // test("1981-12-31 23:30:00", "Asia/Singapore", "1981-12-31 16:00:00+00:00");
274        // test("1982-01-01 00:00:00", "Asia/Singapore", "1981-12-31 16:00:00+00:00");
275
276        #[track_caller]
277        fn test(local: &str, zone: &str, instant: &str) {
278            let actual = timestamp_at_time_zone(local.parse().unwrap(), zone).unwrap().to_string();
279            assert_eq!(actual, instant);
280        }
281    }
282
283    #[test]
284    fn test_time_zone_conversion_daylight_backward() {
285        #[rustfmt::skip]
286        let test_cases = [
287            ("2022-10-30 00:00:00Z", "2022-10-30 02:00:00", "europe/zurich", false),
288            ("2022-10-30 00:59:00Z", "2022-10-30 02:59:00", "europe/zurich", false),
289            ("2022-10-30 01:00:00Z", "2022-10-30 02:00:00", "europe/zurich", true),
290            ("2022-10-30 01:59:00Z", "2022-10-30 02:59:00", "europe/zurich", true),
291            ("2022-11-06 08:00:00Z", "2022-11-06 01:00:00", "US/Pacific", false),
292            ("2022-11-06 08:59:00Z", "2022-11-06 01:59:00", "US/Pacific", false),
293            ("2022-11-06 09:00:00Z", "2022-11-06 01:00:00", "US/Pacific", true),
294            ("2022-11-06 09:59:00Z", "2022-11-06 01:59:00", "US/Pacific", true),
295        ];
296        for (instant, local, zone, preferred) in test_cases {
297            let usecs = str_to_timestamptz(instant, "UTC").unwrap();
298            let local = local.parse().unwrap();
299
300            let actual = timestamptz_at_time_zone(usecs, zone).unwrap();
301            assert_eq!(local, actual);
302
303            if preferred {
304                let actual = timestamp_at_time_zone(local, zone).unwrap();
305                assert_eq!(usecs, actual)
306            }
307        }
308    }
309
310    #[test]
311    fn test_timestamptz_to_and_from_string() {
312        let str1 = "1600-11-15 15:35:40.999999+08:00";
313        let timestamptz1 = str_to_timestamptz(str1, "UTC").unwrap();
314        assert_eq!(timestamptz1.timestamp_micros(), -11648507059000001);
315
316        let mut writer = String::new();
317        timestamptz_to_string(timestamptz1, "UTC", &mut writer).unwrap();
318        assert_eq!(writer, "1600-11-15 07:35:40.999999+00:00");
319
320        let str2 = "1969-12-31 23:59:59.999999+00:00";
321        let timestamptz2 = str_to_timestamptz(str2, "UTC").unwrap();
322        assert_eq!(timestamptz2.timestamp_micros(), -1);
323
324        let mut writer = String::new();
325        timestamptz_to_string(timestamptz2, "UTC", &mut writer).unwrap();
326        assert_eq!(writer, str2);
327
328        // Parse a timestamptz from a str without timezone
329        let str3 = "2022-01-01 00:00:00+08:00";
330        let timestamptz3 = str_to_timestamptz(str3, "UTC").unwrap();
331
332        let timestamp_from_no_tz =
333            str_to_timestamptz("2022-01-01 00:00:00", "Asia/Singapore").unwrap();
334        assert_eq!(timestamptz3, timestamp_from_no_tz);
335    }
336}