risingwave_batch_executors/executor/utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::StreamExt;
16use futures::stream::BoxStream;
17use futures_async_stream::try_stream;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::Schema;
20
21use crate::error::{BatchError, Result};
22use crate::executor::{BoxedDataChunkStream, Executor};
23
24pub type BoxedDataChunkListStream = BoxStream<'static, Result<Vec<DataChunk>>>;
25
26/// Read at least `rows` rows.
27#[try_stream(boxed, ok = Vec<DataChunk>, error = BatchError)]
28pub async fn batch_read(mut stream: BoxedDataChunkStream, rows: usize) {
29    let mut cnt = 0;
30    let mut chunk_list = vec![];
31    while let Some(build_chunk) = stream.next().await {
32        let build_chunk = build_chunk?;
33        cnt += build_chunk.cardinality();
34        chunk_list.push(build_chunk);
35        if cnt < rows {
36            continue;
37        } else {
38            yield chunk_list;
39            cnt = 0;
40            chunk_list = vec![];
41        }
42    }
43    if !chunk_list.is_empty() {
44        yield chunk_list;
45    }
46}
47
48pub struct BufferChunkExecutor {
49    schema: Schema,
50    chunk_list: Vec<DataChunk>,
51}
52
53impl Executor for BufferChunkExecutor {
54    fn schema(&self) -> &Schema {
55        &self.schema
56    }
57
58    fn identity(&self) -> &str {
59        "BufferChunkExecutor"
60    }
61
62    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
63        self.do_execute()
64    }
65}
66
67impl BufferChunkExecutor {
68    pub fn new(schema: Schema, chunk_list: Vec<DataChunk>) -> Self {
69        Self { schema, chunk_list }
70    }
71
72    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
73    async fn do_execute(self) {
74        for chunk in self.chunk_list {
75            yield chunk
76        }
77    }
78}
79
80pub struct DummyExecutor {
81    pub schema: Schema,
82}
83
84impl Executor for DummyExecutor {
85    fn schema(&self) -> &Schema {
86        &self.schema
87    }
88
89    fn identity(&self) -> &str {
90        "dummy"
91    }
92
93    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
94        DummyExecutor::do_nothing()
95    }
96}
97
98impl DummyExecutor {
99    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
100    async fn do_nothing() {}
101}
102
103pub struct WrapStreamExecutor {
104    schema: Schema,
105    stream: BoxedDataChunkStream,
106}
107
108impl WrapStreamExecutor {
109    pub fn new(schema: Schema, stream: BoxedDataChunkStream) -> Self {
110        Self { schema, stream }
111    }
112}
113
114impl Executor for WrapStreamExecutor {
115    fn schema(&self) -> &Schema {
116        &self.schema
117    }
118
119    fn identity(&self) -> &str {
120        "WrapStreamExecutor"
121    }
122
123    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
124        self.stream
125    }
126}