1use std::fmt::Write;
16
17use chrono::LocalResult;
18use chrono_tz::Tz;
19use num_traits::CheckedNeg;
20use risingwave_common::types::{
21 CheckedAdd, F64, Interval, IntoOrdered, Timestamp, Timestamptz, write_date_time_tz,
22};
23use risingwave_expr::{ExprError, Result, function};
24use thiserror_ext::AsReport;
25
26#[inline(always)]
28pub fn time_zone_err(inner_err: String) -> ExprError {
29 ExprError::InvalidParam {
30 name: "time_zone",
31 reason: inner_err.into(),
32 }
33}
34
35#[function("sec_to_timestamptz(float8) -> timestamptz")]
36pub fn f64_sec_to_timestamptz(elem: F64) -> Result<Timestamptz> {
37 let micros = (elem.0 * 1e6)
39 .into_ordered()
40 .try_into()
41 .map_err(|_| ExprError::NumericOutOfRange)?;
42 Ok(Timestamptz::from_micros(micros))
43}
44
45#[function("at_time_zone(timestamptz, varchar) -> timestamp")]
46pub fn timestamptz_at_time_zone(input: Timestamptz, time_zone: &str) -> Result<Timestamp> {
47 let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
48 Ok(timestamptz_at_time_zone_internal(input, time_zone))
49}
50
51pub fn timestamptz_at_time_zone_internal(input: Timestamptz, time_zone: Tz) -> Timestamp {
52 let instant_local = input.to_datetime_in_zone(time_zone);
53 let naive = instant_local.naive_local();
54 Timestamp(naive)
55}
56
57#[function("at_time_zone(timestamp, varchar) -> timestamptz")]
58pub fn timestamp_at_time_zone(input: Timestamp, time_zone: &str) -> Result<Timestamptz> {
59 let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
60 timestamp_at_time_zone_internal(input, time_zone)
61}
62
63pub fn timestamp_at_time_zone_internal(input: Timestamp, time_zone: Tz) -> Result<Timestamptz> {
64 let instant_local = match input.0.and_local_timezone(time_zone) {
66 LocalResult::Single(t) => t,
67 LocalResult::None => {
72 (input.0 - chrono::Duration::hours(3))
73 .and_local_timezone(time_zone)
74 .single()
75 .ok_or_else(|| ExprError::InvalidParam {
76 name: "local timestamp",
77 reason: format!(
78 "fail to interpret local timestamp \"{}\" in time zone \"{}\"",
79 input, time_zone
80 )
81 .into(),
82 })?
83 + chrono::Duration::hours(3)
84 }
85 LocalResult::Ambiguous(_, latest) => latest,
87 };
88 let usec = instant_local.timestamp_micros();
89 Ok(Timestamptz::from_micros(usec))
90}
91
92#[function("cast_with_time_zone(timestamptz, varchar) -> varchar")]
93pub fn timestamptz_to_string(
94 elem: Timestamptz,
95 time_zone: &str,
96 writer: &mut impl Write,
97) -> Result<()> {
98 let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
99 let instant_local = elem.to_datetime_in_zone(time_zone);
100 write_date_time_tz(instant_local, writer).map_err(|e| ExprError::Internal(e.into()))
101}
102
103#[function("cast_with_time_zone(varchar, varchar) -> timestamptz")]
106pub fn str_to_timestamptz(elem: &str, time_zone: &str) -> Result<Timestamptz> {
107 elem.parse().or_else(|_| {
108 timestamp_at_time_zone(
109 elem.parse::<Timestamp>()
110 .map_err(|err| ExprError::Parse(err.to_report_string().into()))?,
111 time_zone,
112 )
113 })
114}
115
116#[function("subtract(timestamptz, timestamptz) -> interval")]
118pub fn timestamptz_timestamptz_sub(l: Timestamptz, r: Timestamptz) -> Result<Interval> {
119 let usecs = l
120 .timestamp_micros()
121 .checked_sub(r.timestamp_micros())
122 .ok_or(ExprError::NumericOverflow)?;
123 let interval = Interval::from_month_day_usec(0, 0, usecs);
124 let interval = interval.justify_hour().ok_or(ExprError::NumericOverflow)?;
126 Ok(interval)
127}
128
129#[function("subtract_with_time_zone(timestamptz, interval, varchar) -> timestamptz")]
130pub fn timestamptz_interval_sub(
131 input: Timestamptz,
132 interval: Interval,
133 time_zone: &str,
134) -> Result<Timestamptz> {
135 timestamptz_interval_add(
136 input,
137 interval.checked_neg().ok_or(ExprError::NumericOverflow)?,
138 time_zone,
139 )
140}
141
142#[function("add_with_time_zone(timestamptz, interval, varchar) -> timestamptz")]
143pub fn timestamptz_interval_add(
144 input: Timestamptz,
145 interval: Interval,
146 time_zone: &str,
147) -> Result<Timestamptz> {
148 let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
149 timestamptz_interval_add_internal(input, interval, time_zone)
150}
151
152pub fn timestamptz_interval_add_internal(
153 input: Timestamptz,
154 interval: Interval,
155 time_zone: Tz,
156) -> Result<Timestamptz> {
157 use num_traits::Zero as _;
158
159 let qualitative = interval.truncate_day();
162 let quantitative = interval - qualitative;
164
165 let mut t = input;
166 if !qualitative.is_zero() {
167 let naive = timestamptz_at_time_zone_internal(t, time_zone);
170 let naive = naive
171 .checked_add(qualitative)
172 .ok_or(ExprError::NumericOverflow)?;
173 t = timestamp_at_time_zone_internal(naive, time_zone)?;
174 }
175 let t = timestamptz_interval_quantitative(t, quantitative, i64::checked_add)?;
176 Ok(t)
177}
178
179#[function("add(timestamptz, interval) -> timestamptz")]
182pub fn timestamptz_interval_add_legacy(l: Timestamptz, r: Interval) -> Result<Timestamptz> {
183 timestamptz_interval_quantitative(l, r, i64::checked_add)
184}
185
186#[function("subtract(timestamptz, interval) -> timestamptz")]
187pub fn timestamptz_interval_sub_legacy(l: Timestamptz, r: Interval) -> Result<Timestamptz> {
188 timestamptz_interval_quantitative(l, r, i64::checked_sub)
189}
190
191#[function("add(interval, timestamptz) -> timestamptz")]
192pub fn interval_timestamptz_add_legacy(l: Interval, r: Timestamptz) -> Result<Timestamptz> {
193 timestamptz_interval_add_legacy(r, l)
194}
195
196#[inline(always)]
197fn timestamptz_interval_quantitative(
198 l: Timestamptz,
199 r: Interval,
200 f: fn(i64, i64) -> Option<i64>,
201) -> Result<Timestamptz> {
202 if r.months() != 0 || r.days() != 0 {
204 return Err(ExprError::UnsupportedFunction(
205 "timestamp with time zone +/- interval of days".into(),
206 ));
207 }
208 let delta_usecs = r.usecs();
209 let usecs = f(l.timestamp_micros(), delta_usecs).ok_or(ExprError::NumericOutOfRange)?;
210 Ok(Timestamptz::from_micros(usecs))
211}
212
#[cfg(test)]
mod tests {
    use risingwave_common::util::iter_util::ZipEqFast;

