risingwave_batch/executor/
utils.rsuse futures::stream::BoxStream;
use futures::StreamExt;
use futures_async_stream::try_stream;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use crate::error::{BatchError, Result};
use crate::executor::{BoxedDataChunkStream, Executor};
pub type BoxedDataChunkListStream = BoxStream<'static, Result<Vec<DataChunk>>>;
#[try_stream(boxed, ok = Vec<DataChunk>, error = BatchError)]
pub async fn batch_read(mut stream: BoxedDataChunkStream, rows: usize) {
let mut cnt = 0;
let mut chunk_list = vec![];
while let Some(build_chunk) = stream.next().await {
let build_chunk = build_chunk?;
cnt += build_chunk.cardinality();
chunk_list.push(build_chunk);
if cnt < rows {
continue;
} else {
yield chunk_list;
cnt = 0;
chunk_list = vec![];
}
}
if !chunk_list.is_empty() {
yield chunk_list;
}
}
pub struct BufferChunkExecutor {
schema: Schema,
chunk_list: Vec<DataChunk>,
}
impl Executor for BufferChunkExecutor {
fn schema(&self) -> &Schema {
&self.schema
}
fn identity(&self) -> &str {
"BufferChunkExecutor"
}
fn execute(self: Box<Self>) -> BoxedDataChunkStream {
self.do_execute()
}
}
impl BufferChunkExecutor {
pub fn new(schema: Schema, chunk_list: Vec<DataChunk>) -> Self {
Self { schema, chunk_list }
}
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
async fn do_execute(self) {
for chunk in self.chunk_list {
yield chunk
}
}
}
pub struct DummyExecutor {
pub schema: Schema,
}
impl Executor for DummyExecutor {
fn schema(&self) -> &Schema {
&self.schema
}
fn identity(&self) -> &str {
"dummy"
}
fn execute(self: Box<Self>) -> BoxedDataChunkStream {
DummyExecutor::do_nothing()
}
}
impl DummyExecutor {
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
async fn do_nothing() {}
}
pub struct WrapStreamExecutor {
schema: Schema,
stream: BoxedDataChunkStream,
}
impl WrapStreamExecutor {
pub fn new(schema: Schema, stream: BoxedDataChunkStream) -> Self {
Self { schema, stream }
}
}
impl Executor for WrapStreamExecutor {
fn schema(&self) -> &Schema {
&self.schema
}
fn identity(&self) -> &str {
"WrapStreamExecutor"
}
fn execute(self: Box<Self>) -> BoxedDataChunkStream {
self.stream
}
}