1use chrono::LocalResult;
16use chrono_tz::Tz;
17use num_traits::CheckedNeg;
18use risingwave_common::types::{
19 CheckedAdd, F64, Interval, IntoOrdered, Timestamp, Timestamptz, write_date_time_tz,
20};
21use risingwave_expr::{ExprError, Result, function};
22use thiserror_ext::AsReport;
23
24#[inline(always)]
26pub fn time_zone_err(inner_err: String) -> ExprError {
27 ExprError::InvalidParam {
28 name: "time_zone",
29 reason: inner_err.into(),
30 }
31}
32
33#[function("sec_to_timestamptz(float8) -> timestamptz")]
34pub fn f64_sec_to_timestamptz(elem: F64) -> Result<Timestamptz> {
35 let micros = (elem.0 * 1e6)
37 .into_ordered()
38 .try_into()
39 .map_err(|_| ExprError::NumericOutOfRange)?;
40 Ok(Timestamptz::from_micros(micros))
41}
42
43#[function("at_time_zone(timestamptz, varchar) -> timestamp")]
44pub fn timestamptz_at_time_zone(input: Timestamptz, time_zone: &str) -> Result<Timestamp> {
45 let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
46 Ok(timestamptz_at_time_zone_internal(input, time_zone))
47}
48
49pub fn timestamptz_at_time_zone_internal(input: Timestamptz, time_zone: Tz) -> Timestamp {
50 let instant_local = input.to_datetime_in_zone(time_zone);
51 let naive = instant_local.naive_local();
52 Timestamp(naive)
53}
54
55#[function("at_time_zone(timestamp, varchar) -> timestamptz")]
56pub fn timestamp_at_time_zone(input: Timestamp, time_zone: &str) -> Result<Timestamptz> {
57 let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
58 timestamp_at_time_zone_internal(input, time_zone)
59}
60
61pub fn timestamp_at_time_zone_internal(input: Timestamp, time_zone: Tz) -> Result<Timestamptz> {
62 let instant_local = match input.0.and_local_timezone(time_zone) {
64 LocalResult::Single(t) => t,
65 LocalResult::None => {
70 (input.0 - chrono::Duration::hours(3))
71 .and_local_timezone(time_zone)
72 .single()
73 .ok_or_else(|| ExprError::InvalidParam {
74 name: "local timestamp",
75 reason: format!(
76 "fail to interpret local timestamp \"{}\" in time zone \"{}\"",
77 input, time_zone
78 )
79 .into(),
80 })?
81 + chrono::Duration::hours(3)
82 }
83 LocalResult::Ambiguous(_, latest) => latest,
85 };
86 let usec = instant_local.timestamp_micros();
87 Ok(Timestamptz::from_micros(usec))
88}
89
90#[function("cast_with_time_zone(timestamptz, varchar) -> varchar")]
91pub fn timestamptz_to_string(
92 elem: Timestamptz,
93 time_zone: &str,
94 writer: &mut impl std::fmt::Write,
95) -> Result<()> {
96 let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
97 let instant_local = elem.to_datetime_in_zone(time_zone);
98 write_date_time_tz(instant_local, writer).map_err(|e| ExprError::Internal(e.into()))
99}
100
101#[function("cast_with_time_zone(varchar, varchar) -> timestamptz")]
104pub fn str_to_timestamptz(elem: &str, time_zone: &str) -> Result<Timestamptz> {
105 elem.parse().or_else(|_| {
106 timestamp_at_time_zone(
107 elem.parse::<Timestamp>()
108 .map_err(|err| ExprError::Parse(err.to_report_string().into()))?,
109 time_zone,
110 )
111 })
112}
113
114#[function("subtract(timestamptz, timestamptz) -> interval")]
116pub fn timestamptz_timestamptz_sub(l: Timestamptz, r: Timestamptz) -> Result<Interval> {
117 let usecs = l
118 .timestamp_micros()
119 .checked_sub(r.timestamp_micros())
120 .ok_or(ExprError::NumericOverflow)?;
121 let interval = Interval::from_month_day_usec(0, 0, usecs);
122 let interval = interval.justify_hour().ok_or(ExprError::NumericOverflow)?;
124 Ok(interval)
125}
126
127#[function("subtract_with_time_zone(timestamptz, interval, varchar) -> timestamptz")]
128pub fn timestamptz_interval_sub(
129 input: Timestamptz,
130 interval: Interval,
131 time_zone: &str,
132) -> Result<Timestamptz> {
133 timestamptz_interval_add(
134 input,
135 interval.checked_neg().ok_or(ExprError::NumericOverflow)?,
136 time_zone,
137 )
138}
139
140#[function("add_with_time_zone(timestamptz, interval, varchar) -> timestamptz")]
141pub fn timestamptz_interval_add(
142 input: Timestamptz,
143 interval: Interval,
144 time_zone: &str,
145) -> Result<Timestamptz> {
146 let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
147 timestamptz_interval_add_internal(input, interval, time_zone)
148}
149
150pub fn timestamptz_interval_add_internal(
151 input: Timestamptz,
152 interval: Interval,
153 time_zone: Tz,
154) -> Result<Timestamptz> {
155 use num_traits::Zero as _;
156
157 let qualitative = interval.truncate_day();
160 let quantitative = interval - qualitative;
162
163 let mut t = input;
164 if !qualitative.is_zero() {
165 let naive = timestamptz_at_time_zone_internal(t, time_zone);
168 let naive = naive
169 .checked_add(qualitative)
170 .ok_or(ExprError::NumericOverflow)?;
171 t = timestamp_at_time_zone_internal(naive, time_zone)?;
172 }
173 let t = timestamptz_interval_quantitative(t, quantitative, i64::checked_add)?;
174 Ok(t)
175}
176
177#[function("add(timestamptz, interval) -> timestamptz")]
180pub fn timestamptz_interval_add_legacy(l: Timestamptz, r: Interval) -> Result<Timestamptz> {
181 timestamptz_interval_quantitative(l, r, i64::checked_add)
182}
183
184#[function("subtract(timestamptz, interval) -> timestamptz")]
185pub fn timestamptz_interval_sub_legacy(l: Timestamptz, r: Interval) -> Result<Timestamptz> {
186 timestamptz_interval_quantitative(l, r, i64::checked_sub)
187}
188
189#[function("add(interval, timestamptz) -> timestamptz")]
190pub fn interval_timestamptz_add_legacy(l: Interval, r: Timestamptz) -> Result<Timestamptz> {
191 timestamptz_interval_add_legacy(r, l)
192}
193
194#[inline(always)]
195fn timestamptz_interval_quantitative(
196 l: Timestamptz,
197 r: Interval,
198 f: fn(i64, i64) -> Option<i64>,
199) -> Result<Timestamptz> {
200 if r.months() != 0 || r.days() != 0 {
202 return Err(ExprError::UnsupportedFunction(
203 "timestamp with time zone +/- interval of days".into(),
204 ));
205 }
206 let delta_usecs = r.usecs();
207 let usecs = f(l.timestamp_micros(), delta_usecs).ok_or(ExprError::NumericOutOfRange)?;
208 Ok(Timestamptz::from_micros(usecs))
209}
210
#[cfg(test)]
mod tests {
    use risingwave_common::util::iter_util::ZipEqFast;

