Skip to main content

risingwave_batch_executors/executor/
utils.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::StreamExt;
16use futures::stream::BoxStream;
17use futures_async_stream::try_stream;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::Schema;
20use risingwave_pb::batch_plan::PbScanRange;
21use risingwave_pb::plan_common::StorageTableDesc;
22pub use risingwave_storage::table::batch_table::PkScanRange as ScanRange;
23
24use crate::error::{BatchError, Result};
25use crate::executor::{BoxedDataChunkStream, Executor};
26
27pub type BoxedDataChunkListStream = BoxStream<'static, Result<Vec<DataChunk>>>;
28
29/// Read at least `rows` rows.
30#[try_stream(boxed, ok = Vec<DataChunk>, error = BatchError)]
31pub async fn batch_read(mut stream: BoxedDataChunkStream, rows: usize) {
32    let mut cnt = 0;
33    let mut chunk_list = vec![];
34    while let Some(build_chunk) = stream.next().await {
35        let build_chunk = build_chunk?;
36        cnt += build_chunk.cardinality();
37        chunk_list.push(build_chunk);
38        if cnt < rows {
39            continue;
40        } else {
41            yield chunk_list;
42            cnt = 0;
43            chunk_list = vec![];
44        }
45    }
46    if !chunk_list.is_empty() {
47        yield chunk_list;
48    }
49}
50
51pub struct BufferChunkExecutor {
52    schema: Schema,
53    chunk_list: Vec<DataChunk>,
54}
55
56impl Executor for BufferChunkExecutor {
57    fn schema(&self) -> &Schema {
58        &self.schema
59    }
60
61    fn identity(&self) -> &str {
62        "BufferChunkExecutor"
63    }
64
65    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
66        self.do_execute()
67    }
68}
69
70impl BufferChunkExecutor {
71    pub fn new(schema: Schema, chunk_list: Vec<DataChunk>) -> Self {
72        Self { schema, chunk_list }
73    }
74
75    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
76    async fn do_execute(self) {
77        for chunk in self.chunk_list {
78            yield chunk
79        }
80    }
81}
82
83pub struct DummyExecutor {
84    pub schema: Schema,
85}
86
87impl Executor for DummyExecutor {
88    fn schema(&self) -> &Schema {
89        &self.schema
90    }
91
92    fn identity(&self) -> &str {
93        "dummy"
94    }
95
96    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
97        DummyExecutor::do_nothing()
98    }
99}
100
101impl DummyExecutor {
102    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
103    async fn do_nothing() {}
104}
105
106pub struct WrapStreamExecutor {
107    schema: Schema,
108    stream: BoxedDataChunkStream,
109}
110
111impl WrapStreamExecutor {
112    pub fn new(schema: Schema, stream: BoxedDataChunkStream) -> Self {
113        Self { schema, stream }
114    }
115}
116
117impl Executor for WrapStreamExecutor {
118    fn schema(&self) -> &Schema {
119        &self.schema
120    }
121
122    fn identity(&self) -> &str {
123        "WrapStreamExecutor"
124    }
125
126    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
127        self.stream
128    }
129}
130
131pub fn build_scan_ranges_from_pb(
132    scan_ranges: &Vec<PbScanRange>,
133    table_desc: &StorageTableDesc,
134) -> Result<Vec<ScanRange>> {
135    Ok(ScanRange::build_from_protobuf(scan_ranges, table_desc)?)
136}
137
138pub fn build_scan_range_from_pb(
139    scan_range: &PbScanRange,
140    table_desc: &StorageTableDesc,
141) -> Result<ScanRange> {
142    Ok(ScanRange::from_protobuf(scan_range, table_desc)?)
143}