    use super::*;

    /// Checks both conversion directions for three zones, using instants that have a
    /// single, unambiguous local time in each zone.
    #[test]
    fn test_time_zone_conversion() {
        let zones = ["US/Pacific", "ASIA/SINGAPORE", "europe/zurich"];
        // Each row: a UTC instant followed by the expected local time in each of `zones`.
        #[rustfmt::skip]
        let test_cases = [
            ["2022-01-01 00:00:00Z", "2021-12-31 16:00:00", "2022-01-01 08:00:00", "2022-01-01 01:00:00"],
            ["2022-07-01 00:00:00Z", "2022-06-30 17:00:00", "2022-07-01 08:00:00", "2022-07-01 02:00:00"],
            ["2022-03-13 09:59:00Z", "2022-03-13 01:59:00", "2022-03-13 17:59:00", "2022-03-13 10:59:00"],
            ["2022-03-13 10:00:00Z", "2022-03-13 03:00:00", "2022-03-13 18:00:00", "2022-03-13 11:00:00"],
            ["2022-03-27 00:59:00Z", "2022-03-26 17:59:00", "2022-03-27 08:59:00", "2022-03-27 01:59:00"],
            ["2022-03-27 01:00:00Z", "2022-03-26 18:00:00", "2022-03-27 09:00:00", "2022-03-27 03:00:00"],
            ["2022-10-29 23:59:00Z", "2022-10-29 16:59:00", "2022-10-30 07:59:00", "2022-10-30 01:59:00"],
            ["2022-10-30 02:00:00Z", "2022-10-29 19:00:00", "2022-10-30 10:00:00", "2022-10-30 03:00:00"],
            ["2022-11-06 07:59:00Z", "2022-11-06 00:59:00", "2022-11-06 15:59:00", "2022-11-06 08:59:00"],
            ["2022-11-06 10:00:00Z", "2022-11-06 02:00:00", "2022-11-06 18:00:00", "2022-11-06 11:00:00"],
        ];
        for case in test_cases {
            let instant = str_to_timestamptz(case[0], "UTC").unwrap();
            for (local, zone) in case.iter().skip(1).zip_eq_fast(zones) {
                let expected_local: Timestamp = local.parse().unwrap();

                // instant -> local
                let to_local = timestamptz_at_time_zone(instant, zone).unwrap();
                assert_eq!(expected_local, to_local);

                // local -> instant
                let to_instant = timestamp_at_time_zone(expected_local, zone).unwrap();
                assert_eq!(instant, to_instant);
            }
        }
    }