    use super::*;

    /// Round-trips instants through three zones: instant -> local and
    /// local -> instant must agree for every row.
    #[test]
    fn test_time_zone_conversion() {
        // Column order of the local timestamps in each row below.
        let zones = ["US/Pacific", "ASIA/SINGAPORE", "europe/zurich"];
        // Each row: the instant (UTC), followed by its local reading per zone.
        #[rustfmt::skip]
        let test_cases = [
            ["2022-01-01 00:00:00Z", "2021-12-31 16:00:00", "2022-01-01 08:00:00", "2022-01-01 01:00:00"],
            ["2022-07-01 00:00:00Z", "2022-06-30 17:00:00", "2022-07-01 08:00:00", "2022-07-01 02:00:00"],
            ["2022-03-13 09:59:00Z", "2022-03-13 01:59:00", "2022-03-13 17:59:00", "2022-03-13 10:59:00"],
            ["2022-03-13 10:00:00Z", "2022-03-13 03:00:00", "2022-03-13 18:00:00", "2022-03-13 11:00:00"],
            ["2022-03-27 00:59:00Z", "2022-03-26 17:59:00", "2022-03-27 08:59:00", "2022-03-27 01:59:00"],
            ["2022-03-27 01:00:00Z", "2022-03-26 18:00:00", "2022-03-27 09:00:00", "2022-03-27 03:00:00"],
            ["2022-10-29 23:59:00Z", "2022-10-29 16:59:00", "2022-10-30 07:59:00", "2022-10-30 01:59:00"],
            ["2022-10-30 02:00:00Z", "2022-10-29 19:00:00", "2022-10-30 10:00:00", "2022-10-30 03:00:00"],
            ["2022-11-06 07:59:00Z", "2022-11-06 00:59:00", "2022-11-06 15:59:00", "2022-11-06 08:59:00"],
            ["2022-11-06 10:00:00Z", "2022-11-06 02:00:00", "2022-11-06 18:00:00", "2022-11-06 11:00:00"],
        ];
        for case in test_cases {
            let instant = str_to_timestamptz(case[0], "UTC").unwrap();
            for (local_text, zone) in case.iter().skip(1).zip_eq_fast(zones) {
                let expected_local = local_text.parse().unwrap();

                let to_local = timestamptz_at_time_zone(instant, zone).unwrap();
                assert_eq!(expected_local, to_local);

                let to_instant = timestamp_at_time_zone(expected_local, zone).unwrap();
                assert_eq!(instant, to_instant);
            }
        }
    }

