risingwave_batch_executors/executor/
max_one_row.rs1use futures_async_stream::try_stream;
16use risingwave_common::array::DataChunk;
17use risingwave_common::catalog::Schema;
18use risingwave_pb::batch_plan::plan_node::NodeBody;
19
20use crate::error::{BatchError, Result};
21use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
22
23pub struct MaxOneRowExecutor {
24 child: BoxedExecutor,
25
26 identity: String,
28}
29
30impl BoxedExecutorBuilder for MaxOneRowExecutor {
31 async fn new_boxed_executor(
32 source: &ExecutorBuilder<'_>,
33 inputs: Vec<BoxedExecutor>,
34 ) -> Result<BoxedExecutor> {
35 let [child]: [_; 1] = inputs.try_into().unwrap();
36
37 let _node = try_match_expand!(
38 source.plan_node().get_node_body().unwrap(),
39 NodeBody::MaxOneRow
40 )?;
41
42 Ok(Box::new(Self {
43 child,
44 identity: source.plan_node().get_identity().clone(),
45 }))
46 }
47}
48
49impl Executor for MaxOneRowExecutor {
50 fn schema(&self) -> &Schema {
51 self.child.schema()
52 }
53
54 fn identity(&self) -> &str {
55 &self.identity
56 }
57
58 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
59 async fn execute(self: Box<Self>) {
60 let data_types = self.child.schema().data_types();
61 let mut result = None;
62
63 #[for_await]
64 for chunk in self.child.execute() {
65 let chunk = chunk?;
66 for row in chunk.rows() {
67 if result.is_some() {
68 bail!("Scalar subquery produced more than one row.");
71 } else {
72 result = Some(DataChunk::from_rows(&[row], &data_types));
77 }
78 }
79 }
80
81 if let Some(result) = result {
82 yield result;
83 }
84 }
85}
86
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use risingwave_common::array::DataChunk;
    use risingwave_common::catalog::schema_test_utils::ii;
    use risingwave_common::row::Row;
    use risingwave_common::types::Datum;

    use crate::executor::test_utils::MockExecutor;
    use crate::executor::{Executor, MaxOneRowExecutor};

    /// Shape of the output stream a test expects from the executor.
    #[derive(Clone, Copy, Debug)]
    enum ExpectedOutput {
        /// The stream yields nothing.
        Empty,
        /// The stream yields exactly one `Ok` chunk holding one row.
        OneRow,
        /// The stream yields exactly one `Err` item.
        Error,
    }

    /// A fixed two-column `i32` row used as test input.
    fn sample_row() -> impl Row {
        [Datum::Some(114i32.into()), Datum::Some(514i32.into())]
    }

    /// Builds a mock child that emits one chunk per entry of `rows_per_chunk`,
    /// each chunk holding that many copies of [`sample_row`].
    fn mock_input(rows_per_chunk: &[usize]) -> MockExecutor {
        let mut input = MockExecutor::new(ii());
        for &row_count in rows_per_chunk {
            let rows: Vec<_> = (0..row_count).map(|_| sample_row()).collect();
            input.add(DataChunk::from_rows(&rows, &ii().data_types()));
        }
        input
    }

    /// Runs a `MaxOneRowExecutor` over `input` and checks the collected output
    /// against `expected`, panicking with both values on mismatch.
    async fn assert_output(input: MockExecutor, expected: ExpectedOutput) {
        let executor = Box::new(MaxOneRowExecutor {
            child: Box::new(input),
            identity: String::new(),
        });

        let outputs = executor.execute().collect::<Vec<_>>().await;

        match (outputs.as_slice(), expected) {
            ([], ExpectedOutput::Empty) | ([Err(_)], ExpectedOutput::Error) => {}
            ([Ok(chunk)], ExpectedOutput::OneRow) => assert_eq!(chunk.cardinality(), 1),
            _ => panic!("expected {expected:?}, got {outputs:?}"),
        }
    }

    #[tokio::test]
    async fn test_empty() {
        assert_output(mock_input(&[]), ExpectedOutput::Empty).await;
    }

    #[tokio::test]
    async fn test_one_row() {
        assert_output(mock_input(&[1]), ExpectedOutput::OneRow).await;
    }

    /// Two rows arriving in the same chunk.
    #[tokio::test]
    async fn test_error_1() {
        assert_output(mock_input(&[2]), ExpectedOutput::Error).await;
    }

    /// Two rows arriving in separate chunks.
    #[tokio::test]
    async fn test_error_2() {
        assert_output(mock_input(&[1, 1]), ExpectedOutput::Error).await;
    }
}