risingwave_expr_impl/scalar/
date_bin.rs1use chrono::DateTime;
16use risingwave_common::types::{Interval, Timestamp, Timestamptz};
17use risingwave_expr::{ExprError, Result, function};
18
19#[function("date_bin(interval, timestamp, timestamp) -> timestamp")]
20pub fn date_bin_ts(stride: Interval, source: Timestamp, origin: Timestamp) -> Result<Timestamp> {
21    let source_us = source.0.and_utc().timestamp_micros(); let origin_us = origin.0.and_utc().timestamp_micros(); let binned_source_us = date_bin_inner(stride, source_us, origin_us)?;
25    Ok(Timestamp(
26        DateTime::from_timestamp_micros(binned_source_us)
27            .unwrap()
28            .naive_utc(),
29    ))
30}
31
32#[function("date_bin(interval, timestamptz, timestamptz) -> timestamptz")]
33pub fn date_bin_tstz(
34    stride: Interval,
35    source: Timestamptz,
36    origin: Timestamptz,
37) -> Result<Timestamptz> {
38    let source_us = source.timestamp_micros(); let origin_us = origin.timestamp_micros(); let binned_source_us = date_bin_inner(stride, source_us, origin_us)?;
42    Ok(Timestamptz::from_micros(binned_source_us))
43}
44
45fn date_bin_inner(stride: Interval, source_us: i64, origin_us: i64) -> Result<i64> {
46    if stride.months() != 0 {
47        return Err(ExprError::InvalidParam {
49            name: "stride",
50            reason: "stride interval with months not supported in date_bin".into(),
51        });
52    }
53    let stride_us = stride.usecs() + (stride.days() as i64) * Interval::USECS_PER_DAY; if stride_us <= 0 {
56        return Err(ExprError::InvalidParam {
57            name: "stride",
58            reason: "stride interval must be positive".into(),
59        });
60    }
61
62    let delta = source_us - origin_us;
64
65    let bucket = delta.div_euclid(stride_us) * stride_us;
67
68    let binned_source_us = origin_us + bucket;
70    Ok(binned_source_us)
71}