rw_futures_util/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16
17use futures::stream::TryStream;
18use futures::{Stream, TryFuture};
19
20mod buffered_with_fence;
21mod misc;
22mod pausable;
23
24use buffered_with_fence::{Fenced, MaybeFence, TryBufferedWithFence};
25pub use misc::*;
26pub use pausable::{Pausable, Valve};
27
28/// Create a pausable stream, which can be paused or resumed by a valve.
29pub fn pausable<St>(stream: St) -> (Pausable<St>, Valve)
30where
31    St: Stream,
32{
33    Pausable::new(stream)
34}
35
36pub trait RwTryStreamExt: TryStream {
37    /// Similar to [`TryStreamExt::try_buffered`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html#method.try_buffered), but respect to fence.
38    ///
39    /// Fence is provided by [`Future`] that implements [`MaybeFence`] and returns `true`.
40    /// When the stream receive a fenced future, it'll not do a sync operation. In brief, don't poll later futures until the current
41    /// buffer is cleared.
42    fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence<Self>
43    where
44        Self: Sized,
45        Self::Ok: TryFuture<Error = Self::Error> + MaybeFence;
46}
47
48impl<St> RwTryStreamExt for St
49where
50    St: TryStream,
51{
52    fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence<Self>
53    where
54        Self: Sized,
55        Self::Ok: TryFuture<Error = Self::Error> + MaybeFence,
56    {
57        TryBufferedWithFence::new(self, n)
58    }
59}
60
61pub trait RwFutureExt: Future {
62    fn with_fence(self, is_fence: bool) -> Fenced<Self>
63    where
64        Self: Sized;
65}
66
67impl<Fut: Future> RwFutureExt for Fut {
68    fn with_fence(self, is_fence: bool) -> Fenced<Self> {
69        Fenced::new(self, is_fence)
70    }
71}