risingwave_common/types/
timestamptz.rs1use std::error::Error;
16use std::io::{Cursor, Write};
17use std::str::FromStr;
18
19use anyhow::Context;
20use byteorder::{BigEndian, ReadBytesExt};
21use bytes::BytesMut;
22use chrono::{DateTime, Datelike, TimeZone, Utc};
23use chrono_tz::Tz;
24use postgres_types::{FromSql, IsNull, ToSql, Type, accepts, to_sql_checked};
25use risingwave_common_estimate_size::ZeroHeapSize;
26use serde::{Deserialize, Serialize};
27
28use super::DataType;
29use super::to_text::ToText;
30use crate::array::ArrayResult;
31
32#[derive(
34 Default, Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
35)]
36#[repr(transparent)]
37pub struct Timestamptz(i64);
38
39impl ZeroHeapSize for Timestamptz {}
40
41impl ToSql for Timestamptz {
42 accepts!(TIMESTAMPTZ);
43
44 to_sql_checked!();
45
46 fn to_sql(&self, _: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>>
47 where
48 Self: Sized,
49 {
50 let instant = self.to_datetime_utc();
51 instant.to_sql(&Type::ANY, out)
52 }
53}
54
55impl<'a> FromSql<'a> for Timestamptz {
56 fn from_sql(
57 ty: &Type,
58 raw: &'a [u8],
59 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
60 let instant = DateTime::<Utc>::from_sql(ty, raw)?;
61 Ok(Self::from(instant))
62 }
63
64 fn accepts(ty: &Type) -> bool {
65 matches!(*ty, Type::TIMESTAMPTZ)
66 }
67}
68
69impl ToText for Timestamptz {
70 fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
71 let instant = self.to_datetime_utc();
74 write!(f, "{}+00:00", instant.naive_local())
78 }
79
80 fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result {
81 assert_eq!(ty, &DataType::Timestamptz);
82 self.write(f)
83 }
84}
85
86impl Timestamptz {
87 pub const MIN: Self = Self(i64::MIN);
88
89 pub fn from_secs(timestamp_secs: i64) -> Option<Self> {
91 timestamp_secs.checked_mul(1_000_000).map(Self)
92 }
93
94 pub fn from_millis(timestamp_millis: i64) -> Option<Self> {
97 timestamp_millis.checked_mul(1000).map(Self)
98 }
99
100 pub fn from_micros(timestamp_micros: i64) -> Self {
102 Self(timestamp_micros)
103 }
104
105 pub fn from_nanos(timestamp_nanos: i64) -> Option<Self> {
107 timestamp_nanos.checked_div(1_000).map(Self)
108 }
109
110 pub fn timestamp_micros(&self) -> i64 {
112 self.0
113 }
114
115 pub fn timestamp_millis(&self) -> i64 {
117 self.0.div_euclid(1_000)
118 }
119
120 pub fn timestamp_nanos(&self) -> Option<i64> {
122 self.0.checked_mul(1_000)
123 }
124
125 pub fn timestamp(&self) -> i64 {
128 self.0.div_euclid(1_000_000)
129 }
130
131 pub fn timestamp_subsec_nanos(&self) -> u32 {
133 self.0.rem_euclid(1_000_000) as u32 * 1000
134 }
135
136 pub fn to_datetime_utc(self) -> chrono::DateTime<Utc> {
137 self.into()
138 }
139
140 pub fn to_datetime_in_zone(self, tz: Tz) -> chrono::DateTime<Tz> {
141 self.to_datetime_utc().with_timezone(&tz)
142 }
143
144 pub fn lookup_time_zone(time_zone: &str) -> std::result::Result<Tz, String> {
145 Tz::from_str_insensitive(time_zone)
146 .map_err(|_| format!("'{time_zone}' is not a valid timezone"))
147 }
148
149 pub fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Timestamptz> {
150 let micros = cur
151 .read_i64::<BigEndian>()
152 .context("failed to read i64 from Timestamptz buffer")?;
153 Ok(Self(micros))
154 }
155
156 pub fn to_protobuf(self, output: &mut impl Write) -> ArrayResult<usize> {
157 output.write(&self.0.to_be_bytes()).map_err(Into::into)
158 }
159}
160
161impl<Tz: TimeZone> From<chrono::DateTime<Tz>> for Timestamptz {
162 fn from(dt: chrono::DateTime<Tz>) -> Self {
163 Self(dt.timestamp_micros())
164 }
165}
166
167impl From<Timestamptz> for chrono::DateTime<Utc> {
168 fn from(tz: Timestamptz) -> Self {
169 Utc.timestamp_opt(tz.timestamp(), tz.timestamp_subsec_nanos())
170 .unwrap()
171 }
172}
173
174impl FromStr for Timestamptz {
175 type Err = &'static str;
176
177 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
178 pub const ERROR_MSG: &str = concat!(
179 "Can't cast string to timestamp with time zone (expected format is YYYY-MM-DD HH:MM:SS[.D+{up to 6 digits}] followed by +hh:mm or literal Z)",
180 "\nFor example: '2021-04-01 00:00:00+00:00'"
181 );
182 let ret = match speedate::DateTime::parse_str_rfc3339(s) {
186 Ok(r) => r,
187 Err(_) => {
188 return s
191 .parse::<chrono::DateTime<Utc>>()
192 .or_else(|_| {
193 chrono::DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%#z")
194 .map(|t| t.with_timezone(&Utc))
195 })
196 .map(|t| Timestamptz(t.timestamp_micros()))
197 .map_err(|_| ERROR_MSG);
198 }
199 };
200 if ret.time.tz_offset.is_none() {
201 return Err(ERROR_MSG);
202 }
203 if ret.date.year < 1600 {
204 return Err("parsing timestamptz with year < 1600 unsupported");
205 }
206 Ok(Timestamptz(
207 ret.timestamp_tz()
208 .checked_mul(1000000)
209 .and_then(|us| us.checked_add(ret.time.microsecond.into()))
210 .ok_or(ERROR_MSG)?,
211 ))
212 }
213}
214
215impl std::fmt::Display for Timestamptz {
216 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217 self.write(f)
218 }
219}
220
221pub fn write_date_time_tz(
222 instant_local: DateTime<Tz>,
223 writer: &mut impl std::fmt::Write,
224) -> std::fmt::Result {
225 let date = instant_local.date_naive();
226 let (ce, year) = date.year_ce();
227 write!(
228 writer,
229 "{:04}-{:02}-{:02} {}",
230 year,
231 date.month(),
232 date.day(),
233 instant_local.format(if ce {
234 "%H:%M:%S%.f%:z"
235 } else {
236 "%H:%M:%S%.f%:z BC"
237 })
238 )
239}
240
#[cfg(test)]
mod test {
    use super::*;

