risingwave_batch/
exchange_source.rsuse std::fmt::Debug;
use std::future::Future;
use futures_async_stream::try_stream;
use risingwave_common::array::DataChunk;
use crate::error::{BatchError, Result};
use crate::execution::grpc_exchange::GrpcExchangeSource;
use crate::execution::local_exchange::LocalExchangeSource;
use crate::executor::test_utils::FakeExchangeSource;
use crate::task::TaskId;
pub trait ExchangeSource: Send + Debug {
fn take_data(&mut self) -> impl Future<Output = Result<Option<DataChunk>>> + '_;
fn get_task_id(&self) -> TaskId;
}
#[derive(Debug)]
pub enum ExchangeSourceImpl {
Grpc(GrpcExchangeSource),
Local(LocalExchangeSource),
Fake(FakeExchangeSource),
}
impl ExchangeSourceImpl {
pub(crate) async fn take_data(&mut self) -> Result<Option<DataChunk>> {
match self {
ExchangeSourceImpl::Grpc(grpc) => grpc.take_data().await,
ExchangeSourceImpl::Local(local) => local.take_data().await,
ExchangeSourceImpl::Fake(fake) => fake.take_data().await,
}
}
#[expect(dead_code)]
pub(crate) fn get_task_id(&self) -> TaskId {
match self {
ExchangeSourceImpl::Grpc(grpc) => grpc.get_task_id(),
ExchangeSourceImpl::Local(local) => local.get_task_id(),
ExchangeSourceImpl::Fake(fake) => fake.get_task_id(),
}
}
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
pub(crate) async fn take_data_stream(self) {
let mut source = self;
loop {
match source.take_data().await {
Ok(Some(chunk)) => yield chunk,
Ok(None) => break,
Err(e) => return Err(e),
}
}
}
}