rw_futures_util/
misc.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::pin::{Pin, pin};
17use std::task::{Context, Poll, ready};
18
19use futures::future::{Either, pending, select};
20use futures::stream::Peekable;
21use futures::{FutureExt, Stream, StreamExt};
22
23/// Convert a list of streams into a [`Stream`] of results from the streams.
24pub fn select_all<S: Stream + Unpin>(
25    streams: impl IntoIterator<Item = S>,
26) -> futures::stream::SelectAll<S> {
27    // We simply forward the implementation to `futures` as it performs good enough.
28    #[expect(clippy::disallowed_methods)]
29    futures::stream::select_all(streams)
30}
31
32pub fn pending_on_none<I>(future: impl Future<Output = Option<I>>) -> impl Future<Output = I> {
33    future.then(|opt| async move {
34        match opt {
35            Some(item) => item,
36            None => pending::<I>().await,
37        }
38    })
39}
40
41pub fn drop_either_future<A, B>(
42    either: Either<(A, impl Future), (B, impl Future)>,
43) -> Either<A, B> {
44    match either {
45        Either::Left((left, _)) => Either::Left(left),
46        Either::Right((right, _)) => Either::Right(right),
47    }
48}
49
50/// Await on a future while monitoring on a peekable stream that may return error.
51/// The peekable stream is polled at a higher priority than the future.
52///
53/// When the peekable stream returns with a error and end of stream, the future will
54/// return the error immediately. Otherwise, it will keep polling the given future.
55///
56/// Return:
57///     - Ok(output) as the output of the given future.
58///     - Err(None) to indicate that the stream has reached the end.
59///     - Err(e) to indicate that the stream returns an error.
60pub async fn await_future_with_monitor_error_stream<T, E, F: Future>(
61    peek_stream: &mut Peekable<impl Stream<Item = Result<T, E>> + Unpin>,
62    future: F,
63) -> Result<F::Output, Option<E>> {
64    // Poll the response stream to early see the error
65    match select(pin!(Pin::new(&mut *peek_stream).peek()), pin!(future)).await {
66        Either::Left((response_result, send_future)) => match response_result {
67            None => Err(None),
68            Some(Err(_)) => {
69                let err = match peek_stream.next().now_or_never() {
70                    Some(Some(Err(err))) => err,
71                    _ => unreachable!("peek has output, peek output not None, have check err"),
72                };
73                Err(Some(err))
74            }
75            Some(Ok(_)) => Ok(send_future.await),
76        },
77        Either::Right((output, _)) => Ok(output),
78    }
79}
80
81/// Attach an item of type `T` to the future `F`. When the future is polled with ready,
82/// the item will be attached to the output of future as `(F::Output, item)`.
83///
84/// The generated future will be similar to `future.map(|output| (output, item))`. The
85/// only difference is that the `Map` future does not provide method `into_inner` to
86/// get the original inner future.
87pub struct AttachedFuture<F, T> {
88    inner: F,
89    item: Option<T>,
90}
91
92impl<F, T> AttachedFuture<F, T> {
93    pub fn new(inner: F, item: T) -> Self {
94        Self {
95            inner,
96            item: Some(item),
97        }
98    }
99
100    pub fn into_inner(self) -> (F, T) {
101        (
102            self.inner,
103            self.item.expect("should not be called after polled ready"),
104        )
105    }
106
107    pub fn item(&self) -> &T {
108        self.item
109            .as_ref()
110            .expect("should not be called after polled ready")
111    }
112}
113
114impl<F: Future + Unpin, T: Unpin> Future for AttachedFuture<F, T> {
115    type Output = (F::Output, T);
116
117    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
118        let this = self.get_mut();
119        let output = ready!(this.inner.poll_unpin(cx));
120        Poll::Ready((
121            output,
122            this.item
123                .take()
124                .expect("should not be polled ready for twice"),
125        ))
126    }
127}