risingwave_expr_impl/scalar/
date_bin.rs1use chrono::DateTime;
16use risingwave_common::types::{Interval, Timestamp, Timestamptz};
17use risingwave_expr::{ExprError, Result, function};
18
19#[function("date_bin(interval, timestamp, timestamp) -> timestamp")]
20pub fn date_bin_ts(stride: Interval, source: Timestamp, origin: Timestamp) -> Result<Timestamp> {
21 let source_us = source.0.and_utc().timestamp_micros(); let origin_us = origin.0.and_utc().timestamp_micros(); let binned_source_us = date_bin_inner(stride, source_us, origin_us)?;
25 Ok(Timestamp(
26 DateTime::from_timestamp_micros(binned_source_us)
27 .unwrap()
28 .naive_utc(),
29 ))
30}
31
32#[function("date_bin(interval, timestamptz, timestamptz) -> timestamptz")]
33pub fn date_bin_tstz(
34 stride: Interval,
35 source: Timestamptz,
36 origin: Timestamptz,
37) -> Result<Timestamptz> {
38 let source_us = source.timestamp_micros(); let origin_us = origin.timestamp_micros(); let binned_source_us = date_bin_inner(stride, source_us, origin_us)?;
42 Ok(Timestamptz::from_micros(binned_source_us))
43}
44
45fn date_bin_inner(stride: Interval, source_us: i64, origin_us: i64) -> Result<i64> {
46 if stride.months() != 0 {
47 return Err(ExprError::InvalidParam {
49 name: "stride",
50 reason: "stride interval with months not supported in date_bin".into(),
51 });
52 }
53 let stride_us = stride.usecs() + (stride.days() as i64) * Interval::USECS_PER_DAY; if stride_us <= 0 {
56 return Err(ExprError::InvalidParam {
57 name: "stride",
58 reason: "stride interval must be positive".into(),
59 });
60 }
61
62 let delta = source_us - origin_us;
64
65 let bucket = delta.div_euclid(stride_us) * stride_us;
67
68 let binned_source_us = origin_us + bucket;
70 Ok(binned_source_us)
71}