risingwave_stream/executor/source/
dummy_source_executor.rs1use std::fmt::{Debug, Formatter};
16
17use anyhow::anyhow;
18use tokio::sync::mpsc::UnboundedReceiver;
19
20use crate::executor::prelude::*;
21
22pub struct DummySourceExecutor {
25 actor_ctx: ActorContextRef,
26
27 barrier_receiver: Option<UnboundedReceiver<Barrier>>,
29}
30
31impl DummySourceExecutor {
32 pub fn new(actor_ctx: ActorContextRef, barrier_receiver: UnboundedReceiver<Barrier>) -> Self {
33 Self {
34 actor_ctx,
35 barrier_receiver: Some(barrier_receiver),
36 }
37 }
38
39 #[try_stream(ok = Message, error = StreamExecutorError)]
42 async fn execute_inner(mut self) {
43 let mut barrier_receiver = self.barrier_receiver.take().unwrap();
44 let barrier = barrier_receiver
45 .recv()
46 .instrument_await("source_recv_first_barrier")
47 .await
48 .ok_or_else(|| {
49 anyhow!(
50 "failed to receive the first barrier, actor_id: {:?} with no stream source",
51 self.actor_ctx.id
52 )
53 })?;
54 yield Message::Barrier(barrier);
55
56 while let Some(barrier) = barrier_receiver.recv().await {
57 yield Message::Barrier(barrier);
58 }
59 }
60}
61
62impl Execute for DummySourceExecutor {
63 fn execute(self: Box<Self>) -> BoxedMessageStream {
64 self.execute_inner().boxed()
65 }
66}
67
68impl Debug for DummySourceExecutor {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("DummySourceExecutor")
71 .field("actor_id", &self.actor_ctx.id)
72 .finish()
73 }
74}