risingwave_stream/executor/exchange/permit.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Channel implementation for permit-based back-pressure.
16
17use std::sync::Arc;
18
19use risingwave_common::config::StreamingConfig;
20use risingwave_common::metrics::LabelGuardedIntGauge;
21use risingwave_common_estimate_size::EstimateSize;
22use risingwave_pb::task_service::permits;
23use tokio::sync::{AcquireError, Semaphore, SemaphorePermit, mpsc};
24
25use crate::executor::DispatcherMessageBatch as Message;
26
27/// Message with its required permits.
28///
29/// We store the `permits` in the struct instead of implying it from the `message` so that the
30/// permit number is totally determined by the sender and the downstream only needs to give the
31/// `permits` back verbatim, in case the version of the upstream and the downstream are different.
32pub struct MessageWithPermits {
33    pub message: Message,
34    pub permits: Option<permits::Value>,
35}
36
37pub struct ChannelMetrics {
38    pub sender_actor_channel_buffered_bytes: LabelGuardedIntGauge,
39    pub receiver_actor_channel_buffered_bytes: LabelGuardedIntGauge,
40}
41
42impl ChannelMetrics {
43    pub fn for_test() -> Self {
44        Self {
45            sender_actor_channel_buffered_bytes: LabelGuardedIntGauge::test_int_gauge::<1>(),
46            receiver_actor_channel_buffered_bytes: LabelGuardedIntGauge::test_int_gauge::<1>(),
47        }
48    }
49}
50
51/// Create a channel for the exchange service with metrics.
52pub fn channel_with_metrics(
53    initial_permits: usize,
54    batched_permits: usize,
55    concurrent_barriers: usize,
56    metrics: ChannelMetrics,
57) -> (Sender, Receiver) {
58    let ChannelMetrics {
59        sender_actor_channel_buffered_bytes,
60        receiver_actor_channel_buffered_bytes,
61    } = metrics;
62
63    // Use an unbounded channel since we manage the permits manually.
64    let (tx, rx) = mpsc::unbounded_channel();
65
66    let records = Semaphore::new(initial_permits);
67    let barriers = Semaphore::new(concurrent_barriers);
68    let permits = Arc::new(Permits { records, barriers });
69
70    let max_chunk_permits: usize = initial_permits - batched_permits;
71
72    (
73        Sender {
74            tx,
75            permits: permits.clone(),
76            max_chunk_permits,
77            sender_actor_channel_buffered_bytes,
78        },
79        Receiver {
80            rx,
81            permits,
82            receiver_actor_channel_buffered_bytes,
83        },
84    )
85}
86
87pub fn channel_from_config_with_metrics(
88    config: &StreamingConfig,
89    metrics: ChannelMetrics,
90) -> (Sender, Receiver) {
91    channel_with_metrics(
92        config.developer.exchange_initial_permits,
93        config.developer.exchange_batched_permits,
94        config.developer.exchange_concurrent_barriers,
95        metrics,
96    )
97}
98
/// The configuration for tests.
///
/// Record and barrier permits are both set to half of `u32::MAX`, so tests are effectively never
/// back-pressured while the counts still fit the `u32` taken by tokio's `acquire_many`.
pub mod for_test {
    /// Shared value for the record and barrier permit counts.
    const HALF_U32_MAX: usize = (u32::MAX / 2) as usize;

