1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
// Copyright 2025 RisingWave Labs
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::Future;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::SendError;
use tokio_stream::wrappers::ReceiverStream;
use super::actor::spawn_blocking_drop_stream;
use crate::executor::prelude::*;
/// Handle used to drive the subtask.
pub type SubtaskHandle = impl Future<Output = ()> + Send + 'static;
/// The thin wrapper for subtask-wrapped executor, containing a channel to receive the messages from
/// the subtask.
pub struct SubtaskRxExecutor {
rx: mpsc::Receiver<MessageStreamItem>,
impl Execute for SubtaskRxExecutor {
fn execute(self: Box<Self>) -> super::BoxedMessageStream {
/// Wrap an executor into a subtask and a thin receiver executor, connected by a channel with a
/// buffer size of 1.
/// Used when there're multiple stateful executors in an actor. These subtasks can be concurrently
/// executed to improve the I/O performance, while the computing resource can be still bounded to a
/// single thread.
pub fn wrap(input: Executor, actor_id: ActorId) -> (SubtaskHandle, SubtaskRxExecutor) {
let (tx, rx) = mpsc::channel(1);
let rx_executor = SubtaskRxExecutor { rx };
let handle = async move {
let mut input = input.execute();
while let Some(item) = input.next().await {
// Decide whether to stop the subtask. We explicitly do this instead of relying on the
// termination of the input stream, because we don't want to exhaust the stream, which
// causes the stream dropped in the scope of the current async task and blocks other
// actors. See `spawn_blocking_drop_stream` for more details.
let to_stop = match &item {
Ok(Message::Barrier(barrier)) => barrier.is_stop(actor_id),
Ok(_) => false,
Err(_) => true,
// It's possible that the downstream itself yields an error (e.g. from remote input) and
// finishes, so we may fail to send the message. In this case, we can simply ignore the
// send error and exit as well. If the message itself is another error, log it.
if let Err(SendError(item)) = tx.send(item).await {
match item {
Ok(_) => tracing::error!("actor downstream subtask failed"),
Err(e) => tracing::error!(
error = %e.as_report(),
"after actor downstream subtask failed, another error occurs"
if to_stop {
(handle, rx_executor)