risingwave_batch_executors/executor/
max_one_row.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use risingwave_common::array::DataChunk;
17use risingwave_common::catalog::Schema;
18use risingwave_pb::batch_plan::plan_node::NodeBody;
19
20use crate::error::{BatchError, Result};
21use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
22
23pub struct MaxOneRowExecutor {
24    child: BoxedExecutor,
25
26    /// Identity string of the executor
27    identity: String,
28}
29
30impl BoxedExecutorBuilder for MaxOneRowExecutor {
31    async fn new_boxed_executor(
32        source: &ExecutorBuilder<'_>,
33        inputs: Vec<BoxedExecutor>,
34    ) -> Result<BoxedExecutor> {
35        let [child]: [_; 1] = inputs.try_into().unwrap();
36
37        let _node = try_match_expand!(
38            source.plan_node().get_node_body().unwrap(),
39            NodeBody::MaxOneRow
40        )?;
41
42        Ok(Box::new(Self {
43            child,
44            identity: source.plan_node().get_identity().clone(),
45        }))
46    }
47}
48
49impl Executor for MaxOneRowExecutor {
50    fn schema(&self) -> &Schema {
51        self.child.schema()
52    }
53
54    fn identity(&self) -> &str {
55        &self.identity
56    }
57
58    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
59    async fn execute(self: Box<Self>) {
60        let data_types = self.child.schema().data_types();
61        let mut result = None;
62
63        #[for_await]
64        for chunk in self.child.execute() {
65            let chunk = chunk?;
66            for row in chunk.rows() {
67                if result.is_some() {
68                    // `MaxOneRow` is currently only used for the runtime check of
69                    // scalar subqueries, so we raise a precise error here.
70                    bail!("Scalar subquery produced more than one row.");
71                } else {
72                    // We do not immediately yield the chunk here. Instead, we store
73                    // it in `result` and only yield it when the child executor is
74                    // exhausted, in case the parent executor cancels the execution
75                    // after receiving the row (like `limit 1`).
76                    result = Some(DataChunk::from_rows(&[row], &data_types));
77                }
78            }
79        }
80
81        if let Some(result) = result {
82            yield result;
83        }
84    }
85}
86
87#[cfg(test)]
88mod tests {
89    use futures_util::StreamExt;
90    use risingwave_common::array::DataChunk;
91    use risingwave_common::catalog::schema_test_utils::ii;
92    use risingwave_common::row::Row;
93    use risingwave_common::types::Datum;
94
95    use crate::executor::test_utils::MockExecutor;
96    use crate::executor::{Executor, MaxOneRowExecutor};
97
98    #[derive(Clone, Copy, Debug)]
99    enum ExpectedOutput {
100        Empty,
101        OneRow,
102        Error,
103    }
104
105    async fn test_case(input: MockExecutor, expected: ExpectedOutput) {
106        let executor = Box::new(MaxOneRowExecutor {
107            child: Box::new(input),
108            identity: "".to_owned(),
109        });
110
111        let outputs: Vec<_> = executor.execute().collect().await;
112
113        match (&outputs[..], expected) {
114            (&[], ExpectedOutput::Empty) => {}
115            (&[Ok(ref chunk)], ExpectedOutput::OneRow) => assert_eq!(chunk.cardinality(), 1),
116            (&[Err(_)], ExpectedOutput::Error) => {}
117            _ => panic!("expected {expected:?}, got {outputs:?}"),
118        }
119    }
120
121    fn row() -> impl Row {
122        [Datum::Some(114i32.into()), Datum::Some(514i32.into())]
123    }
124
125    #[tokio::test]
126    async fn test_empty() {
127        let input = MockExecutor::new(ii());
128
129        test_case(input, ExpectedOutput::Empty).await;
130    }
131
132    #[tokio::test]
133    async fn test_one_row() {
134        let mut input = MockExecutor::new(ii());
135        input.add(DataChunk::from_rows(&[row()], &ii().data_types()));
136
137        test_case(input, ExpectedOutput::OneRow).await;
138    }
139
140    #[tokio::test]
141    async fn test_error_1() {
142        let mut input = MockExecutor::new(ii());
143        input.add(DataChunk::from_rows(&[row(), row()], &ii().data_types()));
144
145        test_case(input, ExpectedOutput::Error).await;
146    }
147
148    #[tokio::test]
149    async fn test_error_2() {
150        let mut input = MockExecutor::new(ii());
151        input.add(DataChunk::from_rows(&[row()], &ii().data_types()));
152        input.add(DataChunk::from_rows(&[row()], &ii().data_types()));
153
154        test_case(input, ExpectedOutput::Error).await;
155    }
156}