risingwave_batch/task/
fifo_channel.rs1use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use risingwave_common::array::DataChunk;
20use tokio::sync::mpsc;
21
22use crate::error::BatchError::{Internal, SenderError};
23use crate::error::{BatchError, Result as BatchResult, SharedResult};
24use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
25use crate::task::data_chunk_in_channel::DataChunkInChannel;
26#[derive(Clone)]
27pub struct FifoSender {
28 sender: mpsc::Sender<SharedResult<Option<DataChunkInChannel>>>,
29}
30
31impl Debug for FifoSender {
32 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33 f.debug_struct("FifoSender").finish()
34 }
35}
36
37pub struct FifoReceiver {
38 receiver: mpsc::Receiver<SharedResult<Option<DataChunkInChannel>>>,
39}
40
41impl ChanSender for FifoSender {
42 async fn send(&mut self, chunk: DataChunk) -> BatchResult<()> {
43 let data = DataChunkInChannel::new(chunk);
44 self.sender
45 .send(Ok(Some(data)))
46 .await
47 .map_err(|_| SenderError)
48 }
49
50 async fn close(self, error: Option<Arc<BatchError>>) -> BatchResult<()> {
51 let result = error.map(Err).unwrap_or(Ok(None));
52 self.sender.send(result).await.map_err(|_| SenderError)
53 }
54}
55
56impl ChanReceiver for FifoReceiver {
57 async fn recv(&mut self) -> SharedResult<Option<DataChunkInChannel>> {
58 match self.receiver.recv().await {
59 Some(data_chunk) => data_chunk,
60 None => Err(Arc::new(Internal(anyhow!("broken fifo_channel")))),
62 }
63 }
64}
65
66pub fn new_fifo_channel(output_channel_size: usize) -> (ChanSenderImpl, Vec<ChanReceiverImpl>) {
67 let (s, r) = mpsc::channel(output_channel_size);
68 (
69 ChanSenderImpl::Fifo(FifoSender { sender: s }),
70 vec![ChanReceiverImpl::Fifo(FifoReceiver { receiver: r })],
71 )
72}
73
// Gate the module on `cfg(test)` so that it is not compiled (as an empty
// module) into non-test builds.
#[cfg(test)]
mod tests {
    /// Receiving from a channel whose sender was dropped without `close`
    /// must surface an error rather than panic.
    #[tokio::test]
    async fn test_recv_not_fail_on_closed_channel() {
        use crate::task::fifo_channel::new_fifo_channel;

        let (sender, mut receivers) = new_fifo_channel(64);
        assert_eq!(receivers.len(), 1);
        // Dropping the only sender closes the channel with no final message.
        drop(sender);

        let receiver = receivers.get_mut(0).unwrap();
        assert!(receiver.recv().await.is_err());
    }
}