    pub const INITIAL_PERMITS: usize = HALF_U32_MAX;
    pub const BATCHED_PERMITS: usize = 1;
    pub const CONCURRENT_BARRIERS: usize = HALF_U32_MAX;
}
105
106pub fn channel_for_test() -> (Sender, Receiver) {
107    use for_test::*;
108
109    channel_with_metrics(
110        INITIAL_PERMITS,
111        BATCHED_PERMITS,
112        CONCURRENT_BARRIERS,
113        ChannelMetrics::for_test(),
114    )
115}
116
117/// Semaphore-based permits to control the back-pressure.
118///
119/// The number of messages in the exchange channel is limited by these semaphores.
120pub struct Permits {
121    /// The permits for records in chunks.
122    records: Semaphore,
123    /// The permits for barriers.
124    barriers: Semaphore,
125}
126
127impl Permits {
128    /// Add permits back to the semaphores.
129    pub fn add_permits(&self, permits: permits::Value) {
130        match permits {
131            permits::Value::Record(p) => self.records.add_permits(p as usize),
132            permits::Value::Barrier(p) => self.barriers.add_permits(p as usize),
133        }
134    }
135
136    /// Acquire permits from the semaphores.
137    ///
138    /// This function is cancellation-safe except for the fairness of waking.
139    async fn acquire_permits(&self, permits: &permits::Value) -> Result<(), AcquireError> {
140        match permits {
141            permits::Value::Record(p) => self.records.acquire_many(*p as _),
142            permits::Value::Barrier(p) => self.barriers.acquire_many(*p as _),
143        }
144        .await
145        .map(SemaphorePermit::forget)
146    }
147
148    /// Close the semaphores so that all pending `acquire` will fail immediately.
149    fn close(&self) {
150        self.records.close();
151        self.barriers.close();
152    }
153}
154
155/// The sender of the exchange service with permit-based back-pressure.
156pub struct Sender {
157    tx: mpsc::UnboundedSender<MessageWithPermits>,
158    permits: Arc<Permits>,
159
160    /// The maximum permits required by a chunk. If there're too many rows in a chunk, we only
161    /// acquire these permits. `BATCHED_PERMITS` is subtracted to avoid deadlock with
162    /// batching.
163    max_chunk_permits: usize,
164    sender_actor_channel_buffered_bytes: LabelGuardedIntGauge,
165}
166
167impl Sender {
168    /// Send a message, waiting until there are enough permits.
169    ///
170    /// Returns error if the receive half of the channel is closed, including the message passed.
171    pub async fn send(&self, message: Message) -> Result<(), mpsc::error::SendError<Message>> {
172        let chunk_size = match &message {
173            Message::Chunk(chunk) => chunk.estimated_size() as i64,
174            _ => 0,
175        };
176
177        // The semaphores should never be closed.
178        let permits = match &message {
179            Message::Chunk(c) => {
180                let card = c.cardinality().clamp(1, self.max_chunk_permits);
181                if card == self.max_chunk_permits {
182                    tracing::warn!(cardinality = c.cardinality(), "large chunk in exchange")
183                }
184                Some(permits::Value::Record(card as _))
185            }
186            Message::BarrierBatch(_) => Some(permits::Value::Barrier(1)),
187            Message::Watermark(_) => None,
188        };
189
190        if let Some(permits) = &permits
191            && self.permits.acquire_permits(permits).await.is_err()
192        {
193            return Err(mpsc::error::SendError(message));
194        }
195
196        self.tx
197            .send(MessageWithPermits { message, permits })
198            .map_err(|e| mpsc::error::SendError(e.0.message))?;
199
200        self.sender_actor_channel_buffered_bytes.add(chunk_size);
201
202        Ok(())
203    }
204}
205
206/// The receiver of the exchange service with permit-based back-pressure.
207pub struct Receiver {
208    rx: mpsc::UnboundedReceiver<MessageWithPermits>,
209    permits: Arc<Permits>,
210    receiver_actor_channel_buffered_bytes: LabelGuardedIntGauge,
211}
212
213impl Receiver {
214    /// Receive the next message for this receiver, with the permits of this message added back.
215    /// Used for local exchange.
216    ///
217    /// Returns `None` if the channel has been closed.
218    pub async fn recv(&mut self) -> Option<Message> {
219        let MessageWithPermits { message, permits } = self.recv_raw().await?;
220
221        if let Some(permits) = permits {
222            self.permits.add_permits(permits);
223        }
224
225        Some(message)
226    }
227
228    /// Try to receive the next message for this receiver, with the permits of this message added
229    /// back.
230    ///
231    /// Returns error if the channel is currently empty.
232    pub fn try_recv(&mut self) -> Result<Message, mpsc::error::TryRecvError> {
233        let MessageWithPermits { message, permits } = self.rx.try_recv()?;
234
235        if let Some(permits) = permits {
236            self.permits.add_permits(permits);
237        }
238
239        Ok(message)
240    }
241
242    /// Receive the next message and its permits for this receiver, **without** adding the permits
243    /// back. Used for remote exchange where the permits should be manually added according to the
244    /// downstream actor.
245    ///
246    /// Returns `None` if the channel has been closed.
247    pub async fn recv_raw(&mut self) -> Option<MessageWithPermits> {
248        let message_with_permits = self.rx.recv().await?;
249
250        if let Message::Chunk(chunk) = &message_with_permits.message {
251            let chunk_size = chunk.estimated_size() as i64;
252            self.receiver_actor_channel_buffered_bytes.sub(chunk_size);
253        }
254
255        Some(message_with_permits)
256    }
257
258    /// Get a reference to the inner [`Permits`] to manually add permits.
259    pub fn permits(&self) -> Arc<Permits> {
260        self.permits.clone()
261    }
262}
263
264impl Drop for Receiver {
265    fn drop(&mut self) {
266        // Close the `permits` semaphores so that all pending `acquire` on the sender side will fail
267        // immediately.
268        self.permits.close();
269    }
270}
271
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::pin::pin;

    use futures::FutureExt;

    use super::*;
    use crate::executor::DispatcherBarrier as Barrier;

    /// Dropping the receiver must make a sender that is blocked on barrier permits fail.
    #[test]
    fn test_channel_close() {
        // No record permits at all, and exactly one barrier permit.
        let (tx, mut rx) = channel_with_metrics(0, 0, 1, ChannelMetrics::for_test());

        let send_barrier = || {
            let barrier = Barrier::with_prev_epoch_for_test(514, 114);
            tx.send(Message::BarrierBatch(vec![barrier]))
        };

        // The only barrier permit is free, so this completes at once; receiving returns it.
        assert_matches!(send_barrier().now_or_never(), Some(Ok(_)));
        assert_matches!(
            rx.recv().now_or_never(),
            Some(Some(Message::BarrierBatch(_)))
        );

        // Send again without receiving, which uses up the single barrier permit.
        assert_matches!(send_barrier().now_or_never(), Some(Ok(_)));

        // A further send has to wait for a permit...
        let mut blocked = pin!(send_barrier());
        assert_matches!((&mut blocked).now_or_never(), None);

        // ...until the receiver goes away, which closes the semaphores and fails the send.
        drop(rx);
        assert_matches!(blocked.now_or_never(), Some(Err(_)));
    }
}