rw_futures_util/lib.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16
17use futures::stream::TryStream;
18use futures::{Stream, TryFuture};
19
20mod buffered_with_fence;
21mod misc;
22mod pausable;
23
24use buffered_with_fence::{Fenced, MaybeFence, TryBufferedWithFence};
25pub use misc::*;
26pub use pausable::{Pausable, Valve};
27
28/// Create a pausable stream, which can be paused or resumed by a valve.
29pub fn pausable<St>(stream: St) -> (Pausable<St>, Valve)
30where
31 St: Stream,
32{
33 Pausable::new(stream)
34}
35
36pub trait RwTryStreamExt: TryStream {
37 /// Similar to [`TryStreamExt::try_buffered`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html#method.try_buffered), but respect to fence.
38 ///
39 /// Fence is provided by [`Future`] that implements [`MaybeFence`] and returns `true`.
40 /// When the stream receive a fenced future, it'll not do a sync operation. In brief, don't poll later futures until the current
41 /// buffer is cleared.
42 fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence<Self>
43 where
44 Self: Sized,
45 Self::Ok: TryFuture<Error = Self::Error> + MaybeFence;
46}
47
48impl<St> RwTryStreamExt for St
49where
50 St: TryStream,
51{
52 fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence<Self>
53 where
54 Self: Sized,
55 Self::Ok: TryFuture<Error = Self::Error> + MaybeFence,
56 {
57 TryBufferedWithFence::new(self, n)
58 }
59}
60
61pub trait RwFutureExt: Future {
62 fn with_fence(self, is_fence: bool) -> Fenced<Self>
63 where
64 Self: Sized;
65}
66
67impl<Fut: Future> RwFutureExt for Fut {
68 fn with_fence(self, is_fence: bool) -> Fenced<Self> {
69 Fenced::new(self, is_fence)
70 }
71}