risingwave_stream/executor/test_utils/
mock_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16pub struct MockSource {
17    rx: mpsc::UnboundedReceiver<Message>,
18
19    /// Whether to send a `Stop` barrier on stream finish.
20    stop_on_finish: bool,
21}
22
23/// A wrapper around `Sender<Message>`.
24pub struct MessageSender(mpsc::UnboundedSender<Message>);
25
26impl MessageSender {
27    pub fn push_chunk(&mut self, chunk: StreamChunk) {
28        self.0.send(Message::Chunk(chunk)).unwrap();
29    }
30
31    pub fn push_barrier(&mut self, epoch: u64, stop: bool) {
32        let mut barrier = Barrier::new_test_barrier(epoch);
33        if stop {
34            barrier = barrier.with_stop();
35        }
36        self.0.send(Message::Barrier(barrier)).unwrap();
37    }
38
39    pub fn send_barrier(&self, barrier: Barrier) {
40        self.0.send(Message::Barrier(barrier)).unwrap();
41    }
42
43    pub fn push_barrier_with_prev_epoch_for_test(
44        &mut self,
45        cur_epoch: u64,
46        prev_epoch: u64,
47        stop: bool,
48    ) {
49        let mut barrier = Barrier::with_prev_epoch_for_test(cur_epoch, prev_epoch);
50        if stop {
51            barrier = barrier.with_stop();
52        }
53        self.0.send(Message::Barrier(barrier)).unwrap();
54    }
55
56    pub fn push_watermark(&mut self, col_idx: usize, data_type: DataType, val: ScalarImpl) {
57        self.0
58            .send(Message::Watermark(Watermark {
59                col_idx,
60                data_type,
61                val,
62            }))
63            .unwrap();
64    }
65
66    pub fn push_int64_watermark(&mut self, col_idx: usize, val: i64) {
67        self.push_watermark(col_idx, DataType::Int64, ScalarImpl::Int64(val));
68    }
69}
70
71impl std::fmt::Debug for MockSource {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("MockSource").finish()
74    }
75}
76
77impl MockSource {
78    pub fn channel() -> (MessageSender, Self) {
79        let (tx, rx) = mpsc::unbounded_channel();
80        let source = Self {
81            rx,
82            stop_on_finish: true,
83        };
84        (MessageSender(tx), source)
85    }
86
87    pub fn with_messages(msgs: Vec<Message>) -> Self {
88        let (tx, source) = Self::channel();
89        for msg in msgs {
90            tx.0.send(msg).unwrap();
91        }
92        source
93    }
94
95    pub fn with_chunks(chunks: Vec<StreamChunk>) -> Self {
96        let (tx, source) = Self::channel();
97        for chunk in chunks {
98            tx.0.send(Message::Chunk(chunk)).unwrap();
99        }
100        source
101    }
102
103    #[must_use]
104    pub fn stop_on_finish(self, stop_on_finish: bool) -> Self {
105        Self {
106            stop_on_finish,
107            ..self
108        }
109    }
110
111    pub fn into_executor(self, schema: Schema, stream_key: Vec<usize>) -> Executor {
112        Executor::new(
113            ExecutorInfo::for_test(schema, stream_key, "MockSource".to_owned(), 0),
114            self.boxed(),
115        )
116    }
117
118    #[try_stream(ok = Message, error = StreamExecutorError)]
119    async fn execute_inner(mut self: Box<Self>) {
120        let mut epoch = test_epoch(1);
121
122        while let Some(msg) = self.rx.recv().await {
123            epoch.inc_epoch();
124            yield msg;
125        }
126
127        if self.stop_on_finish {
128            yield Message::Barrier(Barrier::new_test_barrier(epoch).with_stop());
129        }
130    }
131}
132
133impl Execute for MockSource {
134    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
135        self.execute_inner().boxed()
136    }
137}