risingwave_expr_impl/scalar/
proctime.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::Timestamptz;
16use risingwave_common::util::epoch;
17use risingwave_expr::{ExprError, Result, function};
18
19/// Get the processing time in Timestamptz scalar from the task-local epoch.
20#[function("proctime() -> timestamptz", volatile)]
21fn proctime() -> Result<Timestamptz> {
22    let epoch = epoch::task_local::curr_epoch().ok_or(ExprError::Context("EPOCH"))?;
23    Ok(epoch.as_timestamptz())
24}
25
26#[cfg(test)]
27mod tests {
28    use risingwave_common::util::epoch::{Epoch, EpochPair};
29
30    use super::*;
31
32    #[tokio::test]
33    async fn test_proctime() {
34        let curr_epoch = Epoch::now();
35        let epoch = EpochPair {
36            curr: curr_epoch.0,
37            prev: 0,
38        };
39
40        let proctime = epoch::task_local::scope(epoch, async { proctime().unwrap() }).await;
41
42        assert_eq!(
43            proctime,
44            Timestamptz::from_millis(curr_epoch.as_unix_millis() as i64).unwrap()
45        );
46    }
47}