risingwave_expr_impl/scalar/
delay.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use risingwave_common::types::{F64, Interval};
18use risingwave_expr::function;
19
20/// Makes the current session's process sleep until the given number of seconds have elapsed.
21///
22/// ```slt
23/// query I
24/// SELECT pg_sleep(1.5);
25/// ----
26/// NULL
27/// ```
28#[function("pg_sleep(float8)", volatile)]
29async fn pg_sleep(second: F64) {
30    tokio::time::sleep(Duration::from_secs_f64(second.0)).await;
31}
32
33/// Makes the current session's process sleep until the given interval has elapsed.
34///
35/// ```slt
36/// query I
37/// SELECT pg_sleep_for('1 second');
38/// ----
39/// NULL
40/// ```
41#[function("pg_sleep_for(interval)", volatile)]
42async fn pg_sleep_for(interval: Interval) {
43    // we only use the microsecond part of the interval
44    let usecs = if interval.is_positive() {
45        interval.usecs() as u64
46    } else {
47        // return if the interval is not positive
48        return;
49    };
50    let duration = Duration::from_micros(usecs);
51    tokio::time::sleep(duration).await;
52}