risingwave_stream/executor/exchange/
output.rs1use await_tree::InstrumentAwait;
16use educe::Educe;
17
18use super::error::ExchangeChannelClosed;
19use super::permit::Sender;
20use crate::error::StreamResult;
21use crate::executor::DispatcherMessageBatch as Message;
22use crate::task::ActorId;
23
24#[derive(Educe)]
26#[educe(Debug)]
27pub struct Output {
28 actor_id: ActorId,
29
30 #[educe(Debug(ignore))]
31 span: await_tree::Span,
32
33 #[educe(Debug(ignore))]
34 ch: Sender,
35}
36
37impl Output {
38 pub fn new(actor_id: ActorId, ch: Sender) -> Self {
39 Self {
40 actor_id,
41 span: await_tree::span!("Output (actor {:?})", actor_id).verbose(),
42 ch,
43 }
44 }
45}
46
47impl Output {
48 pub async fn send(&mut self, message: Message) -> StreamResult<()> {
49 self.ch
50 .send(message)
51 .instrument_await(self.span.clone())
52 .await
53 .map_err(|_| ExchangeChannelClosed::output(self.actor_id).into())
54 }
55
56 pub fn actor_id(&self) -> ActorId {
57 self.actor_id
58 }
59}