risingwave_stream/executor/test_utils/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Re-export everything from submodules
16pub mod agg_executor;
17pub mod hash_join_executor;
18mod mock_source;
19pub mod top_n_executor;
20
21use async_trait::async_trait;
22use futures::{FutureExt, StreamExt, TryStreamExt};
23use futures_async_stream::try_stream;
24pub use mock_source::*;
25use risingwave_common::catalog::Schema;
26use risingwave_common::types::{DataType, ScalarImpl};
27use risingwave_common::util::epoch::{EpochExt, test_epoch};
28use tokio::sync::mpsc;
29
30use super::error::StreamExecutorError;
31use super::{
32    Barrier, BoxedMessageStream, Execute, Executor, ExecutorInfo, Message, MessageStream,
33    StreamChunk, StreamExecutorResult, Watermark,
34};
35
36// Keep the prelude module here since it imports from multiple submodules
37pub mod prelude {
38    pub use std::sync::Arc;
39    pub use std::sync::atomic::AtomicU64;
40
41    pub use risingwave_common::array::StreamChunk;
42    pub use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
43    pub use risingwave_common::test_prelude::StreamChunkTestExt;
44    pub use risingwave_common::types::DataType;
45    pub use risingwave_common::util::sort_util::OrderType;
46    pub use risingwave_storage::StateStore;
47    pub use risingwave_storage::memory::MemoryStateStore;
48
49    pub use crate::common::table::state_table::StateTable;
50    pub use crate::executor::test_utils::expr::build_from_pretty;
51    pub use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
52    pub use crate::executor::{ActorContext, BoxedMessageStream, Execute, StreamKey};
53}
54
55/// Trait for testing `StreamExecutor` more easily.
56///
57/// With `next_unwrap_ready`, we can retrieve the next message from the executor without `await`ing,
58/// so that we can immediately panic if the executor is not ready instead of getting stuck. This is
59/// useful for testing.
60#[async_trait]
61pub trait StreamExecutorTestExt: MessageStream + Unpin {
62    /// Asserts that the executor is pending (not ready) now.
63    ///
64    /// Panics if it is ready.
65    #[track_caller]
66    fn next_unwrap_pending(&mut self) {
67        if let Some(r) = self.try_next().now_or_never() {
68            panic!("expect pending stream, but got `{:?}`", r);
69        }
70    }
71
72    /// Asserts that the executor is ready now, returning the next message.
73    ///
74    /// Panics if it is pending.
75    #[track_caller]
76    fn next_unwrap_ready(&mut self) -> StreamExecutorResult<Message> {
77        match self.next().now_or_never() {
78            Some(Some(r)) => r,
79            Some(None) => panic!("expect ready stream, but got terminated"),
80            None => panic!("expect ready stream, but got pending"),
81        }
82    }
83
84    /// Asserts that the executor is ready on a [`StreamChunk`] now, returning the next chunk.
85    ///
86    /// Panics if it is pending or the next message is not a [`StreamChunk`].
87    fn next_unwrap_ready_chunk(&mut self) -> StreamExecutorResult<StreamChunk> {
88        self.next_unwrap_ready()
89            .map(|msg| msg.into_chunk().expect("expect chunk"))
90    }
91
92    /// Asserts that the executor is ready on a [`Barrier`] now, returning the next barrier.
93    ///
94    /// Panics if it is pending or the next message is not a [`Barrier`].
95    fn next_unwrap_ready_barrier(&mut self) -> StreamExecutorResult<Barrier> {
96        self.next_unwrap_ready()
97            .map(|msg| msg.into_barrier().expect("expect barrier"))
98    }
99
100    /// Asserts that the executor is ready on a [`Watermark`] now, returning the next barrier.
101    ///
102    /// Panics if it is pending or the next message is not a [`Watermark`].
103    fn next_unwrap_ready_watermark(&mut self) -> StreamExecutorResult<Watermark> {
104        self.next_unwrap_ready()
105            .map(|msg| msg.into_watermark().expect("expect watermark"))
106    }
107
108    async fn expect_barrier(&mut self) -> Barrier {
109        let msg = self.next().await.unwrap().unwrap();
110        msg.into_barrier().unwrap()
111    }
112
113    async fn expect_chunk(&mut self) -> StreamChunk {
114        let msg = self.next().await.unwrap().unwrap();
115        msg.into_chunk().unwrap()
116    }
117
118    async fn expect_watermark(&mut self) -> Watermark {
119        let msg = self.next().await.unwrap().unwrap();
120        msg.into_watermark().unwrap()
121    }
122}
123
124// FIXME: implement on any `impl MessageStream` if the analyzer works well.
125impl StreamExecutorTestExt for BoxedMessageStream {}
126
127/// `row_nonnull` builds a `OwnedRow` with concrete values.
128/// TODO: add macro row!, which requires a new trait `ToScalarValue`.
129#[macro_export]
130macro_rules! row_nonnull {
131    [$( $value:expr ),*] => {
132        {
133            risingwave_common::row::OwnedRow::new(vec![$(Some($value.into()), )*])
134        }
135    };
136}
137
138pub mod expr {
139    use risingwave_expr::expr::NonStrictExpression;
140
141    pub fn build_from_pretty(s: impl AsRef<str>) -> NonStrictExpression {
142        NonStrictExpression::for_test(risingwave_expr::expr::build_from_pretty(s))
143    }
144}