1use std::error::Error;
18use std::fmt::Display;
19use std::hash::Hash;
20use std::io::{Cursor, Write};
21use std::str::FromStr;
22
23use anyhow::Context;
24use byteorder::{BigEndian, ReadBytesExt};
25use bytes::BytesMut;
26use chrono::{
27 DateTime, Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday,
28};
29use postgres_types::{FromSql, IsNull, ToSql, Type, accepts, to_sql_checked};
30use risingwave_common_estimate_size::ZeroHeapSize;
31use thiserror::Error;
32
33use super::to_text::ToText;
34use super::{CheckedAdd, DataType, Interval};
35use crate::array::{ArrayError, ArrayResult};
36
// Day count of the unix epoch (1970-01-01) relative to the Common Era; used below to
// translate between "days since unix epoch" and chrono's "days from CE" numbering.
const UNIX_EPOCH_DAYS: i32 = 719_163;
// Days per month in a leap year, indexed by 1-based month number (index 0 is padding).
const LEAP_DAYS: &[i32] = &[0, 31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
// Days per month in a non-leap year, indexed by 1-based month number (index 0 is padding).
const NORMAL_DAYS: &[i32] = &[0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
42
// Generates a transparent newtype `$variant_name` around the chrono type `$chrono`,
// together with its constructor, `Display`, `From<$chrono>`, `ZeroHeapSize` and the
// Postgres `ToSql` / `FromSql` glue bound to the Postgres type `Type::$pg_type`.
macro_rules! impl_chrono_wrapper {
    ($variant_name:ident, $chrono:ty, $pg_type:ident) => {
        #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
        #[repr(transparent)]
        pub struct $variant_name(pub $chrono);

        impl $variant_name {
            /// Wrapper around the minimum value of the underlying chrono type.
            pub const MIN: Self = Self(<$chrono>::MIN);

            /// Wraps an existing chrono value.
            pub fn new(data: $chrono) -> Self {
                Self(data)
            }
        }

        // `Display` shares its rendering with `ToText` so both always agree.
        impl Display for $variant_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                <Self as ToText>::write(self, f)
            }
        }

        impl From<$chrono> for $variant_name {
            fn from(data: $chrono) -> Self {
                Self(data)
            }
        }

        impl ZeroHeapSize for $variant_name {}

        // Serialization is delegated to the wrapped chrono value.
        impl ToSql for $variant_name {
            accepts!($pg_type);

            to_sql_checked!();

            fn to_sql(
                &self,
                ty: &Type,
                out: &mut BytesMut,
            ) -> std::result::Result<IsNull, Box<dyn Error + Sync + Send>>
            where
                Self: Sized,
            {
                <$chrono as ToSql>::to_sql(&self.0, ty, out)
            }
        }

        // Deserialization decodes the chrono value first, then wraps it.
        impl<'a> FromSql<'a> for $variant_name {
            fn from_sql(
                ty: &Type,
                raw: &'a [u8],
            ) -> std::result::Result<Self, Box<dyn Error + Sync + Send>> {
                <$chrono>::from_sql(ty, raw).map(Self::new)
            }

            fn accepts(ty: &Type) -> bool {
                ty == &Type::$pg_type
            }
        }
    };
}
103
// `Date` wraps `NaiveDate` and is bound to the Postgres `DATE` type.
impl_chrono_wrapper!(Date, NaiveDate, DATE);
// `Timestamp` wraps `NaiveDateTime` and is bound to the Postgres `TIMESTAMP` type.
impl_chrono_wrapper!(Timestamp, NaiveDateTime, TIMESTAMP);
// `Time` wraps `NaiveTime` and is bound to the Postgres `TIME` type.
impl_chrono_wrapper!(Time, NaiveTime, TIME);
107
108impl FromStr for Date {
119 type Err = InvalidParamsError;
120
121 fn from_str(s: &str) -> Result<Self> {
122 let date = speedate::Date::parse_str_rfc3339(s).map_err(|_| ErrorKind::ParseDate)?;
123 Ok(Date::new(
124 Date::from_ymd_uncheck(date.year as i32, date.month as u32, date.day as u32).0,
125 ))
126 }
127}
128
129impl FromStr for Time {
141 type Err = InvalidParamsError;
142
143 fn from_str(s: &str) -> Result<Self> {
144 let s_without_zone = s.trim_end_matches('Z');
145 let res = speedate::Time::parse_str(s_without_zone).map_err(|_| ErrorKind::ParseTime)?;
146 Ok(Time::from_hms_micro_uncheck(
147 res.hour as u32,
148 res.minute as u32,
149 res.second as u32,
150 res.microsecond,
151 ))
152 }
153}
154
155impl FromStr for Timestamp {
168 type Err = InvalidParamsError;
169
170 fn from_str(s: &str) -> Result<Self> {
171 let dt = s
172 .parse::<jiff::civil::DateTime>()
173 .map_err(|_| ErrorKind::ParseTimestamp)?;
174 Ok(
175 Date::from_ymd_uncheck(dt.year() as i32, dt.month() as u32, dt.day() as u32)
176 .and_hms_nano_uncheck(
177 dt.hour() as u32,
178 dt.minute() as u32,
179 dt.second() as u32,
180 dt.subsec_nanosecond() as u32,
181 ),
182 )
183 }
184}
185
186impl From<Timestamp> for Date {
199 fn from(ts: Timestamp) -> Self {
200 Date::new(ts.0.date())
201 }
202}
203
204impl From<Timestamp> for Time {
217 fn from(ts: Timestamp) -> Self {
218 Time::new(ts.0.time())
219 }
220}
221
222impl From<Interval> for Time {
239 fn from(interval: Interval) -> Self {
240 let usecs = interval.usecs_of_day();
241 let secs = (usecs / 1_000_000) as u32;
242 let nano = (usecs % 1_000_000 * 1000) as u32;
243 Time::from_num_seconds_from_midnight_uncheck(secs, nano)
244 }
245}
246
/// Private reasons for which constructing or parsing a date/time value can fail.
/// Exposed to callers only through [`InvalidParamsError`].
#[derive(Copy, Clone, Debug, Error)]
enum ErrorKind {
    /// A day count that could not be turned into a date.
    #[error("Invalid date: days: {days}")]
    Date { days: i32 },
    /// A seconds/nanoseconds pair that could not be turned into a time of day.
    #[error("Invalid time: secs: {secs}, nanoseconds: {nsecs}")]
    Time { secs: u32, nsecs: u32 },
    /// A seconds/nanoseconds pair that could not be turned into a timestamp.
    #[error("Invalid datetime: seconds: {secs}, nanoseconds: {nsecs}")]
    DateTime { secs: i64, nsecs: u32 },
    /// A string that failed to parse as a date.
    #[error("Can't cast string to date (expected format is YYYY-MM-DD)")]
    ParseDate,
    /// A string that failed to parse as a time of day.
    #[error(
        "Can't cast string to time (expected format is HH:MM:SS[.D+{{up to 6 digits}}][Z] or HH:MM)"
    )]
    ParseTime,
    /// A string that failed to parse as a timestamp.
    #[error(
        "Can't cast string to timestamp (expected format is YYYY-MM-DD HH:MM:SS[.D+{{up to 9 digits}}] or YYYY-MM-DD HH:MM or YYYY-MM-DD or ISO 8601 format)"
    )]
    ParseTimestamp,
}
266
/// Public error type for invalid date/time parameters and unparsable strings.
///
/// It wraps the private [`ErrorKind`] transparently, so its message is the message of
/// the wrapped kind.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct InvalidParamsError(#[from] ErrorKind);
270
271impl InvalidParamsError {
272 pub fn date(days: i32) -> Self {
273 ErrorKind::Date { days }.into()
274 }
275
276 pub fn time(secs: u32, nsecs: u32) -> Self {
277 ErrorKind::Time { secs, nsecs }.into()
278 }
279
280 pub fn datetime(secs: i64, nsecs: u32) -> Self {
281 ErrorKind::DateTime { secs, nsecs }.into()
282 }
283}
284
285impl From<InvalidParamsError> for ArrayError {
286 fn from(e: InvalidParamsError) -> Self {
287 ArrayError::internal(e)
288 }
289}
290
// Module-local shorthand: every fallible constructor/parser here fails with `InvalidParamsError`.
type Result<T> = std::result::Result<T, InvalidParamsError>;
292
293impl ToText for Date {
294 fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
306 let (ce, year) = self.0.year_ce();
307 let suffix = if ce { "" } else { " BC" };
308 write!(
309 f,
310 "{:04}-{:02}-{:02}{}",
311 year,
312 self.0.month(),
313 self.0.day(),
314 suffix
315 )
316 }
317
318 fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result {
319 match ty {
320 super::DataType::Date => self.write(f),
321 _ => unreachable!(),
322 }
323 }
324}
325
326impl ToText for Time {
327 fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
328 write!(f, "{}", self.0)
329 }
330
331 fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result {
332 match ty {
333 super::DataType::Time => self.write(f),
334 _ => unreachable!(),
335 }
336 }
337}
338
339impl ToText for Timestamp {
340 fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
341 let (ce, year) = self.0.year_ce();
342 let suffix = if ce { "" } else { " BC" };
343 write!(
344 f,
345 "{:04}-{:02}-{:02} {}{}",
346 year,
347 self.0.month(),
348 self.0.day(),
349 self.0.time(),
350 suffix
351 )
352 }
353
354 fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result {
355 match ty {
356 super::DataType::Timestamp => self.write(f),
357 _ => unreachable!(),
358 }
359 }
360}
361
362impl Date {
363 pub fn with_days_since_ce(days: i32) -> Result<Self> {
364 Ok(Date::new(
365 NaiveDate::from_num_days_from_ce_opt(days)
366 .ok_or_else(|| InvalidParamsError::date(days))?,
367 ))
368 }
369
370 pub fn with_days_since_unix_epoch(days: i32) -> Result<Self> {
371 Ok(Date::new(
372 NaiveDate::from_num_days_from_ce_opt(days)
373 .ok_or_else(|| InvalidParamsError::date(days))?
374 .checked_add_days(Days::new(UNIX_EPOCH_DAYS as u64))
375 .ok_or_else(|| InvalidParamsError::date(days))?,
376 ))
377 }
378
379 pub fn get_nums_days_unix_epoch(&self) -> i32 {
380 self.0
381 .checked_sub_days(Days::new(UNIX_EPOCH_DAYS as u64))
382 .unwrap()
383 .num_days_from_ce()
384 }
385
386 pub fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Date> {
387 let days = cur
388 .read_i32::<BigEndian>()
389 .context("failed to read i32 from Date buffer")?;
390
391 Ok(Date::with_days_since_ce(days)?)
392 }
393
394 pub fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
395 output
396 .write(&(self.0.num_days_from_ce()).to_be_bytes())
397 .map_err(Into::into)
398 }
399
400 pub fn from_ymd_uncheck(year: i32, month: u32, day: u32) -> Self {
401 Self::new(NaiveDate::from_ymd_opt(year, month, day).unwrap())
402 }
403
404 pub fn from_num_days_from_ce_uncheck(days: i32) -> Self {
405 Self::with_days_since_ce(days).unwrap()
406 }
407
408 pub fn and_hms_uncheck(self, hour: u32, min: u32, sec: u32) -> Timestamp {
409 self.and_hms_micro_uncheck(hour, min, sec, 0)
410 }
411
412 pub fn and_hms_micro_uncheck(self, hour: u32, min: u32, sec: u32, micro: u32) -> Timestamp {
413 Timestamp::new(
414 self.0
415 .and_time(Time::from_hms_micro_uncheck(hour, min, sec, micro).0),
416 )
417 }
418
419 pub fn and_hms_nano_uncheck(self, hour: u32, min: u32, sec: u32, nano: u32) -> Timestamp {
420 Timestamp::new(
421 self.0
422 .and_time(Time::from_hms_nano_uncheck(hour, min, sec, nano).0),
423 )
424 }
425}
426
427impl Time {
428 pub fn with_secs_nano(secs: u32, nano: u32) -> Result<Self> {
429 Ok(Time::new(
430 NaiveTime::from_num_seconds_from_midnight_opt(secs, nano)
431 .ok_or_else(|| InvalidParamsError::time(secs, nano))?,
432 ))
433 }
434
435 pub fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Time> {
436 let nano = cur
437 .read_u64::<BigEndian>()
438 .context("failed to read u64 from Time buffer")?;
439
440 Ok(Time::with_nano(nano)?)
441 }
442
443 pub fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
444 output
445 .write(
446 &(self.0.num_seconds_from_midnight() as u64 * 1_000_000_000
447 + self.0.nanosecond() as u64)
448 .to_be_bytes(),
449 )
450 .map_err(Into::into)
451 }
452
453 pub fn with_nano(nano: u64) -> Result<Self> {
454 let secs = (nano / 1_000_000_000) as u32;
455 let nano = (nano % 1_000_000_000) as u32;
456 Self::with_secs_nano(secs, nano)
457 }
458
459 pub fn with_micro(micro: u64) -> Result<Self> {
460 let secs = (micro / 1_000_000) as u32;
461 let nano = ((micro % 1_000_000) * 1_000) as u32;
462 Self::with_secs_nano(secs, nano)
463 }
464
465 pub fn with_milli(milli: u32) -> Result<Self> {
466 let secs = milli / 1_000;
467 let nano = (milli % 1_000) * 1_000_000;
468 Self::with_secs_nano(secs, nano)
469 }
470
471 pub fn from_hms_uncheck(hour: u32, min: u32, sec: u32) -> Self {
472 Self::from_hms_nano_uncheck(hour, min, sec, 0)
473 }
474
475 pub fn from_hms_micro_uncheck(hour: u32, min: u32, sec: u32, micro: u32) -> Self {
476 Self::new(NaiveTime::from_hms_micro_opt(hour, min, sec, micro).unwrap())
477 }
478
479 pub fn from_hms_nano_uncheck(hour: u32, min: u32, sec: u32, nano: u32) -> Self {
480 Self::new(NaiveTime::from_hms_nano_opt(hour, min, sec, nano).unwrap())
481 }
482
483 pub fn from_num_seconds_from_midnight_uncheck(secs: u32, nano: u32) -> Self {
484 Self::new(NaiveTime::from_num_seconds_from_midnight_opt(secs, nano).unwrap())
485 }
486}
487
488enum FirstI64 {
495 V0 { usecs: i64 },
496 V1 { secs: i64 },
497}
498impl FirstI64 {
499 pub fn to_protobuf(&self) -> i64 {
500 match self {
501 FirstI64::V0 { usecs } => *usecs,
502 FirstI64::V1 { secs } => secs ^ (0b01 << 62),
503 }
504 }
505
506 pub fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<FirstI64> {
507 let value = cur
508 .read_i64::<BigEndian>()
509 .context("failed to read i64 from Time buffer")?;
510 if Self::is_v1_format_state(value) {
511 let secs = value ^ (0b01 << 62);
512 Ok(FirstI64::V1 { secs })
513 } else {
514 Ok(FirstI64::V0 { usecs: value })
515 }
516 }
517
518 fn is_v1_format_state(value: i64) -> bool {
519 let state = (value >> 62) & 0b11;
520 state == 0b10 || state == 0b01
521 }
522}
523
524impl Timestamp {
525 pub fn with_secs_nsecs(secs: i64, nsecs: u32) -> Result<Self> {
526 Ok(Timestamp::new({
527 DateTime::from_timestamp(secs, nsecs)
528 .map(|t| t.naive_utc())
529 .ok_or_else(|| InvalidParamsError::datetime(secs, nsecs))?
530 }))
531 }
532
533 pub fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Timestamp> {
534 match FirstI64::from_protobuf(cur)? {
535 FirstI64::V0 { usecs } => Ok(Timestamp::with_micros(usecs)?),
536 FirstI64::V1 { secs } => {
537 let nsecs = cur
538 .read_u32::<BigEndian>()
539 .context("failed to read u32 from Time buffer")?;
540 Ok(Timestamp::with_secs_nsecs(secs, nsecs)?)
541 }
542 }
543 }
544
545 pub fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
549 let timestamp_size = output
550 .write(
551 &(FirstI64::V1 {
552 secs: self.0.and_utc().timestamp(),
553 }
554 .to_protobuf())
555 .to_be_bytes(),
556 )
557 .map_err(Into::<ArrayError>::into)?;
558 let timestamp_subsec_nanos_size = output
559 .write(&(self.0.and_utc().timestamp_subsec_nanos()).to_be_bytes())
560 .map_err(Into::<ArrayError>::into)?;
561 Ok(timestamp_subsec_nanos_size + timestamp_size)
562 }
563
564 pub fn get_timestamp_nanos(&self) -> i64 {
565 self.0.and_utc().timestamp_nanos_opt().unwrap()
566 }
567
568 pub fn with_millis(timestamp_millis: i64) -> Result<Self> {
569 let secs = timestamp_millis.div_euclid(1_000);
570 let nsecs = timestamp_millis.rem_euclid(1_000) * 1_000_000;
571 Self::with_secs_nsecs(secs, nsecs as u32)
572 }
573
574 pub fn with_micros(timestamp_micros: i64) -> Result<Self> {
575 let secs = timestamp_micros.div_euclid(1_000_000);
576 let nsecs = timestamp_micros.rem_euclid(1_000_000) * 1000;
577 Self::with_secs_nsecs(secs, nsecs as u32)
578 }
579
580 pub fn from_timestamp_uncheck(secs: i64, nsecs: u32) -> Self {
581 Self::new(DateTime::from_timestamp(secs, nsecs).unwrap().naive_utc())
582 }
583
584 pub fn truncate_micros(self) -> Self {
596 Self::new(
597 self.0
598 .with_nanosecond(self.0.nanosecond() / 1000 * 1000)
599 .unwrap(),
600 )
601 }
602
603 pub fn truncate_millis(self) -> Self {
615 Self::new(
616 self.0
617 .with_nanosecond(self.0.nanosecond() / 1_000_000 * 1_000_000)
618 .unwrap(),
619 )
620 }
621
622 pub fn truncate_second(self) -> Self {
634 Self::new(self.0.with_nanosecond(0).unwrap())
635 }
636
637 pub fn truncate_minute(self) -> Self {
649 Date::new(self.0.date()).and_hms_uncheck(self.0.hour(), self.0.minute(), 0)
650 }
651
652 pub fn truncate_hour(self) -> Self {
664 Date::new(self.0.date()).and_hms_uncheck(self.0.hour(), 0, 0)
665 }
666
667 pub fn truncate_day(self) -> Self {
679 Date::new(self.0.date()).into()
680 }
681
682 pub fn truncate_week(self) -> Self {
694 Date::new(self.0.date().week(Weekday::Mon).first_day()).into()
695 }
696
697 pub fn truncate_month(self) -> Self {
709 Date::new(self.0.date().with_day(1).unwrap()).into()
710 }
711
712 pub fn truncate_quarter(self) -> Self {
724 Date::from_ymd_uncheck(self.0.year(), self.0.month0() / 3 * 3 + 1, 1).into()
725 }
726
727 pub fn truncate_year(self) -> Self {
739 Date::from_ymd_uncheck(self.0.year(), 1, 1).into()
740 }
741
742 pub fn truncate_decade(self) -> Self {
754 Date::from_ymd_uncheck(self.0.year() / 10 * 10, 1, 1).into()
755 }
756
757 pub fn truncate_century(self) -> Self {
769 Date::from_ymd_uncheck((self.0.year() - 1) / 100 * 100 + 1, 1, 1).into()
770 }
771
772 pub fn truncate_millennium(self) -> Self {
784 Date::from_ymd_uncheck((self.0.year() - 1) / 1000 * 1000 + 1, 1, 1).into()
785 }
786}
787
788impl From<Date> for Timestamp {
789 fn from(date: Date) -> Self {
790 date.and_hms_uncheck(0, 0, 0)
791 }
792}
793
794fn get_mouth_days(year: i32, month: usize) -> i32 {
796 if is_leap_year(year) {
797 LEAP_DAYS[month]
798 } else {
799 NORMAL_DAYS[month]
800 }
801}
802
/// Gregorian leap-year rule: divisible by 400, or divisible by 4 but not by 100.
fn is_leap_year(year: i32) -> bool {
    if year % 400 == 0 {
        true
    } else if year % 100 == 0 {
        false
    } else {
        year % 4 == 0
    }
}
806
807impl CheckedAdd<Interval> for Timestamp {
808 type Output = Timestamp;
809
810 fn checked_add(self, rhs: Interval) -> Option<Timestamp> {
811 let mut date = self.0.date();
812 if rhs.months() != 0 {
813 let mut day = date.day() as i32;
815 let mut month = date.month() as i32;
816 let mut year = date.year();
817 let interval_months = rhs.months();
819 let year_diff = interval_months / 12;
820 year += year_diff;
821
822 let month_diff = interval_months - year_diff * 12;
826 month += month_diff;
828 if month > 12 {
830 year += 1;
831 month -= 12;
832 } else if month <= 0 {
833 year -= 1;
834 month += 12;
835 }
836
837 day = day.min(get_mouth_days(year, month as usize));
840 date = NaiveDate::from_ymd_opt(year, month as u32, day as u32)?;
841 }
842 let mut datetime = NaiveDateTime::new(date, self.0.time());
843 datetime = datetime.checked_add_signed(Duration::days(rhs.days().into()))?;
844 datetime = datetime.checked_add_signed(Duration::microseconds(rhs.usecs()))?;
845
846 Some(Timestamp::new(datetime))
847 }
848}
849
#[cfg(test)]
mod tests {
    use super::*;

    // Exercises the `FromStr` impls: accepted separators, pre-epoch values and rejection
    // of inputs with trailing or leading garbage.
    #[test]
    fn parse() {
        // `T` and a space are interchangeable as date/time separator.
        assert_eq!(
            Timestamp::from_str("2022-08-03T10:34:02").unwrap(),
            Timestamp::from_str("2022-08-03 10:34:02").unwrap()
        );
        // Timestamps before the unix epoch yield negative microsecond counts.
        let ts = Timestamp::from_str("0001-11-15 07:35:40.999999").unwrap();
        assert_eq!(ts.0.and_utc().timestamp_micros(), -62108094259000001);

        // One microsecond before the epoch.
        let ts = Timestamp::from_str("1969-12-31 23:59:59.999999").unwrap();
        assert_eq!(ts.0.and_utc().timestamp_micros(), -1);

        // Garbage around an otherwise valid value must be rejected.
        Date::from_str("1999-01-08AA").unwrap_err();
        Time::from_str("AA04:05:06").unwrap_err();
        Timestamp::from_str("1999-01-08 04:05:06AA").unwrap_err();
    }
}