    /// Local times skipped by a forward clock change resolve to the same instant as the
    /// first local time after the gap.
    #[test]
    #[rustfmt::skip]
    fn test_time_zone_conversion_daylight_forward() {
        #[track_caller]
        fn check(local: &str, zone: &str, instant: &str) {
            let parsed: Timestamp = local.parse().unwrap();
            let rendered = timestamp_at_time_zone(parsed, zone).unwrap().to_string();
            assert_eq!(rendered, instant);
        }

        check("2022-03-13 02:00:00", "US/Pacific", "2022-03-13 10:00:00+00:00");
        check("2022-03-13 03:00:00", "US/Pacific", "2022-03-13 10:00:00+00:00");
        check("2022-03-27 02:00:00", "europe/zurich", "2022-03-27 01:00:00+00:00");
        check("2022-03-27 03:00:00", "europe/zurich", "2022-03-27 01:00:00+00:00");
        check("2023-10-01 02:00:00", "Australia/Lord_Howe", "2023-09-30 15:30:00+00:00");
        check("2023-10-01 02:30:00", "Australia/Lord_Howe", "2023-09-30 15:30:00+00:00");
    }

    /// Local times repeated by a backward clock change: both instants map to the same
    /// local time, and the local time maps back to the instant flagged as preferred.
    #[test]
    fn test_time_zone_conversion_daylight_backward() {
        // Each row: UTC instant, local time, zone, whether local -> instant picks this row.
        #[rustfmt::skip]
        let test_cases = [
            ("2022-10-30 00:00:00Z", "2022-10-30 02:00:00", "europe/zurich", false),
            ("2022-10-30 00:59:00Z", "2022-10-30 02:59:00", "europe/zurich", false),
            ("2022-10-30 01:00:00Z", "2022-10-30 02:00:00", "europe/zurich", true),
            ("2022-10-30 01:59:00Z", "2022-10-30 02:59:00", "europe/zurich", true),
            ("2022-11-06 08:00:00Z", "2022-11-06 01:00:00", "US/Pacific", false),
            ("2022-11-06 08:59:00Z", "2022-11-06 01:59:00", "US/Pacific", false),
            ("2022-11-06 09:00:00Z", "2022-11-06 01:00:00", "US/Pacific", true),
            ("2022-11-06 09:59:00Z", "2022-11-06 01:59:00", "US/Pacific", true),
        ];
        for (instant, local, zone, preferred) in test_cases {
            let expected_instant = str_to_timestamptz(instant, "UTC").unwrap();
            let expected_local: Timestamp = local.parse().unwrap();

            let to_local = timestamptz_at_time_zone(expected_instant, zone).unwrap();
            assert_eq!(expected_local, to_local);

            if !preferred {
                continue;
            }
            let to_instant = timestamp_at_time_zone(expected_local, zone).unwrap();
            assert_eq!(expected_instant, to_instant);
        }
    }

    /// Parsing and formatting of instants, including pre-epoch values and input without
    /// an explicit offset.
    #[test]
    fn test_timestamptz_to_and_from_string() {
        let render_utc = |instant: Timestamptz| {
            let mut out = String::new();
            timestamptz_to_string(instant, "UTC", &mut out).unwrap();
            out
        };

        let with_offset = "1600-11-15 15:35:40.999999+08:00";
        let parsed = str_to_timestamptz(with_offset, "UTC").unwrap();
        assert_eq!(parsed.timestamp_micros(), -11648507059000001);
        assert_eq!(render_utc(parsed), "1600-11-15 07:35:40.999999+00:00");

        let just_before_epoch = "1969-12-31 23:59:59.999999+00:00";
        let parsed = str_to_timestamptz(just_before_epoch, "UTC").unwrap();
        assert_eq!(parsed.timestamp_micros(), -1);
        assert_eq!(render_utc(parsed), just_before_epoch);

        // An explicit offset in the input takes effect regardless of the zone argument;
        // without one, the zone argument decides how the local time is interpreted.
        let explicit = str_to_timestamptz("2022-01-01 00:00:00+08:00", "UTC").unwrap();
        let implicit = str_to_timestamptz("2022-01-01 00:00:00", "Asia/Singapore").unwrap();
        assert_eq!(explicit, implicit);
    }
}