risingwave_stream/executor/exchange/
output.rs1use std::fmt::Debug;
16
17use async_trait::async_trait;
18use await_tree::InstrumentAwait;
19use educe::Educe;
20use risingwave_common::util::addr::is_local_address;
21
22use super::error::ExchangeChannelClosed;
23use super::permit::Sender;
24use crate::error::StreamResult;
25use crate::executor::DispatcherMessageBatch as Message;
26use crate::task::{ActorId, SharedContext};
27
28#[async_trait]
30pub trait Output: Debug + Send + Sync + 'static {
31 async fn send(&mut self, message: Message) -> StreamResult<()>;
32
33 fn actor_id(&self) -> ActorId;
35
36 fn boxed(self) -> BoxedOutput
37 where
38 Self: Sized + 'static,
39 {
40 Box::new(self)
41 }
42}
43
44pub type BoxedOutput = Box<dyn Output>;
45
46#[derive(Educe)]
48#[educe(Debug)]
49pub struct LocalOutput {
50 actor_id: ActorId,
51
52 #[educe(Debug(ignore))]
53 span: await_tree::Span,
54
55 #[educe(Debug(ignore))]
56 ch: Sender,
57}
58
59impl LocalOutput {
60 pub fn new(actor_id: ActorId, ch: Sender) -> Self {
61 Self {
62 actor_id,
63 span: await_tree::span!("LocalOutput (actor {:?})", actor_id).verbose(),
64 ch,
65 }
66 }
67}
68
69#[async_trait]
70impl Output for LocalOutput {
71 async fn send(&mut self, message: Message) -> StreamResult<()> {
72 self.ch
73 .send(message)
74 .instrument_await(self.span.clone())
75 .await
76 .map_err(|_| ExchangeChannelClosed::output(self.actor_id).into())
77 }
78
79 fn actor_id(&self) -> ActorId {
80 self.actor_id
81 }
82}
83
84#[derive(Educe)]
90#[educe(Debug)]
91pub struct RemoteOutput {
92 actor_id: ActorId,
93
94 #[educe(Debug(ignore))]
95 span: await_tree::Span,
96
97 #[educe(Debug(ignore))]
98 ch: Sender,
99}
100
101impl RemoteOutput {
102 pub fn new(actor_id: ActorId, ch: Sender) -> Self {
103 Self {
104 actor_id,
105 span: await_tree::span!("RemoteOutput (actor {:?})", actor_id).verbose(),
106 ch,
107 }
108 }
109}
110
111#[async_trait]
112impl Output for RemoteOutput {
113 async fn send(&mut self, message: Message) -> StreamResult<()> {
114 let message = match message {
115 Message::Chunk(chk) => Message::Chunk(chk.compact()),
116 _ => message,
117 };
118
119 self.ch
120 .send(message)
121 .instrument_await(self.span.clone())
122 .await
123 .map_err(|_| ExchangeChannelClosed::output(self.actor_id).into())
124 }
125
126 fn actor_id(&self) -> ActorId {
127 self.actor_id
128 }
129}
130
131pub fn new_output(
134 context: &SharedContext,
135 actor_id: ActorId,
136 down_id: ActorId,
137) -> StreamResult<BoxedOutput> {
138 let tx = context.take_sender(&(actor_id, down_id))?;
139
140 let is_local_address = match context.get_actor_info(&down_id) {
141 Ok(info) => is_local_address(&context.addr, &info.get_host()?.into()),
142 Err(_) => false,
145 };
146
147 let output = if is_local_address {
148 LocalOutput::new(down_id, tx).boxed()
149 } else {
150 RemoteOutput::new(down_id, tx).boxed()
151 };
152
153 Ok(output)
154}