risingwave_batch/task/
channel.rs1use std::sync::Arc;
16
17use risingwave_common::array::DataChunk;
18use risingwave_pb::batch_plan::ExchangeInfo;
19use risingwave_pb::batch_plan::exchange_info::DistributionMode as ShuffleDistributionMode;
20
21use crate::error::{BatchError, Result, SharedResult};
22use crate::task::broadcast_channel::{BroadcastReceiver, BroadcastSender, new_broadcast_channel};
23use crate::task::consistent_hash_shuffle_channel::{
24 ConsistentHashShuffleReceiver, ConsistentHashShuffleSender, new_consistent_shuffle_channel,
25};
26use crate::task::data_chunk_in_channel::DataChunkInChannel;
27use crate::task::fifo_channel::{FifoReceiver, FifoSender, new_fifo_channel};
28use crate::task::hash_shuffle_channel::{
29 HashShuffleReceiver, HashShuffleSender, new_hash_shuffle_channel,
30};
31
32pub(super) trait ChanSender: Send {
33 async fn send(&mut self, chunk: DataChunk) -> Result<()>;
37
38 async fn close(self, error: Option<Arc<BatchError>>) -> Result<()>;
43}
44
45#[derive(Debug, Clone)]
46pub enum ChanSenderImpl {
47 HashShuffle(HashShuffleSender),
48 ConsistentHashShuffle(ConsistentHashShuffleSender),
49 Fifo(FifoSender),
50 Broadcast(BroadcastSender),
51}
52
53impl ChanSenderImpl {
54 pub(super) async fn send(&mut self, chunk: DataChunk) -> Result<()> {
55 match self {
56 Self::HashShuffle(sender) => sender.send(chunk).await,
57 Self::ConsistentHashShuffle(sender) => sender.send(chunk).await,
58 Self::Fifo(sender) => sender.send(chunk).await,
59 Self::Broadcast(sender) => sender.send(chunk).await,
60 }
61 }
62
63 pub(super) async fn close(self, error: Option<Arc<BatchError>>) -> Result<()> {
64 match self {
65 Self::HashShuffle(sender) => sender.close(error).await,
66 Self::ConsistentHashShuffle(sender) => sender.close(error).await,
67 Self::Fifo(sender) => sender.close(error).await,
68 Self::Broadcast(sender) => sender.close(error).await,
69 }
70 }
71}
72
73pub(super) trait ChanReceiver: Send {
74 async fn recv(&mut self) -> SharedResult<Option<DataChunkInChannel>>;
77}
78
79pub enum ChanReceiverImpl {
80 HashShuffle(HashShuffleReceiver),
81 ConsistentHashShuffle(ConsistentHashShuffleReceiver),
82 Fifo(FifoReceiver),
83 Broadcast(BroadcastReceiver),
84}
85
86impl ChanReceiverImpl {
87 pub(super) async fn recv(&mut self) -> SharedResult<Option<DataChunkInChannel>> {
88 match self {
89 Self::HashShuffle(receiver) => receiver.recv().await,
90 Self::ConsistentHashShuffle(receiver) => receiver.recv().await,
91 Self::Broadcast(receiver) => receiver.recv().await,
92 Self::Fifo(receiver) => receiver.recv().await,
93 }
94 }
95}
96
97pub fn create_output_channel(
102 shuffle: &ExchangeInfo,
103 output_channel_size: usize,
104) -> Result<(ChanSenderImpl, Vec<ChanReceiverImpl>)> {
105 match shuffle.get_mode()? {
106 ShuffleDistributionMode::Single => Ok(new_fifo_channel(output_channel_size)),
107 ShuffleDistributionMode::Hash => Ok(new_hash_shuffle_channel(shuffle, output_channel_size)),
108 ShuffleDistributionMode::ConsistentHash => {
109 Ok(new_consistent_shuffle_channel(shuffle, output_channel_size))
110 }
111 ShuffleDistributionMode::Broadcast => {
112 Ok(new_broadcast_channel(shuffle, output_channel_size))
113 }
114 ShuffleDistributionMode::Unspecified => unreachable!(),
115 }
116}