risingwave_common/types/
timestamptz.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::error::Error;
16use std::io::{Cursor, Write};
17use std::str::FromStr;
18
19use anyhow::Context;
20use byteorder::{BigEndian, ReadBytesExt};
21use bytes::BytesMut;
22use chrono::{DateTime, Datelike, TimeZone, Utc};
23use chrono_tz::Tz;
24use postgres_types::{FromSql, IsNull, ToSql, Type, accepts, to_sql_checked};
25use risingwave_common_estimate_size::ZeroHeapSize;
26use serde::{Deserialize, Serialize};
27
28use super::DataType;
29use super::to_text::ToText;
30use crate::array::ArrayResult;
31
32/// Timestamp with timezone.
33#[derive(
34    Default, Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
35)]
36#[repr(transparent)]
37pub struct Timestamptz(i64);
38
39impl ZeroHeapSize for Timestamptz {}
40
41impl ToSql for Timestamptz {
42    accepts!(TIMESTAMPTZ);
43
44    to_sql_checked!();
45
46    fn to_sql(&self, _: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>>
47    where
48        Self: Sized,
49    {
50        let instant = self.to_datetime_utc();
51        instant.to_sql(&Type::ANY, out)
52    }
53}
54
55impl<'a> FromSql<'a> for Timestamptz {
56    fn from_sql(
57        ty: &Type,
58        raw: &'a [u8],
59    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
60        let instant = DateTime::<Utc>::from_sql(ty, raw)?;
61        Ok(Self::from(instant))
62    }
63
64    fn accepts(ty: &Type) -> bool {
65        matches!(*ty, Type::TIMESTAMPTZ)
66    }
67}
68
69impl ToText for Timestamptz {
70    fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
71        // Just a meaningful representation as placeholder. The real implementation depends
72        // on TimeZone from session. See #3552.
73        let instant = self.to_datetime_utc();
74        // PostgreSQL uses a space rather than `T` to separate the date and time.
75        // https://www.postgresql.org/docs/current/datatype-datetime.html#DATATYPE-DATETIME-OUTPUT
76        // same as `instant.format("%Y-%m-%d %H:%M:%S%.f%:z")` but faster
77        write!(f, "{}+00:00", instant.naive_local())
78    }
79
80    fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result {
81        assert_eq!(ty, &DataType::Timestamptz);
82        self.write(f)
83    }
84}
85
86impl Timestamptz {
87    pub const MIN: Self = Self(i64::MIN);
88
89    /// Creates a `Timestamptz` from seconds. Returns `None` if the given timestamp is out of range.
90    pub fn from_secs(timestamp_secs: i64) -> Option<Self> {
91        timestamp_secs.checked_mul(1_000_000).map(Self)
92    }
93
94    /// Creates a `Timestamptz` from milliseconds. Returns `None` if the given timestamp is out of
95    /// range.
96    pub fn from_millis(timestamp_millis: i64) -> Option<Self> {
97        timestamp_millis.checked_mul(1000).map(Self)
98    }
99
100    /// Creates a `Timestamptz` from microseconds.
101    pub fn from_micros(timestamp_micros: i64) -> Self {
102        Self(timestamp_micros)
103    }
104
105    /// Creates a `Timestamptz` from microseconds.
106    pub fn from_nanos(timestamp_nanos: i64) -> Option<Self> {
107        timestamp_nanos.checked_div(1_000).map(Self)
108    }
109
110    /// Returns the number of non-leap-microseconds since January 1, 1970 UTC.
111    pub fn timestamp_micros(&self) -> i64 {
112        self.0
113    }
114
115    /// Returns the number of non-leap-milliseconds since January 1, 1970 UTC.
116    pub fn timestamp_millis(&self) -> i64 {
117        self.0.div_euclid(1_000)
118    }
119
120    /// Returns the number of non-leap-nanosseconds since January 1, 1970 UTC.
121    pub fn timestamp_nanos(&self) -> Option<i64> {
122        self.0.checked_mul(1_000)
123    }
124
125    /// Returns the number of non-leap seconds since January 1, 1970 0:00:00 UTC (aka "UNIX
126    /// timestamp").
127    pub fn timestamp(&self) -> i64 {
128        self.0.div_euclid(1_000_000)
129    }
130
131    /// Returns the number of nanoseconds since the last second boundary.
132    pub fn timestamp_subsec_nanos(&self) -> u32 {
133        self.0.rem_euclid(1_000_000) as u32 * 1000
134    }
135
136    pub fn to_datetime_utc(self) -> chrono::DateTime<Utc> {
137        self.into()
138    }
139
140    pub fn to_datetime_in_zone(self, tz: Tz) -> chrono::DateTime<Tz> {
141        self.to_datetime_utc().with_timezone(&tz)
142    }
143
144    pub fn lookup_time_zone(time_zone: &str) -> std::result::Result<Tz, String> {
145        Tz::from_str_insensitive(time_zone)
146            .map_err(|_| format!("'{time_zone}' is not a valid timezone"))
147    }
148
149    pub fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Timestamptz> {
150        let micros = cur
151            .read_i64::<BigEndian>()
152            .context("failed to read i64 from Timestamptz buffer")?;
153        Ok(Self(micros))
154    }
155
156    pub fn to_protobuf(self, output: &mut impl Write) -> ArrayResult<usize> {
157        output.write(&self.0.to_be_bytes()).map_err(Into::into)
158    }
159}
160
161impl<Tz: TimeZone> From<chrono::DateTime<Tz>> for Timestamptz {
162    fn from(dt: chrono::DateTime<Tz>) -> Self {
163        Self(dt.timestamp_micros())
164    }
165}
166
167impl From<Timestamptz> for chrono::DateTime<Utc> {
168    fn from(tz: Timestamptz) -> Self {
169        Utc.timestamp_opt(tz.timestamp(), tz.timestamp_subsec_nanos())
170            .unwrap()
171    }
172}
173
174impl FromStr for Timestamptz {
175    type Err = &'static str;
176
177    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
178        pub const ERROR_MSG: &str = concat!(
179            "Can't cast string to timestamp with time zone (expected format is YYYY-MM-DD HH:MM:SS[.D+{up to 6 digits}] followed by +hh:mm or literal Z)",
180            "\nFor example: '2021-04-01 00:00:00+00:00'"
181        );
182        // Try `speedate` first
183        // * It is also used by `str_to_{date,time,timestamp}`
184        // * It can parse without seconds `2006-01-02 15:04-07:00`
185        let ret = match speedate::DateTime::parse_str_rfc3339(s) {
186            Ok(r) => r,
187            Err(_) => {
188                // Supplement with `chrono` for existing cases:
189                // * Extra space before offset `2006-01-02 15:04:05 -07:00`
190                return s
191                    .parse::<chrono::DateTime<Utc>>()
192                    .or_else(|_| {
193                        chrono::DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%#z")
194                            .map(|t| t.with_timezone(&Utc))
195                    })
196                    .map(|t| Timestamptz(t.timestamp_micros()))
197                    .map_err(|_| ERROR_MSG);
198            }
199        };
200        if ret.time.tz_offset.is_none() {
201            return Err(ERROR_MSG);
202        }
203        if ret.date.year < 1600 {
204            return Err("parsing timestamptz with year < 1600 unsupported");
205        }
206        Ok(Timestamptz(
207            ret.timestamp_tz()
208                .checked_mul(1000000)
209                .and_then(|us| us.checked_add(ret.time.microsecond.into()))
210                .ok_or(ERROR_MSG)?,
211        ))
212    }
213}
214
215impl std::fmt::Display for Timestamptz {
216    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217        self.write(f)
218    }
219}
220
221pub fn write_date_time_tz(
222    instant_local: DateTime<Tz>,
223    writer: &mut impl std::fmt::Write,
224) -> std::fmt::Result {
225    let date = instant_local.date_naive();
226    let (ce, year) = date.year_ce();
227    write!(
228        writer,
229        "{:04}-{:02}-{:02} {}",
230        year,
231        date.month(),
232        date.day(),
233        instant_local.format(if ce {
234            "%H:%M:%S%.f%:z"
235        } else {
236            "%H:%M:%S%.f%:z BC"
237        })
238    )
239}
240
#[cfg(test)]
mod test {
    use super::*;

