risingwave_stream/executor/test_utils/
mod.rs1pub mod agg_executor;
17pub mod hash_join_executor;
18mod mock_source;
19pub mod top_n_executor;
20
21use async_trait::async_trait;
22use futures::{FutureExt, StreamExt, TryStreamExt};
23use futures_async_stream::try_stream;
24pub use mock_source::*;
25use risingwave_common::catalog::Schema;
26use risingwave_common::types::{DataType, ScalarImpl};
27use risingwave_common::util::epoch::{EpochExt, test_epoch};
28use tokio::sync::mpsc;
29
30use super::error::StreamExecutorError;
31use super::{
32 Barrier, BoxedMessageStream, Execute, Executor, ExecutorInfo, Message, MessageStream,
33 StreamChunk, StreamExecutorResult, Watermark,
34};
35
36pub mod prelude {
38 pub use std::sync::Arc;
39 pub use std::sync::atomic::AtomicU64;
40
41 pub use risingwave_common::array::StreamChunk;
42 pub use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
43 pub use risingwave_common::test_prelude::StreamChunkTestExt;
44 pub use risingwave_common::types::DataType;
45 pub use risingwave_common::util::sort_util::OrderType;
46 pub use risingwave_storage::StateStore;
47 pub use risingwave_storage::memory::MemoryStateStore;
48
49 pub use crate::common::table::state_table::StateTable;
50 pub use crate::executor::test_utils::expr::build_from_pretty;
51 pub use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
52 pub use crate::executor::{ActorContext, BoxedMessageStream, Execute, StreamKey};
53}
54
55#[async_trait]
61pub trait StreamExecutorTestExt: MessageStream + Unpin {
62 #[track_caller]
66 fn next_unwrap_pending(&mut self) {
67 if let Some(r) = self.try_next().now_or_never() {
68 panic!("expect pending stream, but got `{:?}`", r);
69 }
70 }
71
72 #[track_caller]
76 fn next_unwrap_ready(&mut self) -> StreamExecutorResult<Message> {
77 match self.next().now_or_never() {
78 Some(Some(r)) => r,
79 Some(None) => panic!("expect ready stream, but got terminated"),
80 None => panic!("expect ready stream, but got pending"),
81 }
82 }
83
84 fn next_unwrap_ready_chunk(&mut self) -> StreamExecutorResult<StreamChunk> {
88 self.next_unwrap_ready()
89 .map(|msg| msg.into_chunk().expect("expect chunk"))
90 }
91
92 fn next_unwrap_ready_barrier(&mut self) -> StreamExecutorResult<Barrier> {
96 self.next_unwrap_ready()
97 .map(|msg| msg.into_barrier().expect("expect barrier"))
98 }
99
100 fn next_unwrap_ready_watermark(&mut self) -> StreamExecutorResult<Watermark> {
104 self.next_unwrap_ready()
105 .map(|msg| msg.into_watermark().expect("expect watermark"))
106 }
107
108 async fn expect_barrier(&mut self) -> Barrier {
109 let msg = self.next().await.unwrap().unwrap();
110 msg.into_barrier().unwrap()
111 }
112
113 async fn expect_chunk(&mut self) -> StreamChunk {
114 let msg = self.next().await.unwrap().unwrap();
115 msg.into_chunk().unwrap()
116 }
117
118 async fn expect_watermark(&mut self) -> Watermark {
119 let msg = self.next().await.unwrap().unwrap();
120 msg.into_watermark().unwrap()
121 }
122}
123
124impl StreamExecutorTestExt for BoxedMessageStream {}
126
/// Builds an `OwnedRow` in which every field is non-null.
///
/// Each comma-separated expression is converted with `.into()` and wrapped in `Some`
/// before being passed to `OwnedRow::new`. A trailing comma after the last expression is
/// accepted, and an empty invocation produces a row built from an empty `Vec`.
#[macro_export]
macro_rules! row_nonnull {
    [$( $value:expr ),* $(,)?] => {
        {
            risingwave_common::row::OwnedRow::new(vec![$(Some($value.into()), )*])
        }
    };
}
137
138pub mod expr {
139 use risingwave_expr::expr::NonStrictExpression;
140
141 pub fn build_from_pretty(s: impl AsRef<str>) -> NonStrictExpression {
142 NonStrictExpression::for_test(risingwave_expr::expr::build_from_pretty(s))
143 }
144}