risingwave_expr_impl/scalar/
date_bin.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use chrono::DateTime;
16use risingwave_common::types::{Interval, Timestamp, Timestamptz};
17use risingwave_expr::{ExprError, Result, function};
18
19#[function("date_bin(interval, timestamp, timestamp) -> timestamp")]
20pub fn date_bin_ts(stride: Interval, source: Timestamp, origin: Timestamp) -> Result<Timestamp> {
21    let source_us = source.0.and_utc().timestamp_micros(); // source to microseconds
22    let origin_us = origin.0.and_utc().timestamp_micros(); // origin to microseconds
23
24    let binned_source_us = date_bin_inner(stride, source_us, origin_us)?;
25    Ok(Timestamp(
26        DateTime::from_timestamp_micros(binned_source_us)
27            .unwrap()
28            .naive_utc(),
29    ))
30}
31
32#[function("date_bin(interval, timestamptz, timestamptz) -> timestamptz")]
33pub fn date_bin_tstz(
34    stride: Interval,
35    source: Timestamptz,
36    origin: Timestamptz,
37) -> Result<Timestamptz> {
38    let source_us = source.timestamp_micros(); // source to microseconds
39    let origin_us = origin.timestamp_micros(); // origin to microseconds
40
41    let binned_source_us = date_bin_inner(stride, source_us, origin_us)?;
42    Ok(Timestamptz::from_micros(binned_source_us))
43}
44
45fn date_bin_inner(stride: Interval, source_us: i64, origin_us: i64) -> Result<i64> {
46    if stride.months() != 0 {
47        // PostgreSQL doesn't allow months in the interval for date_bin.
48        return Err(ExprError::InvalidParam {
49            name: "stride",
50            reason: "stride interval with months not supported in date_bin".into(),
51        });
52    }
53    let stride_us = stride.usecs() + (stride.days() as i64) * Interval::USECS_PER_DAY; // stride width in microseconds
54
55    if stride_us <= 0 {
56        return Err(ExprError::InvalidParam {
57            name: "stride",
58            reason: "stride interval must be positive".into(),
59        });
60    }
61
62    // Compute how far ts is from the origin
63    let delta = source_us - origin_us;
64
65    // Floor the delta to the nearest stride
66    let bucket = delta.div_euclid(stride_us) * stride_us;
67
68    // Add back to origin to get stridened timestamp
69    let binned_source_us = origin_us + bucket;
70    Ok(binned_source_us)
71}