risingwave_stream/executor/test_utils/
mod.rs1pub mod agg_executor;
17pub mod hash_join_executor;
18mod mock_source;
19pub mod top_n_executor;
20
21use async_trait::async_trait;
22use futures::{FutureExt, StreamExt, TryStreamExt};
23use futures_async_stream::try_stream;
24pub use mock_source::*;
25use risingwave_common::catalog::Schema;
26use risingwave_common::types::{DataType, ScalarImpl};
27use risingwave_common::util::epoch::{EpochExt, test_epoch};
28use tokio::sync::mpsc;
29
30use super::error::StreamExecutorError;
31use super::{
32 Barrier, BoxedMessageStream, Execute, Executor, ExecutorInfo, Message, MessageStream,
33 StreamChunk, StreamExecutorResult, Watermark,
34};
35
36pub mod prelude {
38 pub use std::sync::Arc;
39 pub use std::sync::atomic::AtomicU64;
40
41 pub use risingwave_common::array::StreamChunk;
42 pub use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
43 pub use risingwave_common::test_prelude::StreamChunkTestExt;
44 pub use risingwave_common::types::DataType;
45 pub use risingwave_common::util::sort_util::OrderType;
46 pub use risingwave_storage::StateStore;
47 pub use risingwave_storage::memory::MemoryStateStore;
48
49 pub use crate::common::table::state_table::StateTable;
50 pub use crate::executor::test_utils::expr::build_from_pretty;
51 pub use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
52 pub use crate::executor::{ActorContext, BoxedMessageStream, Execute, PkIndices};
53}
54
55#[async_trait]
61pub trait StreamExecutorTestExt: MessageStream + Unpin {
62 fn next_unwrap_pending(&mut self) {
66 if let Some(r) = self.try_next().now_or_never() {
67 panic!("expect pending stream, but got `{:?}`", r);
68 }
69 }
70
71 fn next_unwrap_ready(&mut self) -> StreamExecutorResult<Message> {
75 match self.next().now_or_never() {
76 Some(Some(r)) => r,
77 Some(None) => panic!("expect ready stream, but got terminated"),
78 None => panic!("expect ready stream, but got pending"),
79 }
80 }
81
82 fn next_unwrap_ready_chunk(&mut self) -> StreamExecutorResult<StreamChunk> {
86 self.next_unwrap_ready()
87 .map(|msg| msg.into_chunk().expect("expect chunk"))
88 }
89
90 fn next_unwrap_ready_barrier(&mut self) -> StreamExecutorResult<Barrier> {
94 self.next_unwrap_ready()
95 .map(|msg| msg.into_barrier().expect("expect barrier"))
96 }
97
98 fn next_unwrap_ready_watermark(&mut self) -> StreamExecutorResult<Watermark> {
102 self.next_unwrap_ready()
103 .map(|msg| msg.into_watermark().expect("expect watermark"))
104 }
105
106 async fn expect_barrier(&mut self) -> Barrier {
107 let msg = self.next().await.unwrap().unwrap();
108 msg.into_barrier().unwrap()
109 }
110
111 async fn expect_chunk(&mut self) -> StreamChunk {
112 let msg = self.next().await.unwrap().unwrap();
113 msg.into_chunk().unwrap()
114 }
115
116 async fn expect_watermark(&mut self) -> Watermark {
117 let msg = self.next().await.unwrap().unwrap();
118 msg.into_watermark().unwrap()
119 }
120}
121
122impl StreamExecutorTestExt for BoxedMessageStream {}
124
/// Builds a `risingwave_common::row::OwnedRow` in which every column is non-null.
///
/// Each argument is converted with `.into()` and wrapped in `Some`, in the order
/// given. An empty invocation yields a row built from an empty `Vec`.
/// A trailing comma after the last value is accepted, so multi-line invocations
/// can be formatted like any other list.
///
/// The expansion names `risingwave_common` by its plain path, so that crate must
/// be resolvable at the call site.
#[macro_export]
macro_rules! row_nonnull {
    [$( $value:expr ),* $(,)?] => {
        {
            risingwave_common::row::OwnedRow::new(vec![$(Some($value.into()), )*])
        }
    };
}
135
136pub mod expr {
137 use risingwave_expr::expr::NonStrictExpression;
138
139 pub fn build_from_pretty(s: impl AsRef<str>) -> NonStrictExpression {
140 NonStrictExpression::for_test(risingwave_expr::expr::build_from_pretty(s))
141 }
142}