risingwave_expr_impl/scalar/
date_trunc.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::{Interval, Timestamp, Timestamptz};
16use risingwave_expr::{ExprError, Result, function};
17
18use super::timestamptz::timestamp_at_time_zone;
19
// TODO(xiangjinwu): parse into an enum

// Unit names recognized by the `date_trunc` functions in this file. Callers
// lowercase the user-supplied field (ASCII only) before matching against them.

// Units finer than a day.
const MICROSECONDS: &str = "microseconds";
const MILLISECONDS: &str = "milliseconds";
const SECOND: &str = "second";
const MINUTE: &str = "minute";
const HOUR: &str = "hour";

// Units of a day or coarser.
const DAY: &str = "day";
const WEEK: &str = "week";
const MONTH: &str = "month";
const QUARTER: &str = "quarter";
const YEAR: &str = "year";
const DECADE: &str = "decade";
const CENTURY: &str = "century";
const MILLENNIUM: &str = "millennium";
34
35#[function("date_trunc(varchar, timestamp) -> timestamp")]
36pub fn date_trunc_timestamp(field: &str, ts: Timestamp) -> Result<Timestamp> {
37    Ok(match field.to_ascii_lowercase().as_str() {
38        MICROSECONDS => ts.truncate_micros(),
39        MILLISECONDS => ts.truncate_millis(),
40        SECOND => ts.truncate_second(),
41        MINUTE => ts.truncate_minute(),
42        HOUR => ts.truncate_hour(),
43        DAY => ts.truncate_day(),
44        WEEK => ts.truncate_week(),
45        MONTH => ts.truncate_month(),
46        QUARTER => ts.truncate_quarter(),
47        YEAR => ts.truncate_year(),
48        DECADE => ts.truncate_decade(),
49        CENTURY => ts.truncate_century(),
50        MILLENNIUM => ts.truncate_millennium(),
51        _ => return Err(invalid_field_error(field)),
52    })
53}
54
55#[function("date_trunc(varchar, timestamptz) -> timestamptz", rewritten)]
56fn _date_trunc_timestamptz() {}
57
58#[function("date_trunc(varchar, timestamptz, varchar) -> timestamptz")]
59pub fn date_trunc_timestamptz_at_timezone(
60    field: &str,
61    tsz: Timestamptz,
62    timezone: &str,
63) -> Result<Timestamptz> {
64    use chrono::Offset as _;
65
66    use super::timestamptz::time_zone_err;
67
68    let tz = Timestamptz::lookup_time_zone(timezone).map_err(time_zone_err)?;
69    let instant_local = tsz.to_datetime_in_zone(tz);
70
71    let truncated_naive = date_trunc_timestamp(field, Timestamp(instant_local.naive_local()))?;
72
73    match field.to_ascii_lowercase().as_str() {
74        MICROSECONDS | MILLISECONDS | SECOND | MINUTE | HOUR => {
75            // When unit < day, follow PostgreSQL to use old timezone offset.
76            // rather than reinterpret it in the timezone.
77            // https://github.com/postgres/postgres/blob/REL_16_0/src/backend/utils/adt/timestamp.c#L4270
78            // See `e2e_test/batch/functions/issue_12072.slt.part` for the difference.
79            let fixed = instant_local.offset().fix();
80            // `unwrap` is okay because `FixedOffset` always returns single unique conversion result.
81            let truncated_local = truncated_naive.0.and_local_timezone(fixed).unwrap();
82            Ok(Timestamptz::from_micros(truncated_local.timestamp_micros()))
83        }
84        _ => timestamp_at_time_zone(truncated_naive, timezone),
85    }
86}
87
88#[function("date_trunc(varchar, interval) -> interval")]
89pub fn date_trunc_interval(field: &str, interval: Interval) -> Result<Interval> {
90    Ok(match field.to_ascii_lowercase().as_str() {
91        MICROSECONDS => interval,
92        MILLISECONDS => interval.truncate_millis(),
93        SECOND => interval.truncate_second(),
94        MINUTE => interval.truncate_minute(),
95        HOUR => interval.truncate_hour(),
96        DAY => interval.truncate_day(),
97        WEEK => return Err(ExprError::UnsupportedFunction(
98            "interval units \"week\" not supported because months usually have fractional weeks"
99                .into(),
100        )),
101        MONTH => interval.truncate_month(),
102        QUARTER => interval.truncate_quarter(),
103        YEAR => interval.truncate_year(),
104        DECADE => interval.truncate_decade(),
105        CENTURY => interval.truncate_century(),
106        MILLENNIUM => interval.truncate_millennium(),
107        _ => return Err(invalid_field_error(field)),
108    })
109}
110
111#[inline]
112fn invalid_field_error(field: &str) -> ExprError {
113    ExprError::InvalidParam {
114        name: "field",
115        reason: format!("invalid field {field:?}. must be one of: microseconds, milliseconds, second, minute, hour, day, week, month, quarter, year, decade, century, millennium").into(),
116    }
117}