risingwave_stream/executor/test_utils/
mock_source.rs1use super::*;
16pub struct MockSource {
17 rx: mpsc::UnboundedReceiver<Message>,
18
19 stop_on_finish: bool,
21}
22
23pub struct MessageSender(mpsc::UnboundedSender<Message>);
25
26impl MessageSender {
27 pub fn push_chunk(&mut self, chunk: StreamChunk) {
28 self.0.send(Message::Chunk(chunk)).unwrap();
29 }
30
31 pub fn push_barrier(&mut self, epoch: u64, stop: bool) {
32 let mut barrier = Barrier::new_test_barrier(epoch);
33 if stop {
34 barrier = barrier.with_stop();
35 }
36 self.0.send(Message::Barrier(barrier)).unwrap();
37 }
38
39 pub fn send_barrier(&self, barrier: Barrier) {
40 self.0.send(Message::Barrier(barrier)).unwrap();
41 }
42
43 pub fn push_barrier_with_prev_epoch_for_test(
44 &mut self,
45 cur_epoch: u64,
46 prev_epoch: u64,
47 stop: bool,
48 ) {
49 let mut barrier = Barrier::with_prev_epoch_for_test(cur_epoch, prev_epoch);
50 if stop {
51 barrier = barrier.with_stop();
52 }
53 self.0.send(Message::Barrier(barrier)).unwrap();
54 }
55
56 pub fn push_watermark(&mut self, col_idx: usize, data_type: DataType, val: ScalarImpl) {
57 self.0
58 .send(Message::Watermark(Watermark {
59 col_idx,
60 data_type,
61 val,
62 }))
63 .unwrap();
64 }
65
66 pub fn push_int64_watermark(&mut self, col_idx: usize, val: i64) {
67 self.push_watermark(col_idx, DataType::Int64, ScalarImpl::Int64(val));
68 }
69}
70
71impl std::fmt::Debug for MockSource {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct("MockSource").finish()
74 }
75}
76
77impl MockSource {
78 pub fn channel() -> (MessageSender, Self) {
79 let (tx, rx) = mpsc::unbounded_channel();
80 let source = Self {
81 rx,
82 stop_on_finish: true,
83 };
84 (MessageSender(tx), source)
85 }
86
87 pub fn with_messages(msgs: Vec<Message>) -> Self {
88 let (tx, source) = Self::channel();
89 for msg in msgs {
90 tx.0.send(msg).unwrap();
91 }
92 source
93 }
94
95 pub fn with_chunks(chunks: Vec<StreamChunk>) -> Self {
96 let (tx, source) = Self::channel();
97 for chunk in chunks {
98 tx.0.send(Message::Chunk(chunk)).unwrap();
99 }
100 source
101 }
102
103 #[must_use]
104 pub fn stop_on_finish(self, stop_on_finish: bool) -> Self {
105 Self {
106 stop_on_finish,
107 ..self
108 }
109 }
110
111 pub fn into_executor(self, schema: Schema, stream_key: Vec<usize>) -> Executor {
112 Executor::new(
113 ExecutorInfo::for_test(schema, stream_key, "MockSource".to_owned(), 0),
114 self.boxed(),
115 )
116 }
117
118 #[try_stream(ok = Message, error = StreamExecutorError)]
119 async fn execute_inner(mut self: Box<Self>) {
120 let mut epoch = test_epoch(1);
121
122 while let Some(msg) = self.rx.recv().await {
123 epoch.inc_epoch();
124 yield msg;
125 }
126
127 if self.stop_on_finish {
128 yield Message::Barrier(Barrier::new_test_barrier(epoch).with_stop());
129 }
130 }
131}
132
133impl Execute for MockSource {
134 fn execute(self: Box<Self>) -> super::BoxedMessageStream {
135 self.execute_inner().boxed()
136 }
137}