risingwave_stream/executor/source/
dummy_source_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Formatter};
16
17use anyhow::anyhow;
18use tokio::sync::mpsc::UnboundedReceiver;
19
20use crate::executor::prelude::*;
21
22/// A dummy source executor that only receives barrier messages and sends them to
23/// the downstream executor. Used when there is no external stream source.
24pub struct DummySourceExecutor {
25    actor_ctx: ActorContextRef,
26
27    /// Receiver of barrier channel.
28    barrier_receiver: Option<UnboundedReceiver<Barrier>>,
29}
30
31impl DummySourceExecutor {
32    pub fn new(actor_ctx: ActorContextRef, barrier_receiver: UnboundedReceiver<Barrier>) -> Self {
33        Self {
34            actor_ctx,
35            barrier_receiver: Some(barrier_receiver),
36        }
37    }
38
39    /// A dummy source executor only receives barrier messages and sends them to
40    /// the downstream executor.
41    #[try_stream(ok = Message, error = StreamExecutorError)]
42    async fn execute_inner(mut self) {
43        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
44        let barrier = barrier_receiver
45            .recv()
46            .instrument_await("source_recv_first_barrier")
47            .await
48            .ok_or_else(|| {
49                anyhow!(
50                    "failed to receive the first barrier, actor_id: {:?} with no stream source",
51                    self.actor_ctx.id
52                )
53            })?;
54        yield Message::Barrier(barrier);
55
56        while let Some(barrier) = barrier_receiver.recv().await {
57            yield Message::Barrier(barrier);
58        }
59    }
60}
61
62impl Execute for DummySourceExecutor {
63    fn execute(self: Box<Self>) -> BoxedMessageStream {
64        self.execute_inner().boxed()
65    }
66}
67
68impl Debug for DummySourceExecutor {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("DummySourceExecutor")
71            .field("actor_id", &self.actor_ctx.id)
72            .finish()
73    }
74}