risingwave_stream/executor/subtask.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::Future;
16use thiserror_ext::AsReport;
17use tokio::sync::mpsc;
18use tokio::sync::mpsc::error::SendError;
19use tokio_stream::wrappers::ReceiverStream;
20
21use super::actor::spawn_blocking_drop_stream;
22use crate::executor::prelude::*;
23
24/// Handle used to drive the subtask.
25pub type SubtaskHandle = impl Future<Output = ()> + Send + 'static;
26
27/// The thin wrapper for subtask-wrapped executor, containing a channel to receive the messages from
28/// the subtask.
29pub struct SubtaskRxExecutor {
30 rx: mpsc::Receiver<MessageStreamItem>,
31}
32
33impl Execute for SubtaskRxExecutor {
34 fn execute(self: Box<Self>) -> super::BoxedMessageStream {
35 ReceiverStream::new(self.rx).boxed()
36 }
37}
38
39/// Wrap an executor into a subtask and a thin receiver executor, connected by a channel with a
40/// buffer size of 1.
41///
42/// Used when there're multiple stateful executors in an actor. These subtasks can be concurrently
43/// executed to improve the I/O performance, while the computing resource can be still bounded to a
44/// single thread.
45pub fn wrap(input: Executor, actor_id: ActorId) -> (SubtaskHandle, SubtaskRxExecutor) {
46 let (tx, rx) = mpsc::channel(1);
47 let rx_executor = SubtaskRxExecutor { rx };
48
49 let handle = async move {
50 let mut input = input.execute();
51
52 while let Some(item) = input.next().await {
53 // Decide whether to stop the subtask. We explicitly do this instead of relying on the
54 // termination of the input stream, because we don't want to exhaust the stream, which
55 // causes the stream dropped in the scope of the current async task and blocks other
56 // actors. See `spawn_blocking_drop_stream` for more details.
57 let to_stop = match &item {
58 Ok(Message::Barrier(barrier)) => barrier.is_stop(actor_id),
59 Ok(_) => false,
60 Err(_) => true,
61 };
62
63 // It's possible that the downstream itself yields an error (e.g. from remote input) and
64 // finishes, so we may fail to send the message. In this case, we can simply ignore the
65 // send error and exit as well. If the message itself is another error, log it.
66 if let Err(SendError(item)) = tx.send(item).await {
67 match item {
68 Ok(_) => tracing::error!("actor downstream subtask failed"),
69 Err(e) => tracing::error!(
70 error = %e.as_report(),
71 "after actor downstream subtask failed, another error occurs"
72 ),
73 }
74 break;
75 }
76
77 if to_stop {
78 break;
79 }
80 }
81
82 spawn_blocking_drop_stream(input).await;
83 }
84 .instrument_await("Subtask");
85
86 (handle, rx_executor)
87}