risingwave_batch/task/
fifo_channel.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Formatter};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use risingwave_common::array::DataChunk;
20use tokio::sync::mpsc;
21
22use crate::error::BatchError::{Internal, SenderError};
23use crate::error::{BatchError, Result as BatchResult, SharedResult};
24use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
25use crate::task::data_chunk_in_channel::DataChunkInChannel;
26#[derive(Clone)]
27pub struct FifoSender {
28    sender: mpsc::Sender<SharedResult<Option<DataChunkInChannel>>>,
29}
30
31impl Debug for FifoSender {
32    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33        f.debug_struct("FifoSender").finish()
34    }
35}
36
37pub struct FifoReceiver {
38    receiver: mpsc::Receiver<SharedResult<Option<DataChunkInChannel>>>,
39}
40
41impl ChanSender for FifoSender {
42    async fn send(&mut self, chunk: DataChunk) -> BatchResult<()> {
43        let data = DataChunkInChannel::new(chunk);
44        self.sender
45            .send(Ok(Some(data)))
46            .await
47            .map_err(|_| SenderError)
48    }
49
50    async fn close(self, error: Option<Arc<BatchError>>) -> BatchResult<()> {
51        let result = error.map(Err).unwrap_or(Ok(None));
52        self.sender.send(result).await.map_err(|_| SenderError)
53    }
54}
55
56impl ChanReceiver for FifoReceiver {
57    async fn recv(&mut self) -> SharedResult<Option<DataChunkInChannel>> {
58        match self.receiver.recv().await {
59            Some(data_chunk) => data_chunk,
60            // Early close should be treated as error.
61            None => Err(Arc::new(Internal(anyhow!("broken fifo_channel")))),
62        }
63    }
64}
65
66pub fn new_fifo_channel(output_channel_size: usize) -> (ChanSenderImpl, Vec<ChanReceiverImpl>) {
67    let (s, r) = mpsc::channel(output_channel_size);
68    (
69        ChanSenderImpl::Fifo(FifoSender { sender: s }),
70        vec![ChanReceiverImpl::Fifo(FifoReceiver { receiver: r })],
71    )
72}
73
74mod tests {
75    #[tokio::test]
76    async fn test_recv_not_fail_on_closed_channel() {
77        use crate::task::fifo_channel::new_fifo_channel;
78
79        let (sender, mut receivers) = new_fifo_channel(64);
80        assert_eq!(receivers.len(), 1);
81        drop(sender);
82
83        let receiver = receivers.get_mut(0).unwrap();
84        assert!(receiver.recv().await.is_err());
85    }
86}