risingwave_batch_executors/executor/
utils.rs1use futures::StreamExt;
16use futures::stream::BoxStream;
17use futures_async_stream::try_stream;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::Schema;
20use risingwave_pb::batch_plan::PbScanRange;
21use risingwave_pb::plan_common::StorageTableDesc;
22pub use risingwave_storage::table::batch_table::PkScanRange as ScanRange;
23
24use crate::error::{BatchError, Result};
25use crate::executor::{BoxedDataChunkStream, Executor};
26
27pub type BoxedDataChunkListStream = BoxStream<'static, Result<Vec<DataChunk>>>;
28
29#[try_stream(boxed, ok = Vec<DataChunk>, error = BatchError)]
31pub async fn batch_read(mut stream: BoxedDataChunkStream, rows: usize) {
32 let mut cnt = 0;
33 let mut chunk_list = vec![];
34 while let Some(build_chunk) = stream.next().await {
35 let build_chunk = build_chunk?;
36 cnt += build_chunk.cardinality();
37 chunk_list.push(build_chunk);
38 if cnt < rows {
39 continue;
40 } else {
41 yield chunk_list;
42 cnt = 0;
43 chunk_list = vec![];
44 }
45 }
46 if !chunk_list.is_empty() {
47 yield chunk_list;
48 }
49}
50
51pub struct BufferChunkExecutor {
52 schema: Schema,
53 chunk_list: Vec<DataChunk>,
54}
55
56impl Executor for BufferChunkExecutor {
57 fn schema(&self) -> &Schema {
58 &self.schema
59 }
60
61 fn identity(&self) -> &str {
62 "BufferChunkExecutor"
63 }
64
65 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
66 self.do_execute()
67 }
68}
69
70impl BufferChunkExecutor {
71 pub fn new(schema: Schema, chunk_list: Vec<DataChunk>) -> Self {
72 Self { schema, chunk_list }
73 }
74
75 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
76 async fn do_execute(self) {
77 for chunk in self.chunk_list {
78 yield chunk
79 }
80 }
81}
82
83pub struct DummyExecutor {
84 pub schema: Schema,
85}
86
87impl Executor for DummyExecutor {
88 fn schema(&self) -> &Schema {
89 &self.schema
90 }
91
92 fn identity(&self) -> &str {
93 "dummy"
94 }
95
96 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
97 DummyExecutor::do_nothing()
98 }
99}
100
101impl DummyExecutor {
102 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
103 async fn do_nothing() {}
104}
105
106pub struct WrapStreamExecutor {
107 schema: Schema,
108 stream: BoxedDataChunkStream,
109}
110
111impl WrapStreamExecutor {
112 pub fn new(schema: Schema, stream: BoxedDataChunkStream) -> Self {
113 Self { schema, stream }
114 }
115}
116
117impl Executor for WrapStreamExecutor {
118 fn schema(&self) -> &Schema {
119 &self.schema
120 }
121
122 fn identity(&self) -> &str {
123 "WrapStreamExecutor"
124 }
125
126 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
127 self.stream
128 }
129}
130
131pub fn build_scan_ranges_from_pb(
132 scan_ranges: &Vec<PbScanRange>,
133 table_desc: &StorageTableDesc,
134) -> Result<Vec<ScanRange>> {
135 Ok(ScanRange::build_from_protobuf(scan_ranges, table_desc)?)
136}
137
138pub fn build_scan_range_from_pb(
139 scan_range: &PbScanRange,
140 table_desc: &StorageTableDesc,
141) -> Result<ScanRange> {
142 Ok(ScanRange::from_protobuf(scan_range, table_desc)?)
143}