risingwave_stream/executor/exchange/
output.rsuse std::fmt::Debug;
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use educe::Educe;
use risingwave_common::util::addr::is_local_address;
use super::error::ExchangeChannelClosed;
use super::permit::Sender;
use crate::error::StreamResult;
use crate::executor::DispatcherMessage as Message;
use crate::task::{ActorId, SharedContext};
#[async_trait]
pub trait Output: Debug + Send + Sync + 'static {
async fn send(&mut self, message: Message) -> StreamResult<()>;
fn actor_id(&self) -> ActorId;
fn boxed(self) -> BoxedOutput
where
Self: Sized + 'static,
{
Box::new(self)
}
}
pub type BoxedOutput = Box<dyn Output>;
#[derive(Educe)]
#[educe(Debug)]
pub struct LocalOutput {
actor_id: ActorId,
#[educe(Debug(ignore))]
span: await_tree::Span,
#[educe(Debug(ignore))]
ch: Sender,
}
impl LocalOutput {
pub fn new(actor_id: ActorId, ch: Sender) -> Self {
Self {
actor_id,
span: format!("LocalOutput (actor {:?})", actor_id).into(),
ch,
}
}
}
#[async_trait]
impl Output for LocalOutput {
async fn send(&mut self, message: Message) -> StreamResult<()> {
self.ch
.send(message)
.verbose_instrument_await(self.span.clone())
.await
.map_err(|_| ExchangeChannelClosed::output(self.actor_id).into())
}
fn actor_id(&self) -> ActorId {
self.actor_id
}
}
#[derive(Educe)]
#[educe(Debug)]
pub struct RemoteOutput {
actor_id: ActorId,
#[educe(Debug(ignore))]
span: await_tree::Span,
#[educe(Debug(ignore))]
ch: Sender,
}
impl RemoteOutput {
pub fn new(actor_id: ActorId, ch: Sender) -> Self {
Self {
actor_id,
span: format!("RemoteOutput (actor {:?})", actor_id).into(),
ch,
}
}
}
#[async_trait]
impl Output for RemoteOutput {
async fn send(&mut self, message: Message) -> StreamResult<()> {
let message = match message {
Message::Chunk(chk) => Message::Chunk(chk.compact()),
_ => message,
};
self.ch
.send(message)
.verbose_instrument_await(self.span.clone())
.await
.map_err(|_| ExchangeChannelClosed::output(self.actor_id).into())
}
fn actor_id(&self) -> ActorId {
self.actor_id
}
}
pub fn new_output(
context: &SharedContext,
actor_id: ActorId,
down_id: ActorId,
) -> StreamResult<BoxedOutput> {
let tx = context.take_sender(&(actor_id, down_id))?;
let is_local_address = match context.get_actor_info(&down_id) {
Ok(info) => is_local_address(&context.addr, &info.get_host()?.into()),
Err(_) => false,
};
let output = if is_local_address {
LocalOutput::new(down_id, tx).boxed()
} else {
RemoteOutput::new(down_id, tx).boxed()
};
Ok(output)
}