    /// Local times skipped by a forward transition resolve to the same instant
    /// as the first local time after the gap.
    #[test]
    #[rustfmt::skip]
    fn test_time_zone_conversion_daylight_forward() {
        #[track_caller]
        fn assert_resolves_to(local: &str, zone: &str, instant: &str) {
            let resolved = timestamp_at_time_zone(local.parse().unwrap(), zone).unwrap();
            let rendered = resolved.to_string();
            assert_eq!(rendered, instant);
        }

        assert_resolves_to("2022-03-13 02:00:00", "US/Pacific", "2022-03-13 10:00:00+00:00");
        assert_resolves_to("2022-03-13 03:00:00", "US/Pacific", "2022-03-13 10:00:00+00:00");
        assert_resolves_to("2022-03-27 02:00:00", "europe/zurich", "2022-03-27 01:00:00+00:00");
        assert_resolves_to("2022-03-27 03:00:00", "europe/zurich", "2022-03-27 01:00:00+00:00");
        assert_resolves_to("2023-10-01 02:00:00", "Australia/Lord_Howe", "2023-09-30 15:30:00+00:00");
        assert_resolves_to("2023-10-01 02:30:00", "Australia/Lord_Howe", "2023-09-30 15:30:00+00:00");
    }

    /// Local times repeated by a backward transition: both instants map to the
    /// same local time, but only the `preferred` one is produced by the
    /// local -> instant direction.
    #[test]
    fn test_time_zone_conversion_daylight_backward() {
        // (instant, local reading, zone, whether local -> instant yields it)
        #[rustfmt::skip]
        let test_cases = [
            ("2022-10-30 00:00:00Z", "2022-10-30 02:00:00", "europe/zurich", false),
            ("2022-10-30 00:59:00Z", "2022-10-30 02:59:00", "europe/zurich", false),
            ("2022-10-30 01:00:00Z", "2022-10-30 02:00:00", "europe/zurich", true),
            ("2022-10-30 01:59:00Z", "2022-10-30 02:59:00", "europe/zurich", true),
            ("2022-11-06 08:00:00Z", "2022-11-06 01:00:00", "US/Pacific", false),
            ("2022-11-06 08:59:00Z", "2022-11-06 01:59:00", "US/Pacific", false),
            ("2022-11-06 09:00:00Z", "2022-11-06 01:00:00", "US/Pacific", true),
            ("2022-11-06 09:59:00Z", "2022-11-06 01:59:00", "US/Pacific", true),
        ];
        for (instant_text, local_text, zone, preferred) in test_cases {
            let instant = str_to_timestamptz(instant_text, "UTC").unwrap();
            let expected_local = local_text.parse().unwrap();

            let to_local = timestamptz_at_time_zone(instant, zone).unwrap();
            assert_eq!(expected_local, to_local);

            if !preferred {
                continue;
            }
            let to_instant = timestamp_at_time_zone(expected_local, zone).unwrap();
            assert_eq!(instant, to_instant);
        }
    }

    /// Parsing and formatting of `timestamptz` values, including values before
    /// the epoch and strings without an explicit offset.
    #[test]
    fn test_timestamptz_to_and_from_string() {
        // Formats an instant in UTC into a fresh string.
        let render_utc = |ts| {
            let mut out = String::new();
            timestamptz_to_string(ts, "UTC", &mut out).unwrap();
            out
        };

        let str1 = "0001-11-15 15:35:40.999999+08:00";
        let timestamptz1 = str_to_timestamptz(str1, "UTC").unwrap();
        assert_eq!(timestamptz1.timestamp_micros(), -62108094259000001);
        assert_eq!(render_utc(timestamptz1), "0001-11-15 07:35:40.999999+00:00");

        let str2 = "1969-12-31 23:59:59.999999+00:00";
        let timestamptz2 = str_to_timestamptz(str2, "UTC").unwrap();
        assert_eq!(timestamptz2.timestamp_micros(), -1);
        assert_eq!(render_utc(timestamptz2), str2);

        // An explicit offset in the string wins over the `time_zone` argument;
        // a string without offset is interpreted in the given zone.
        let str3 = "2022-01-01 00:00:00+08:00";
        let timestamptz3 = str_to_timestamptz(str3, "UTC").unwrap();
        let timestamp_from_no_tz =
            str_to_timestamptz("2022-01-01 00:00:00", "Asia/Singapore").unwrap();
        assert_eq!(timestamptz3, timestamp_from_no_tz);
    }
}