risingwave_stream/executor/exchange/
output.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16
17use async_trait::async_trait;
18use await_tree::InstrumentAwait;
19use educe::Educe;
20use risingwave_common::util::addr::is_local_address;
21
22use super::error::ExchangeChannelClosed;
23use super::permit::Sender;
24use crate::error::StreamResult;
25use crate::executor::DispatcherMessageBatch as Message;
26use crate::task::{ActorId, SharedContext};
27
28/// `Output` provides an interface for `Dispatcher` to send data into downstream actors.
29#[async_trait]
30pub trait Output: Debug + Send + Sync + 'static {
31    async fn send(&mut self, message: Message) -> StreamResult<()>;
32
33    /// The downstream actor id.
34    fn actor_id(&self) -> ActorId;
35
36    fn boxed(self) -> BoxedOutput
37    where
38        Self: Sized + 'static,
39    {
40        Box::new(self)
41    }
42}
43
44pub type BoxedOutput = Box<dyn Output>;
45
46/// `LocalOutput` sends data to a local channel.
47#[derive(Educe)]
48#[educe(Debug)]
49pub struct LocalOutput {
50    actor_id: ActorId,
51
52    #[educe(Debug(ignore))]
53    span: await_tree::Span,
54
55    #[educe(Debug(ignore))]
56    ch: Sender,
57}
58
59impl LocalOutput {
60    pub fn new(actor_id: ActorId, ch: Sender) -> Self {
61        Self {
62            actor_id,
63            span: await_tree::span!("LocalOutput (actor {:?})", actor_id).verbose(),
64            ch,
65        }
66    }
67}
68
69#[async_trait]
70impl Output for LocalOutput {
71    async fn send(&mut self, message: Message) -> StreamResult<()> {
72        self.ch
73            .send(message)
74            .instrument_await(self.span.clone())
75            .await
76            .map_err(|_| ExchangeChannelClosed::output(self.actor_id).into())
77    }
78
79    fn actor_id(&self) -> ActorId {
80        self.actor_id
81    }
82}
83
84/// `RemoteOutput` compacts the data and send to a local buffer channel, which will be further sent
85/// to the remote actor by [`ExchangeService`].
86///
87/// [`ExchangeService`]: risingwave_pb::task_service::exchange_service_server::ExchangeService
88// FIXME: can we just use the same `Output` with local and compacts it in gRPC server?
89#[derive(Educe)]
90#[educe(Debug)]
91pub struct RemoteOutput {
92    actor_id: ActorId,
93
94    #[educe(Debug(ignore))]
95    span: await_tree::Span,
96
97    #[educe(Debug(ignore))]
98    ch: Sender,
99}
100
101impl RemoteOutput {
102    pub fn new(actor_id: ActorId, ch: Sender) -> Self {
103        Self {
104            actor_id,
105            span: await_tree::span!("RemoteOutput (actor {:?})", actor_id).verbose(),
106            ch,
107        }
108    }
109}
110
111#[async_trait]
112impl Output for RemoteOutput {
113    async fn send(&mut self, message: Message) -> StreamResult<()> {
114        let message = match message {
115            Message::Chunk(chk) => Message::Chunk(chk.compact()),
116            _ => message,
117        };
118
119        self.ch
120            .send(message)
121            .instrument_await(self.span.clone())
122            .await
123            .map_err(|_| ExchangeChannelClosed::output(self.actor_id).into())
124    }
125
126    fn actor_id(&self) -> ActorId {
127        self.actor_id
128    }
129}
130
131/// Create a [`LocalOutput`] or [`RemoteOutput`] instance for the current actor id and the
132/// downstream actor id. Used by dispatchers.
133pub fn new_output(
134    context: &SharedContext,
135    actor_id: ActorId,
136    down_id: ActorId,
137) -> StreamResult<BoxedOutput> {
138    let tx = context.take_sender(&(actor_id, down_id))?;
139
140    let is_local_address = match context.get_actor_info(&down_id) {
141        Ok(info) => is_local_address(&context.addr, &info.get_host()?.into()),
142        // If we can't get the actor info locally, it must be a remote actor.
143        // This may happen when we create a mv-on-mv on different workers from the upstream. #4153
144        Err(_) => false,
145    };
146
147    let output = if is_local_address {
148        LocalOutput::new(down_id, tx).boxed()
149    } else {
150        RemoteOutput::new(down_id, tx).boxed()
151    };
152
153    Ok(output)
154}