1use std::future::Future;
16use std::pin::{Pin, pin};
17use std::task::{Context, Poll, ready};
18
19use futures::future::{Either, pending, select};
20use futures::stream::Peekable;
21use futures::{FutureExt, Stream, StreamExt};
22
23pub fn select_all<S: Stream + Unpin>(
25 streams: impl IntoIterator<Item = S>,
26) -> futures::stream::SelectAll<S> {
27 #[expect(clippy::disallowed_methods)]
29 futures::stream::select_all(streams)
30}
31
32pub fn pending_on_none<I>(future: impl Future<Output = Option<I>>) -> impl Future<Output = I> {
33 future.then(|opt| async move {
34 match opt {
35 Some(item) => item,
36 None => pending::<I>().await,
37 }
38 })
39}
40
41pub fn drop_either_future<A, B>(
42 either: Either<(A, impl Future), (B, impl Future)>,
43) -> Either<A, B> {
44 match either {
45 Either::Left((left, _)) => Either::Left(left),
46 Either::Right((right, _)) => Either::Right(right),
47 }
48}
49
50pub async fn await_future_with_monitor_error_stream<T, E, F: Future>(
61 peek_stream: &mut Peekable<impl Stream<Item = Result<T, E>> + Unpin>,
62 future: F,
63) -> Result<F::Output, Option<E>> {
64 match select(pin!(Pin::new(&mut *peek_stream).peek()), pin!(future)).await {
66 Either::Left((response_result, send_future)) => match response_result {
67 None => Err(None),
68 Some(Err(_)) => {
69 let err = match peek_stream.next().now_or_never() {
70 Some(Some(Err(err))) => err,
71 _ => unreachable!("peek has output, peek output not None, have check err"),
72 };
73 Err(Some(err))
74 }
75 Some(Ok(_)) => Ok(send_future.await),
76 },
77 Either::Right((output, _)) => Ok(output),
78 }
79}
80
81pub struct AttachedFuture<F, T> {
88 inner: F,
89 item: Option<T>,
90}
91
92impl<F, T> AttachedFuture<F, T> {
93 pub fn new(inner: F, item: T) -> Self {
94 Self {
95 inner,
96 item: Some(item),
97 }
98 }
99
100 pub fn into_inner(self) -> (F, T) {
101 (
102 self.inner,
103 self.item.expect("should not be called after polled ready"),
104 )
105 }
106
107 pub fn item(&self) -> &T {
108 self.item
109 .as_ref()
110 .expect("should not be called after polled ready")
111 }
112}
113
114impl<F: Future + Unpin, T: Unpin> Future for AttachedFuture<F, T> {
115 type Output = (F::Output, T);
116
117 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
118 let this = self.get_mut();
119 let output = ready!(this.inner.poll_unpin(cx));
120 Poll::Ready((
121 output,
122 this.item
123 .take()
124 .expect("should not be polled ready for twice"),
125 ))
126 }
127}