risingwave_batch_executors/executor/
utils.rs1use futures::StreamExt;
16use futures::stream::BoxStream;
17use futures_async_stream::try_stream;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::Schema;
20
21use crate::error::{BatchError, Result};
22use crate::executor::{BoxedDataChunkStream, Executor};
23
24pub type BoxedDataChunkListStream = BoxStream<'static, Result<Vec<DataChunk>>>;
25
26#[try_stream(boxed, ok = Vec<DataChunk>, error = BatchError)]
28pub async fn batch_read(mut stream: BoxedDataChunkStream, rows: usize) {
29 let mut cnt = 0;
30 let mut chunk_list = vec![];
31 while let Some(build_chunk) = stream.next().await {
32 let build_chunk = build_chunk?;
33 cnt += build_chunk.cardinality();
34 chunk_list.push(build_chunk);
35 if cnt < rows {
36 continue;
37 } else {
38 yield chunk_list;
39 cnt = 0;
40 chunk_list = vec![];
41 }
42 }
43 if !chunk_list.is_empty() {
44 yield chunk_list;
45 }
46}
47
48pub struct BufferChunkExecutor {
49 schema: Schema,
50 chunk_list: Vec<DataChunk>,
51}
52
53impl Executor for BufferChunkExecutor {
54 fn schema(&self) -> &Schema {
55 &self.schema
56 }
57
58 fn identity(&self) -> &str {
59 "BufferChunkExecutor"
60 }
61
62 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
63 self.do_execute()
64 }
65}
66
67impl BufferChunkExecutor {
68 pub fn new(schema: Schema, chunk_list: Vec<DataChunk>) -> Self {
69 Self { schema, chunk_list }
70 }
71
72 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
73 async fn do_execute(self) {
74 for chunk in self.chunk_list {
75 yield chunk
76 }
77 }
78}
79
80pub struct DummyExecutor {
81 pub schema: Schema,
82}
83
84impl Executor for DummyExecutor {
85 fn schema(&self) -> &Schema {
86 &self.schema
87 }
88
89 fn identity(&self) -> &str {
90 "dummy"
91 }
92
93 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
94 DummyExecutor::do_nothing()
95 }
96}
97
98impl DummyExecutor {
99 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
100 async fn do_nothing() {}
101}
102
103pub struct WrapStreamExecutor {
104 schema: Schema,
105 stream: BoxedDataChunkStream,
106}
107
108impl WrapStreamExecutor {
109 pub fn new(schema: Schema, stream: BoxedDataChunkStream) -> Self {
110 Self { schema, stream }
111 }
112}
113
114impl Executor for WrapStreamExecutor {
115 fn schema(&self) -> &Schema {
116 &self.schema
117 }
118
119 fn identity(&self) -> &str {
120 "WrapStreamExecutor"
121 }
122
123 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
124 self.stream
125 }
126}