risingwave_stream/executor/test_utils/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Re-export everything from submodules
16pub mod agg_executor;
17pub mod hash_join_executor;
18mod mock_source;
19pub mod top_n_executor;
20
21use async_trait::async_trait;
22use futures::{FutureExt, StreamExt, TryStreamExt};
23use futures_async_stream::try_stream;
24pub use mock_source::*;
25use risingwave_common::catalog::Schema;
26use risingwave_common::types::{DataType, ScalarImpl};
27use risingwave_common::util::epoch::{EpochExt, test_epoch};
28use tokio::sync::mpsc;
29
30use super::error::StreamExecutorError;
31use super::{
32    Barrier, BoxedMessageStream, Execute, Executor, ExecutorInfo, Message, MessageStream,
33    StreamChunk, StreamExecutorResult, Watermark,
34};
35
36// Keep the prelude module here since it imports from multiple submodules
37pub mod prelude {
38    pub use std::sync::Arc;
39    pub use std::sync::atomic::AtomicU64;
40
41    pub use risingwave_common::array::StreamChunk;
42    pub use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
43    pub use risingwave_common::test_prelude::StreamChunkTestExt;
44    pub use risingwave_common::types::DataType;
45    pub use risingwave_common::util::sort_util::OrderType;
46    pub use risingwave_storage::StateStore;
47    pub use risingwave_storage::memory::MemoryStateStore;
48
49    pub use crate::common::table::state_table::StateTable;
50    pub use crate::executor::test_utils::expr::build_from_pretty;
51    pub use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
52    pub use crate::executor::{ActorContext, BoxedMessageStream, Execute, PkIndices};
53}
54
55/// Trait for testing `StreamExecutor` more easily.
56///
57/// With `next_unwrap_ready`, we can retrieve the next message from the executor without `await`ing,
58/// so that we can immediately panic if the executor is not ready instead of getting stuck. This is
59/// useful for testing.
60#[async_trait]
61pub trait StreamExecutorTestExt: MessageStream + Unpin {
62    /// Asserts that the executor is pending (not ready) now.
63    ///
64    /// Panics if it is ready.
65    fn next_unwrap_pending(&mut self) {
66        if let Some(r) = self.try_next().now_or_never() {
67            panic!("expect pending stream, but got `{:?}`", r);
68        }
69    }
70
71    /// Asserts that the executor is ready now, returning the next message.
72    ///
73    /// Panics if it is pending.
74    fn next_unwrap_ready(&mut self) -> StreamExecutorResult<Message> {
75        match self.next().now_or_never() {
76            Some(Some(r)) => r,
77            Some(None) => panic!("expect ready stream, but got terminated"),
78            None => panic!("expect ready stream, but got pending"),
79        }
80    }
81
82    /// Asserts that the executor is ready on a [`StreamChunk`] now, returning the next chunk.
83    ///
84    /// Panics if it is pending or the next message is not a [`StreamChunk`].
85    fn next_unwrap_ready_chunk(&mut self) -> StreamExecutorResult<StreamChunk> {
86        self.next_unwrap_ready()
87            .map(|msg| msg.into_chunk().expect("expect chunk"))
88    }
89
90    /// Asserts that the executor is ready on a [`Barrier`] now, returning the next barrier.
91    ///
92    /// Panics if it is pending or the next message is not a [`Barrier`].
93    fn next_unwrap_ready_barrier(&mut self) -> StreamExecutorResult<Barrier> {
94        self.next_unwrap_ready()
95            .map(|msg| msg.into_barrier().expect("expect barrier"))
96    }
97
98    /// Asserts that the executor is ready on a [`Watermark`] now, returning the next barrier.
99    ///
100    /// Panics if it is pending or the next message is not a [`Watermark`].
101    fn next_unwrap_ready_watermark(&mut self) -> StreamExecutorResult<Watermark> {
102        self.next_unwrap_ready()
103            .map(|msg| msg.into_watermark().expect("expect watermark"))
104    }
105
106    async fn expect_barrier(&mut self) -> Barrier {
107        let msg = self.next().await.unwrap().unwrap();
108        msg.into_barrier().unwrap()
109    }
110
111    async fn expect_chunk(&mut self) -> StreamChunk {
112        let msg = self.next().await.unwrap().unwrap();
113        msg.into_chunk().unwrap()
114    }
115
116    async fn expect_watermark(&mut self) -> Watermark {
117        let msg = self.next().await.unwrap().unwrap();
118        msg.into_watermark().unwrap()
119    }
120}
121
122// FIXME: implement on any `impl MessageStream` if the analyzer works well.
123impl StreamExecutorTestExt for BoxedMessageStream {}
124
125/// `row_nonnull` builds a `OwnedRow` with concrete values.
126/// TODO: add macro row!, which requires a new trait `ToScalarValue`.
127#[macro_export]
128macro_rules! row_nonnull {
129    [$( $value:expr ),*] => {
130        {
131            risingwave_common::row::OwnedRow::new(vec![$(Some($value.into()), )*])
132        }
133    };
134}
135
136pub mod expr {
137    use risingwave_expr::expr::NonStrictExpression;
138
139    pub fn build_from_pretty(s: impl AsRef<str>) -> NonStrictExpression {
140        NonStrictExpression::for_test(risingwave_expr::expr::build_from_pretty(s))
141    }
142}