risingwave_stream/executor/exchange/
output.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use await_tree::InstrumentAwait;
16use educe::Educe;
17
18use super::error::ExchangeChannelClosed;
19use super::permit::Sender;
20use crate::error::StreamResult;
21use crate::executor::DispatcherMessageBatch as Message;
22use crate::task::ActorId;
23
24/// `LocalOutput` sends data to a local channel.
25#[derive(Educe)]
26#[educe(Debug)]
27pub struct Output {
28    actor_id: ActorId,
29
30    #[educe(Debug(ignore))]
31    span: await_tree::Span,
32
33    #[educe(Debug(ignore))]
34    ch: Sender,
35}
36
37impl Output {
38    pub fn new(actor_id: ActorId, ch: Sender) -> Self {
39        Self {
40            actor_id,
41            span: await_tree::span!("Output (actor {:?})", actor_id).verbose(),
42            ch,
43        }
44    }
45}
46
47impl Output {
48    pub async fn send(&mut self, message: Message) -> StreamResult<()> {
49        self.ch
50            .send(message)
51            .instrument_await(self.span.clone())
52            .await
53            .map_err(|_| ExchangeChannelClosed::output(self.actor_id).into())
54    }
55
56    pub fn actor_id(&self) -> ActorId {
57        self.actor_id
58    }
59}