// risingwave_batch/task/channel.rs
use std::sync::Arc;
use risingwave_common::array::DataChunk;
use risingwave_pb::batch_plan::exchange_info::DistributionMode as ShuffleDistributionMode;
use risingwave_pb::batch_plan::ExchangeInfo;
use crate::error::{BatchError, Result, SharedResult};
use crate::task::broadcast_channel::{new_broadcast_channel, BroadcastReceiver, BroadcastSender};
use crate::task::consistent_hash_shuffle_channel::{
new_consistent_shuffle_channel, ConsistentHashShuffleReceiver, ConsistentHashShuffleSender,
};
use crate::task::data_chunk_in_channel::DataChunkInChannel;
use crate::task::fifo_channel::{new_fifo_channel, FifoReceiver, FifoSender};
use crate::task::hash_shuffle_channel::{
new_hash_shuffle_channel, HashShuffleReceiver, HashShuffleSender,
};
/// Sending side of a task output channel.
///
/// Visible only to the parent module; every implementor must be `Send`.
pub(super) trait ChanSender: Send {
/// Pushes one `DataChunk` into the channel, reporting any failure through `Result`.
async fn send(&mut self, chunk: DataChunk) -> Result<()>;
/// Consumes the sender and closes the channel.
///
/// NOTE(review): presumably `error == None` marks a normal end of stream and
/// `Some(err)` forwards a failure to the receiving side — confirm against the implementors.
async fn close(self, error: Option<Arc<BatchError>>) -> Result<()>;
}
/// Closed set of concrete channel senders, one variant per channel flavor.
///
/// The inherent `send`/`close` methods match on the variant and forward to the
/// wrapped sender.
#[derive(Debug, Clone)]
pub enum ChanSenderImpl {
/// Wraps a `HashShuffleSender`.
HashShuffle(HashShuffleSender),
/// Wraps a `ConsistentHashShuffleSender`.
ConsistentHashShuffle(ConsistentHashShuffleSender),
/// Wraps a `FifoSender`.
Fifo(FifoSender),
/// Wraps a `BroadcastSender`.
Broadcast(BroadcastSender),
}
impl ChanSenderImpl {
    /// Forwards `chunk` to the wrapped sender's `send` and returns its result unchanged.
    pub(super) async fn send(&mut self, chunk: DataChunk) -> Result<()> {
        match self {
            Self::Fifo(tx) => tx.send(chunk).await,
            Self::Broadcast(tx) => tx.send(chunk).await,
            Self::HashShuffle(tx) => tx.send(chunk).await,
            Self::ConsistentHashShuffle(tx) => tx.send(chunk).await,
        }
    }

    /// Consumes the sender and forwards `error` to the wrapped sender's `close`,
    /// returning its result unchanged.
    pub(super) async fn close(self, error: Option<Arc<BatchError>>) -> Result<()> {
        match self {
            Self::Fifo(tx) => tx.close(error).await,
            Self::Broadcast(tx) => tx.close(error).await,
            Self::HashShuffle(tx) => tx.close(error).await,
            Self::ConsistentHashShuffle(tx) => tx.close(error).await,
        }
    }
}
/// Receiving side of a task output channel.
///
/// Visible only to the parent module; every implementor must be `Send`.
pub(super) trait ChanReceiver: Send {
/// Awaits the next item from the channel.
///
/// NOTE(review): presumably `Ok(None)` signals that the channel is exhausted and
/// `Err` carries an error shared from the sending side — confirm against the implementors.
async fn recv(&mut self) -> SharedResult<Option<DataChunkInChannel>>;
}
/// Closed set of concrete channel receivers, one variant per channel flavor.
///
/// The inherent `recv` method matches on the variant and forwards to the wrapped
/// receiver.
pub enum ChanReceiverImpl {
/// Wraps a `HashShuffleReceiver`.
HashShuffle(HashShuffleReceiver),
/// Wraps a `ConsistentHashShuffleReceiver`.
ConsistentHashShuffle(ConsistentHashShuffleReceiver),
/// Wraps a `FifoReceiver`.
Fifo(FifoReceiver),
/// Wraps a `BroadcastReceiver`.
Broadcast(BroadcastReceiver),
}
impl ChanReceiverImpl {
    /// Forwards to the wrapped receiver's `recv` and returns its result unchanged.
    ///
    /// Arms are listed in the same order as the enum's variants.
    pub(super) async fn recv(&mut self) -> SharedResult<Option<DataChunkInChannel>> {
        match self {
            Self::HashShuffle(rx) => rx.recv().await,
            Self::ConsistentHashShuffle(rx) => rx.recv().await,
            Self::Fifo(rx) => rx.recv().await,
            Self::Broadcast(rx) => rx.recv().await,
        }
    }
}
/// Creates the sender and its receivers for the exchange described by `shuffle`.
///
/// The channel flavor follows the exchange's distribution mode:
/// `Single` uses the FIFO channel, `Hash` the hash-shuffle channel,
/// `ConsistentHash` the consistent-hash shuffle channel and `Broadcast` the
/// broadcast channel. `output_channel_size` is handed to the chosen constructor
/// as-is.
///
/// # Errors
///
/// Propagates the error returned by `ExchangeInfo::get_mode`.
///
/// # Panics
///
/// Panics if the distribution mode is `Unspecified`.
pub fn create_output_channel(
    shuffle: &ExchangeInfo,
    output_channel_size: usize,
) -> Result<(ChanSenderImpl, Vec<ChanReceiverImpl>)> {
    let mode = shuffle.get_mode()?;
    let channel = match mode {
        ShuffleDistributionMode::Single => new_fifo_channel(output_channel_size),
        ShuffleDistributionMode::Hash => new_hash_shuffle_channel(shuffle, output_channel_size),
        ShuffleDistributionMode::ConsistentHash => {
            new_consistent_shuffle_channel(shuffle, output_channel_size)
        }
        ShuffleDistributionMode::Broadcast => new_broadcast_channel(shuffle, output_channel_size),
        ShuffleDistributionMode::Unspecified => unreachable!(),
    };
    Ok(channel)
}