risingwave_batch/task/
channel.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::array::DataChunk;
18use risingwave_pb::batch_plan::ExchangeInfo;
19use risingwave_pb::batch_plan::exchange_info::DistributionMode as ShuffleDistributionMode;
20
21use crate::error::{BatchError, Result, SharedResult};
22use crate::task::broadcast_channel::{BroadcastReceiver, BroadcastSender, new_broadcast_channel};
23use crate::task::consistent_hash_shuffle_channel::{
24    ConsistentHashShuffleReceiver, ConsistentHashShuffleSender, new_consistent_shuffle_channel,
25};
26use crate::task::data_chunk_in_channel::DataChunkInChannel;
27use crate::task::fifo_channel::{FifoReceiver, FifoSender, new_fifo_channel};
28use crate::task::hash_shuffle_channel::{
29    HashShuffleReceiver, HashShuffleSender, new_hash_shuffle_channel,
30};
31
32pub(super) trait ChanSender: Send {
33    /// This function will block until there's enough resource to process the chunk.
34    /// Currently, it will only be called from single thread.
35    /// `None` is sent as a mark of the ending of channel.
36    async fn send(&mut self, chunk: DataChunk) -> Result<()>;
37
38    /// Close this data channel.
39    ///
40    /// If finished correctly, we should pass `None`, otherwise we should pass `BatchError`. In
41    /// either case we should stop sending more data.
42    async fn close(self, error: Option<Arc<BatchError>>) -> Result<()>;
43}
44
45#[derive(Debug, Clone)]
46pub enum ChanSenderImpl {
47    HashShuffle(HashShuffleSender),
48    ConsistentHashShuffle(ConsistentHashShuffleSender),
49    Fifo(FifoSender),
50    Broadcast(BroadcastSender),
51}
52
53impl ChanSenderImpl {
54    pub(super) async fn send(&mut self, chunk: DataChunk) -> Result<()> {
55        match self {
56            Self::HashShuffle(sender) => sender.send(chunk).await,
57            Self::ConsistentHashShuffle(sender) => sender.send(chunk).await,
58            Self::Fifo(sender) => sender.send(chunk).await,
59            Self::Broadcast(sender) => sender.send(chunk).await,
60        }
61    }
62
63    pub(super) async fn close(self, error: Option<Arc<BatchError>>) -> Result<()> {
64        match self {
65            Self::HashShuffle(sender) => sender.close(error).await,
66            Self::ConsistentHashShuffle(sender) => sender.close(error).await,
67            Self::Fifo(sender) => sender.close(error).await,
68            Self::Broadcast(sender) => sender.close(error).await,
69        }
70    }
71}
72
73pub(super) trait ChanReceiver: Send {
74    /// Returns `None` if there's no more data to read.
75    /// Otherwise it will wait until there's data.
76    async fn recv(&mut self) -> SharedResult<Option<DataChunkInChannel>>;
77}
78
79pub enum ChanReceiverImpl {
80    HashShuffle(HashShuffleReceiver),
81    ConsistentHashShuffle(ConsistentHashShuffleReceiver),
82    Fifo(FifoReceiver),
83    Broadcast(BroadcastReceiver),
84}
85
86impl ChanReceiverImpl {
87    pub(super) async fn recv(&mut self) -> SharedResult<Option<DataChunkInChannel>> {
88        match self {
89            Self::HashShuffle(receiver) => receiver.recv().await,
90            Self::ConsistentHashShuffle(receiver) => receiver.recv().await,
91            Self::Broadcast(receiver) => receiver.recv().await,
92            Self::Fifo(receiver) => receiver.recv().await,
93        }
94    }
95}
96
97/// Output-channel is a synchronous, bounded single-producer-multiple-consumer queue.
98/// The producer is the local task executor, the consumer is
99/// [`ExchangeService`](risingwave_pb::task_service::exchange_service_server::ExchangeService).
100/// The implementation depends on the shuffling strategy.
101pub fn create_output_channel(
102    shuffle: &ExchangeInfo,
103    output_channel_size: usize,
104) -> Result<(ChanSenderImpl, Vec<ChanReceiverImpl>)> {
105    match shuffle.get_mode()? {
106        ShuffleDistributionMode::Single => Ok(new_fifo_channel(output_channel_size)),
107        ShuffleDistributionMode::Hash => Ok(new_hash_shuffle_channel(shuffle, output_channel_size)),
108        ShuffleDistributionMode::ConsistentHash => {
109            Ok(new_consistent_shuffle_channel(shuffle, output_channel_size))
110        }
111        ShuffleDistributionMode::Broadcast => {
112            Ok(new_broadcast_channel(shuffle, output_channel_size))
113        }
114        ShuffleDistributionMode::Unspecified => unreachable!(),
115    }
116}