risingwave_common/types/
timestamptz.rsuse std::error::Error;
use std::io::{Cursor, Write};
use std::str::FromStr;
use anyhow::Context;
use byteorder::{BigEndian, ReadBytesExt};
use bytes::BytesMut;
use chrono::{DateTime, Datelike, TimeZone, Utc};
use chrono_tz::Tz;
use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
use risingwave_common_estimate_size::ZeroHeapSize;
use serde::{Deserialize, Serialize};
use super::to_text::ToText;
use super::DataType;
use crate::array::ArrayResult;
#[derive(
Default, Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
)]
#[repr(transparent)]
pub struct Timestamptz(i64);
impl ZeroHeapSize for Timestamptz {}
impl ToSql for Timestamptz {
accepts!(TIMESTAMPTZ);
to_sql_checked!();
fn to_sql(&self, _: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>>
where
Self: Sized,
{
let instant = self.to_datetime_utc();
instant.to_sql(&Type::ANY, out)
}
}
impl<'a> FromSql<'a> for Timestamptz {
fn from_sql(
ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
let instant = DateTime::<Utc>::from_sql(ty, raw)?;
Ok(Self::from(instant))
}
fn accepts(ty: &Type) -> bool {
matches!(*ty, Type::TIMESTAMPTZ)
}
}
impl ToText for Timestamptz {
fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
let instant = self.to_datetime_utc();
write!(f, "{}+00:00", instant.naive_local())
}
fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result {
assert_eq!(ty, &DataType::Timestamptz);
self.write(f)
}
}
impl Timestamptz {
pub const MIN: Self = Self(i64::MIN);
pub fn from_secs(timestamp_secs: i64) -> Option<Self> {
timestamp_secs.checked_mul(1_000_000).map(Self)
}
pub fn from_millis(timestamp_millis: i64) -> Option<Self> {
timestamp_millis.checked_mul(1000).map(Self)
}
pub fn from_micros(timestamp_micros: i64) -> Self {
Self(timestamp_micros)
}
pub fn from_nanos(timestamp_nanos: i64) -> Option<Self> {
timestamp_nanos.checked_div(1_000).map(Self)
}
pub fn timestamp_micros(&self) -> i64 {
self.0
}
pub fn timestamp_millis(&self) -> i64 {
self.0.div_euclid(1_000)
}
pub fn timestamp_nanos(&self) -> Option<i64> {
self.0.checked_mul(1_000)
}
pub fn timestamp(&self) -> i64 {
self.0.div_euclid(1_000_000)
}
pub fn timestamp_subsec_nanos(&self) -> u32 {
self.0.rem_euclid(1_000_000) as u32 * 1000
}
pub fn to_datetime_utc(self) -> chrono::DateTime<Utc> {
self.into()
}
pub fn to_datetime_in_zone(self, tz: Tz) -> chrono::DateTime<Tz> {
self.to_datetime_utc().with_timezone(&tz)
}
pub fn lookup_time_zone(time_zone: &str) -> std::result::Result<Tz, String> {
Tz::from_str_insensitive(time_zone)
.map_err(|_| format!("'{time_zone}' is not a valid timezone"))
}
pub fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Timestamptz> {
let micros = cur
.read_i64::<BigEndian>()
.context("failed to read i64 from Timestamptz buffer")?;
Ok(Self(micros))
}
pub fn to_protobuf(self, output: &mut impl Write) -> ArrayResult<usize> {
output.write(&self.0.to_be_bytes()).map_err(Into::into)
}
}
impl<Tz: TimeZone> From<chrono::DateTime<Tz>> for Timestamptz {
fn from(dt: chrono::DateTime<Tz>) -> Self {
Self(dt.timestamp_micros())
}
}
impl From<Timestamptz> for chrono::DateTime<Utc> {
fn from(tz: Timestamptz) -> Self {
Utc.timestamp_opt(tz.timestamp(), tz.timestamp_subsec_nanos())
.unwrap()
}
}
impl FromStr for Timestamptz {
type Err = &'static str;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
pub const ERROR_MSG: &str = concat!(
"Can't cast string to timestamp with time zone (expected format is YYYY-MM-DD HH:MM:SS[.D+{up to 6 digits}] followed by +hh:mm or literal Z)"
, "\nFor example: '2021-04-01 00:00:00+00:00'"
);
let ret = match speedate::DateTime::parse_str_rfc3339(s) {
Ok(r) => r,
Err(_) => {
return s
.parse::<chrono::DateTime<Utc>>()
.or_else(|_| {
chrono::DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%#z")
.map(|t| t.with_timezone(&Utc))
})
.map(|t| Timestamptz(t.timestamp_micros()))
.map_err(|_| ERROR_MSG);
}
};
if ret.time.tz_offset.is_none() {
return Err(ERROR_MSG);
}
if ret.date.year < 1600 {
return Err("parsing timestamptz with year < 1600 unsupported");
}
Ok(Timestamptz(
ret.timestamp_tz()
.checked_mul(1000000)
.and_then(|us| us.checked_add(ret.time.microsecond.into()))
.ok_or(ERROR_MSG)?,
))
}
}
impl std::fmt::Display for Timestamptz {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.write(f)
}
}
pub fn write_date_time_tz(
instant_local: DateTime<Tz>,
writer: &mut impl std::fmt::Write,
) -> std::fmt::Result {
let date = instant_local.date_naive();
let (ce, year) = date.year_ce();
write!(
writer,
"{:04}-{:02}-{:02} {}",
year,
date.month(),
date.day(),
instant_local.format(if ce {
"%H:%M:%S%.f%:z"
} else {
"%H:%M:%S%.f%:z BC"
})
)
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn parse() {
assert!("1999-01-08 04:05:06".parse::<Timestamptz>().is_err());
assert_eq!(
"2022-08-03 10:34:02Z".parse::<Timestamptz>().unwrap(),
"2022-08-03 02:34:02-08:00".parse::<Timestamptz>().unwrap()
);
let expected = Ok(Timestamptz::from_micros(1689130892000000));
assert_eq!("2023-07-12T03:01:32Z".parse(), expected);
assert_eq!("2023-07-12T03:01:32+00:00".parse(), expected);
assert_eq!("2023-07-12T11:01:32+08:00".parse(), expected);
assert_eq!("2023-07-12 03:01:32Z".parse(), expected);
assert_eq!("2023-07-12 03:01:32+00:00".parse(), expected);
assert_eq!("2023-07-12 11:01:32+08:00".parse(), expected);
assert_eq!("2023-07-12 03:01:32+00".parse(), expected);
assert_eq!("2023-07-12 11:01:32+08".parse(), expected);
}
}