risingwave_expr_impl/scalar/
proctime.rs1use risingwave_common::types::Timestamptz;
16use risingwave_common::util::epoch;
17use risingwave_expr::{ExprError, Result, function};
18
19#[function("proctime() -> timestamptz", volatile)]
21fn proctime() -> Result<Timestamptz> {
22 let epoch = epoch::task_local::curr_epoch().ok_or(ExprError::Context("EPOCH"))?;
23 Ok(epoch.as_timestamptz())
24}
25
26#[cfg(test)]
27mod tests {
28 use risingwave_common::util::epoch::{Epoch, EpochPair};
29
30 use super::*;
31
32 #[tokio::test]
33 async fn test_proctime() {
34 let curr_epoch = Epoch::now();
35 let epoch = EpochPair {
36 curr: curr_epoch.0,
37 prev: 0,
38 };
39
40 let proctime = epoch::task_local::scope(epoch, async { proctime().unwrap() }).await;
41
42 assert_eq!(
43 proctime,
44 Timestamptz::from_millis(curr_epoch.as_unix_millis() as i64).unwrap()
45 );
46 }
47}