    /// Exercises `FromStr`: inputs without an offset are rejected, and
    /// equivalent instants written with different separators and offsets
    /// parse to the same value.
    #[test]
    fn parse() {
        // No offset at all: must not parse.
        assert!("1999-01-08 04:05:06".parse::<Timestamptz>().is_err());

        // Same instant, written in UTC and at -08:00.
        let in_utc: Timestamptz = "2022-08-03 10:34:02Z".parse().unwrap();
        let in_minus_eight: Timestamptz = "2022-08-03 02:34:02-08:00".parse().unwrap();
        assert_eq!(in_utc, in_minus_eight);

        // Every spelling below denotes 2023-07-12T03:01:32Z.
        let expected = Ok(Timestamptz::from_micros(1689130892000000));
        let spellings = [
            "2023-07-12T03:01:32Z",
            "2023-07-12T03:01:32+00:00",
            "2023-07-12T11:01:32+08:00",
            "2023-07-12 03:01:32Z",
            "2023-07-12 03:01:32+00:00",
            "2023-07-12 11:01:32+08:00",
            "2023-07-12 03:01:32+00",
            "2023-07-12 11:01:32+08",
        ];
        for spelling in spellings.iter() {
            assert_eq!(spelling.parse::<Timestamptz>(), expected);
        }
    }
}