risingwave_stream/executor/
subtask.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use futures::Future;
16use thiserror_ext::AsReport;
17use tokio::sync::mpsc;
18use tokio::sync::mpsc::error::SendError;
19use tokio_stream::wrappers::ReceiverStream;
20
21use super::actor::spawn_blocking_drop_stream;
22use crate::executor::prelude::*;
23
/// Handle used to drive the subtask.
///
/// This is an opaque future type whose concrete type is defined by [`wrap`]. It resolves to `()`
/// and is `Send + 'static`. As with any future, the subtask only makes progress while this
/// handle is being polled.
pub type SubtaskHandle = impl Future<Output = ()> + Send + 'static;
26
/// The thin wrapper for subtask-wrapped executor, containing a channel to receive the messages from
/// the subtask.
///
/// Instances are created by [`wrap`], which keeps the sending half of the channel inside the
/// subtask future.
pub struct SubtaskRxExecutor {
    /// Receiving half of the channel. It is consumed by `execute`, which exposes it as the
    /// executor's message stream.
    rx: mpsc::Receiver<MessageStreamItem>,
}
32
33impl Execute for SubtaskRxExecutor {
34    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
35        ReceiverStream::new(self.rx).boxed()
36    }
37}
38
39/// Wrap an executor into a subtask and a thin receiver executor, connected by a channel with a
40/// buffer size of 1.
41///
42/// Used when there're multiple stateful executors in an actor. These subtasks can be concurrently
43/// executed to improve the I/O performance, while the computing resource can be still bounded to a
44/// single thread.
45#[define_opaque(SubtaskHandle)]
46pub fn wrap(input: Executor, actor_id: ActorId) -> (SubtaskHandle, SubtaskRxExecutor) {
47    let (tx, rx) = mpsc::channel(1);
48    let rx_executor = SubtaskRxExecutor { rx };
49
50    let handle = async move {
51        let mut input = input.execute();
52
53        while let Some(item) = input.next().await {
54            // Decide whether to stop the subtask. We explicitly do this instead of relying on the
55            // termination of the input stream, because we don't want to exhaust the stream, which
56            // causes the stream dropped in the scope of the current async task and blocks other
57            // actors. See `spawn_blocking_drop_stream` for more details.
58            let to_stop = match &item {
59                Ok(Message::Barrier(barrier)) => barrier.is_stop(actor_id),
60                Ok(_) => false,
61                Err(_) => true,
62            };
63
64            // It's possible that the downstream itself yields an error (e.g. from remote input) and
65            // finishes, so we may fail to send the message. In this case, we can simply ignore the
66            // send error and exit as well. If the message itself is another error, log it.
67            if let Err(SendError(item)) = tx.send(item).await {
68                match item {
69                    Ok(_) => tracing::error!("actor downstream subtask failed"),
70                    Err(e) => tracing::error!(
71                        error = %e.as_report(),
72                        "after actor downstream subtask failed, another error occurs"
73                    ),
74                }
75                break;
76            }
77
78            if to_stop {
79                break;
80            }
81        }
82
83        spawn_blocking_drop_stream(input).await;
84    }
85    .instrument_await("Subtask");
86
87    (handle, rx_executor)
88}