rw_futures_util/
misc.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{ready, Context, Poll};

use futures::future::{pending, select, Either};
use futures::stream::Peekable;
use futures::{FutureExt, Stream, StreamExt};

/// Merge a collection of streams into a single [`Stream`] that yields the items
/// produced by the given streams.
pub fn select_all<S>(streams: impl IntoIterator<Item = S>) -> futures::stream::SelectAll<S>
where
    S: Stream + Unpin,
{
    // The `futures` implementation performs well enough, so this is a plain forward.
    // Clippy is expected to report the direct call as a disallowed method here.
    #[expect(clippy::disallowed_methods)]
    futures::stream::select_all(streams)
}

pub fn pending_on_none<I>(future: impl Future<Output = Option<I>>) -> impl Future<Output = I> {
    future.then(|opt| async move {
        match opt {
            Some(item) => item,
            None => pending::<I>().await,
        }
    })
}

/// Keep only the first element of the tuple carried by either variant of `either`,
/// discarding the future stored next to it.
///
/// The accepted shape is the one [`select`] resolves to: the output of the side that
/// finished, paired with the other future.
pub fn drop_either_future<A, B>(
    either: Either<(A, impl Future), (B, impl Future)>,
) -> Either<A, B> {
    match either {
        Either::Left((output, _)) => Either::Left(output),
        Either::Right((output, _)) => Either::Right(output),
    }
}

/// Await a future while monitoring a peekable stream that may yield an error.
/// The peekable stream is polled at a higher priority than the future.
///
/// When the peekable stream yields an error or reaches its end before the future
/// completes, this function returns immediately without finishing the future.
/// When the stream's next item is `Ok`, that item is only peeked (it stays in the
/// stream) and the given future is then awaited to completion; the stream is not
/// monitored any further during that wait.
///
/// Return:
///     - `Ok(output)` with the output of the given future.
///     - `Err(None)` to indicate that the stream has reached the end.
///     - `Err(Some(e))` to indicate that the stream yields an error `e`.
pub async fn await_future_with_monitor_error_stream<T, E, F: Future>(
    peek_stream: &mut Peekable<impl Stream<Item = Result<T, E>> + Unpin>,
    future: F,
) -> Result<F::Output, Option<E>> {
    // Race a `peek` on the response stream against the future so that an error on the
    // stream is seen early. The peek is passed to `select` as the first argument.
    match select(pin!(Pin::new(&mut *peek_stream).peek()), pin!(future)).await {
        // The peek resolved first; `send_future` is the still-unfinished input future.
        Either::Left((response_result, send_future)) => match response_result {
            // End of stream.
            None => Err(None),
            Some(Err(_)) => {
                // `peek` only lends a reference to the item, so take the error by value
                // with `next()`. The item has just been peeked, so `next()` is expected
                // to be ready immediately and to yield that same error.
                let err = match peek_stream.next().now_or_never() {
                    Some(Some(Err(err))) => err,
                    _ => unreachable!("peek has output, peek output not None, have check err"),
                };
                Err(Some(err))
            }
            // The next item is not an error: leave it in the stream and finish the future.
            Some(Ok(_)) => Ok(send_future.await),
        },
        // The future completed first; the pending peek is discarded.
        Either::Right((output, _)) => Ok(output),
    }
}

/// A future `F` paired with an extra item of type `T`. Once the future is polled to
/// ready, the item is handed out together with the output as `(F::Output, item)`.
///
/// This behaves much like `future.map(|output| (output, item))`; the difference is
/// that the `Map` future offers no `into_inner` method to recover the original inner
/// future, while this type does.
pub struct AttachedFuture<F, T> {
    /// The wrapped future.
    inner: F,
    /// The attached item; `None` once it has been handed out with the output.
    item: Option<T>,
}

impl<F, T> AttachedFuture<F, T> {
    /// Wrap `inner` and attach `item` to it.
    pub fn new(inner: F, item: T) -> Self {
        let item = Some(item);
        Self { inner, item }
    }

    /// Split the wrapper back into the inner future and the attached item.
    ///
    /// # Panics
    ///
    /// Panics if the attached item has already been taken, i.e. after the future has
    /// been polled to ready.
    pub fn into_inner(self) -> (F, T) {
        let Self { inner, item } = self;
        (
            inner,
            item.expect("should not be called after polled ready"),
        )
    }

    /// Borrow the attached item.
    ///
    /// # Panics
    ///
    /// Panics if the attached item has already been taken, i.e. after the future has
    /// been polled to ready.
    pub fn item(&self) -> &T {
        let attached = self.item.as_ref();
        attached.expect("should not be called after polled ready")
    }
}

impl<F: Future + Unpin, T: Unpin> Future for AttachedFuture<F, T> {
    type Output = (F::Output, T);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let output = ready!(this.inner.poll_unpin(cx));
        Poll::Ready((
            output,
            this.item
                .take()
                .expect("should not be polled ready for twice"),
        ))
    }
}