    /// Exercises `FromStr` across the accepted and rejected input shapes.
    #[test]
    fn parse() {
        // A literal without any offset must be rejected.
        assert!("1999-01-08 04:05:06".parse::<Timestamptz>().is_err());

        // The same instant written with different offsets parses to equal values.
        let zulu: Timestamptz = "2022-08-03 10:34:02Z".parse().unwrap();
        let minus_eight: Timestamptz = "2022-08-03 02:34:02-08:00".parse().unwrap();
        assert_eq!(zulu, minus_eight);

        let expected = Ok(Timestamptz::from_micros(1689130892000000));

        // Most standard: ISO 8601 & RFC 3339
        let iso_8601_and_rfc_3339 = [
            "2023-07-12T03:01:32Z",
            "2023-07-12T03:01:32+00:00",
            "2023-07-12T11:01:32+08:00",
        ];
        // RFC 3339
        let rfc_3339_only = [
            "2023-07-12 03:01:32Z",
            "2023-07-12 03:01:32+00:00",
            "2023-07-12 11:01:32+08:00",
        ];
        // PostgreSQL, but neither ISO 8601 nor RFC 3339
        let postgres_only = ["2023-07-12 03:01:32+00", "2023-07-12 11:01:32+08"];

        for group in [
            &iso_8601_and_rfc_3339[..],
            &rfc_3339_only[..],
            &postgres_only[..],
        ] {
            for input in group {
                assert_eq!(input.parse(), expected, "input: {input}");
            }
        }
    }
}
266}