risingwave_simulation/utils/timed_future.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::pin::{Pin, pin};
17use std::task::{Context, Poll};
18use std::time::{Duration, Instant};
19
20use pin_project::pin_project;
21
22/// Inspired by <https://stackoverflow.com/a/59935743/2990323>
23/// A wrapper around a Future which adds timing data.
24#[pin_project]
25pub struct Timed<Fut, F>
26where
27 Fut: Future,
28 F: Fn(&Fut::Output, Duration),
29{
30 #[pin]
31 inner: Fut,
32 f: F,
33 start: Option<Instant>,
34}
35
36impl<Fut, F> Future for Timed<Fut, F>
37where
38 Fut: Future,
39 F: Fn(&Fut::Output, Duration),
40{
41 type Output = Fut::Output;
42
43 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
44 let this = self.project();
45 let start = this.start.get_or_insert_with(Instant::now);
46
47 match this.inner.poll(cx) {
48 // If the inner future is still pending, this wrapper is still pending.
49 Poll::Pending => Poll::Pending,
50
51 // If the inner future is done, measure the elapsed time and finish this wrapper future.
52 Poll::Ready(v) => {
53 let elapsed = start.elapsed();
54 (this.f)(&v, elapsed);
55
56 Poll::Ready(v)
57 }
58 }
59 }
60}
61
62pub trait TimedExt: Sized + Future {
63 fn timed<F>(self, f: F) -> Timed<Self, F>
64 where
65 F: Fn(&Self::Output, Duration),
66 {
67 Timed {
68 inner: self,
69 f,
70 start: None,
71 }
72 }
73}
74
75// All futures can use the `.timed` method defined above
76impl<F: Future> TimedExt for F {}