risingwave_stream/executor/
subtask.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::Future;
16use thiserror_ext::AsReport;
17use tokio::sync::mpsc;
18use tokio::sync::mpsc::error::SendError;
19use tokio_stream::wrappers::ReceiverStream;
20
21use super::actor::spawn_blocking_drop_stream;
22use crate::executor::prelude::*;
23
24/// Handle used to drive the subtask.
25pub type SubtaskHandle = impl Future<Output = ()> + Send + 'static;
26
27/// The thin wrapper for subtask-wrapped executor, containing a channel to receive the messages from
28/// the subtask.
29pub struct SubtaskRxExecutor {
30    rx: mpsc::Receiver<MessageStreamItem>,
31}
32
33impl Execute for SubtaskRxExecutor {
34    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
35        ReceiverStream::new(self.rx).boxed()
36    }
37}
38
39/// Wrap an executor into a subtask and a thin receiver executor, connected by a channel with a
40/// buffer size of 1.
41///
42/// Used when there're multiple stateful executors in an actor. These subtasks can be concurrently
43/// executed to improve the I/O performance, while the computing resource can be still bounded to a
44/// single thread.
45pub fn wrap(input: Executor, actor_id: ActorId) -> (SubtaskHandle, SubtaskRxExecutor) {
46    let (tx, rx) = mpsc::channel(1);
47    let rx_executor = SubtaskRxExecutor { rx };
48
49    let handle = async move {
50        let mut input = input.execute();
51
52        while let Some(item) = input.next().await {
53            // Decide whether to stop the subtask. We explicitly do this instead of relying on the
54            // termination of the input stream, because we don't want to exhaust the stream, which
55            // causes the stream dropped in the scope of the current async task and blocks other
56            // actors. See `spawn_blocking_drop_stream` for more details.
57            let to_stop = match &item {
58                Ok(Message::Barrier(barrier)) => barrier.is_stop(actor_id),
59                Ok(_) => false,
60                Err(_) => true,
61            };
62
63            // It's possible that the downstream itself yields an error (e.g. from remote input) and
64            // finishes, so we may fail to send the message. In this case, we can simply ignore the
65            // send error and exit as well. If the message itself is another error, log it.
66            if let Err(SendError(item)) = tx.send(item).await {
67                match item {
68                    Ok(_) => tracing::error!("actor downstream subtask failed"),
69                    Err(e) => tracing::error!(
70                        error = %e.as_report(),
71                        "after actor downstream subtask failed, another error occurs"
72                    ),
73                }
74                break;
75            }
76
77            if to_stop {
78                break;
79            }
80        }
81
82        spawn_blocking_drop_stream(input).await;
83    }
84    .instrument_await("Subtask");
85
86    (handle, rx_executor)
87}