risingwave_stream/executor/exchange/
permit.rs1use std::sync::Arc;
18
19use risingwave_common::config::StreamingConfig;
20use risingwave_common::metrics::LabelGuardedIntGauge;
21use risingwave_common_estimate_size::EstimateSize;
22use risingwave_pb::task_service::permits;
23use tokio::sync::{AcquireError, Semaphore, SemaphorePermit, mpsc};
24
25use crate::executor::DispatcherMessageBatch as Message;
26
27pub struct MessageWithPermits {
33 pub message: Message,
34 pub permits: Option<permits::Value>,
35}
36
37pub struct ChannelMetrics {
38 pub sender_actor_channel_buffered_bytes: LabelGuardedIntGauge,
39 pub receiver_actor_channel_buffered_bytes: LabelGuardedIntGauge,
40}
41
42impl ChannelMetrics {
43 pub fn for_test() -> Self {
44 Self {
45 sender_actor_channel_buffered_bytes: LabelGuardedIntGauge::test_int_gauge::<1>(),
46 receiver_actor_channel_buffered_bytes: LabelGuardedIntGauge::test_int_gauge::<1>(),
47 }
48 }
49}
50
51pub fn channel_with_metrics(
53 initial_permits: usize,
54 batched_permits: usize,
55 concurrent_barriers: usize,
56 metrics: ChannelMetrics,
57) -> (Sender, Receiver) {
58 let ChannelMetrics {
59 sender_actor_channel_buffered_bytes,
60 receiver_actor_channel_buffered_bytes,
61 } = metrics;
62
63 let (tx, rx) = mpsc::unbounded_channel();
65
66 let records = Semaphore::new(initial_permits);
67 let barriers = Semaphore::new(concurrent_barriers);
68 let permits = Arc::new(Permits { records, barriers });
69
70 let max_chunk_permits: usize = initial_permits - batched_permits;
71
72 (
73 Sender {
74 tx,
75 permits: permits.clone(),
76 max_chunk_permits,
77 sender_actor_channel_buffered_bytes,
78 },
79 Receiver {
80 rx,
81 permits,
82 receiver_actor_channel_buffered_bytes,
83 },
84 )
85}
86
87pub fn channel_from_config_with_metrics(
88 config: &StreamingConfig,
89 metrics: ChannelMetrics,
90) -> (Sender, Receiver) {
91 channel_with_metrics(
92 config.developer.exchange_initial_permits,
93 config.developer.exchange_batched_permits,
94 config.developer.exchange_concurrent_barriers,
95 metrics,
96 )
97}
98
/// Permit settings used by [`channel_for_test`].
pub mod for_test {
    /// Number of record permits a test channel starts with.
    pub const INITIAL_PERMITS: usize = (u32::MAX / 2) as usize;
    /// The `batched_permits` value passed when building a test channel.
    pub const BATCHED_PERMITS: usize = 1;
    /// Number of barrier permits a test channel starts with.
    pub const CONCURRENT_BARRIERS: usize = (u32::MAX / 2) as usize;
}
105
106pub fn channel_for_test() -> (Sender, Receiver) {
107 use for_test::*;
108
109 channel_with_metrics(
110 INITIAL_PERMITS,
111 BATCHED_PERMITS,
112 CONCURRENT_BARRIERS,
113 ChannelMetrics::for_test(),
114 )
115}
116
117pub struct Permits {
121 records: Semaphore,
123 barriers: Semaphore,
125}
126
127impl Permits {
128 pub fn add_permits(&self, permits: permits::Value) {
130 match permits {
131 permits::Value::Record(p) => self.records.add_permits(p as usize),
132 permits::Value::Barrier(p) => self.barriers.add_permits(p as usize),
133 }
134 }
135
136 async fn acquire_permits(&self, permits: &permits::Value) -> Result<(), AcquireError> {
140 match permits {
141 permits::Value::Record(p) => self.records.acquire_many(*p as _),
142 permits::Value::Barrier(p) => self.barriers.acquire_many(*p as _),
143 }
144 .await
145 .map(SemaphorePermit::forget)
146 }
147
148 fn close(&self) {
150 self.records.close();
151 self.barriers.close();
152 }
153}
154
155pub struct Sender {
157 tx: mpsc::UnboundedSender<MessageWithPermits>,
158 permits: Arc<Permits>,
159
160 max_chunk_permits: usize,
164 sender_actor_channel_buffered_bytes: LabelGuardedIntGauge,
165}
166
167impl Sender {
168 pub async fn send(&self, message: Message) -> Result<(), mpsc::error::SendError<Message>> {
172 let chunk_size = match &message {
173 Message::Chunk(chunk) => chunk.estimated_size() as i64,
174 _ => 0,
175 };
176
177 let permits = match &message {
179 Message::Chunk(c) => {
180 let card = c.cardinality().clamp(1, self.max_chunk_permits);
181 if card == self.max_chunk_permits {
182 tracing::warn!(cardinality = c.cardinality(), "large chunk in exchange")
183 }
184 Some(permits::Value::Record(card as _))
185 }
186 Message::BarrierBatch(_) => Some(permits::Value::Barrier(1)),
187 Message::Watermark(_) => None,
188 };
189
190 if let Some(permits) = &permits
191 && self.permits.acquire_permits(permits).await.is_err()
192 {
193 return Err(mpsc::error::SendError(message));
194 }
195
196 self.tx
197 .send(MessageWithPermits { message, permits })
198 .map_err(|e| mpsc::error::SendError(e.0.message))?;
199
200 self.sender_actor_channel_buffered_bytes.add(chunk_size);
201
202 Ok(())
203 }
204}
205
206pub struct Receiver {
208 rx: mpsc::UnboundedReceiver<MessageWithPermits>,
209 permits: Arc<Permits>,
210 receiver_actor_channel_buffered_bytes: LabelGuardedIntGauge,
211}
212
213impl Receiver {
214 pub async fn recv(&mut self) -> Option<Message> {
219 let MessageWithPermits { message, permits } = self.recv_raw().await?;
220
221 if let Some(permits) = permits {
222 self.permits.add_permits(permits);
223 }
224
225 Some(message)
226 }
227
228 pub fn try_recv(&mut self) -> Result<Message, mpsc::error::TryRecvError> {
233 let MessageWithPermits { message, permits } = self.rx.try_recv()?;
234
235 if let Some(permits) = permits {
236 self.permits.add_permits(permits);
237 }
238
239 Ok(message)
240 }
241
242 pub async fn recv_raw(&mut self) -> Option<MessageWithPermits> {
248 let message_with_permits = self.rx.recv().await?;
249
250 if let Message::Chunk(chunk) = &message_with_permits.message {
251 let chunk_size = chunk.estimated_size() as i64;
252 self.receiver_actor_channel_buffered_bytes.sub(chunk_size);
253 }
254
255 Some(message_with_permits)
256 }
257
258 pub fn permits(&self) -> Arc<Permits> {
260 self.permits.clone()
261 }
262}
263
264impl Drop for Receiver {
265 fn drop(&mut self) {
266 self.permits.close();
269 }
270}
271
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::pin::pin;

    use futures::FutureExt;

    use super::*;
    use crate::executor::DispatcherBarrier as Barrier;

    /// A sender blocked on permits must fail once the receiver is dropped.
    #[test]
    fn test_channel_close() {
        // No record permits, exactly one barrier permit.
        let (tx, mut rx) = channel_with_metrics(0, 0, 1, ChannelMetrics::for_test());

        let send_barrier = || {
            tx.send(Message::BarrierBatch(vec![
                Barrier::with_prev_epoch_for_test(514, 114),
            ]))
        };

        // The single barrier permit is available: the send completes at once.
        assert_matches!(send_barrier().now_or_never(), Some(Ok(_)));

        // Receiving the barrier hands the permit back.
        assert_matches!(
            rx.recv().now_or_never(),
            Some(Some(Message::BarrierBatch(_)))
        );

        // The permit is available again, so a second send also completes.
        assert_matches!(send_barrier().now_or_never(), Some(Ok(_)));

        // With the permit taken and nothing received, a third send is pending.
        let mut blocked_send = pin!(send_barrier());
        assert_matches!((&mut blocked_send).now_or_never(), None);

        // Dropping the receiver closes the semaphores; the pending send fails.
        drop(rx);
        assert_matches!(blocked_send.now_or_never(), Some(Err(_)));
    }
}