risingwave_simulation/utils/
timed_future.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::pin::{Pin, pin};
17use std::task::{Context, Poll};
18use std::time::{Duration, Instant};
19
20use pin_project::pin_project;
21
22/// Inspired by <https://stackoverflow.com/a/59935743/2990323>
23/// A wrapper around a Future which adds timing data.
24#[pin_project]
25pub struct Timed<Fut, F>
26where
27    Fut: Future,
28    F: Fn(&Fut::Output, Duration),
29{
30    #[pin]
31    inner: Fut,
32    f: F,
33    start: Option<Instant>,
34}
35
36impl<Fut, F> Future for Timed<Fut, F>
37where
38    Fut: Future,
39    F: Fn(&Fut::Output, Duration),
40{
41    type Output = Fut::Output;
42
43    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
44        let this = self.project();
45        let start = this.start.get_or_insert_with(Instant::now);
46
47        match this.inner.poll(cx) {
48            // If the inner future is still pending, this wrapper is still pending.
49            Poll::Pending => Poll::Pending,
50
51            // If the inner future is done, measure the elapsed time and finish this wrapper future.
52            Poll::Ready(v) => {
53                let elapsed = start.elapsed();
54                (this.f)(&v, elapsed);
55
56                Poll::Ready(v)
57            }
58        }
59    }
60}
61
62pub trait TimedExt: Sized + Future {
63    fn timed<F>(self, f: F) -> Timed<Self, F>
64    where
65        F: Fn(&Self::Output, Duration),
66    {
67        Timed {
68            inner: self,
69            f,
70            start: None,
71        }
72    }
73}
74
75// All futures can use the `.timed` method defined above
76impl<F: Future> TimedExt for F {}