// risingwave_batch/executor/project_set.rs
use either::Either;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{ArrayRef, DataChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::{DataType, DatumRef};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::expr::{self, BoxedExpression};
use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::expr::project_set_select_item::PbSelectItem;
use risingwave_pb::expr::PbProjectSetSelectItem;
use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;
/// Batch executor that evaluates a list of [`ProjectSetSelectItem`]s against each chunk
/// produced by its child and emits the resulting rows, prefixed by an `Int64` row-id column.
pub struct ProjectSetExecutor {
/// Select items evaluated, in order, against every input chunk.
select_list: Vec<ProjectSetSelectItem>,
/// Upstream executor whose output chunks are consumed.
child: BoxedExecutor,
/// Schema reported through [`Executor::schema`].
schema: Schema,
/// Identity string reported through [`Executor::identity`].
identity: String,
/// Chunk size handed to the output `DataChunkBuilder`.
chunk_size: usize,
}
/// [`Executor`] implementation; the accessors return the stored fields and `execute`
/// hands off to `do_execute`.
impl Executor for ProjectSetExecutor {
/// Returns the schema stored in this executor.
fn schema(&self) -> &Schema {
&self.schema
}
/// Returns the identity string stored in this executor.
fn identity(&self) -> &str {
&self.identity
}
/// Consumes the executor and returns the stream built by `do_execute`.
fn execute(self: Box<Self>) -> BoxedDataChunkStream {
self.do_execute()
}
}
impl ProjectSetExecutor {
/// Produces the output stream of this executor.
///
/// For every chunk pulled from the child, each select item is evaluated once. Then, for
/// every row position of that chunk, output rows are emitted with an increasing
/// `projected_row_id` (starting at 0) for as long as at least one set-returning item still
/// has a value for that input row. Set-returning items that are already exhausted for the
/// row contribute `None`; scalar items contribute their value at the input row position.
///
/// Errors from the child, from item evaluation, or carried by a table-function output are
/// propagated through the stream.
///
/// # Panics
/// Panics if `select_list` is empty.
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
assert!(!self.select_list.is_empty());
// Output layout: an `Int64` column first (filled with `projected_row_id` below),
// followed by one column per select item, in select-list order.
let mut builder = DataChunkBuilder::new(
std::iter::once(DataType::Int64)
.chain(self.select_list.iter().map(|i| i.return_type()))
.collect(),
self.chunk_size,
);
// Reusable one-row buffer with one slot per output column.
let mut row = vec![None as DatumRef<'_>; builder.num_columns()];
#[for_await]
for input in self.child.execute() {
let input = input?;
// Evaluate every select item against the whole input chunk up front.
let mut results = Vec::with_capacity(self.select_list.len());
for select_item in &self.select_list {
let result = select_item.eval(&input).await?;
results.push(result);
}
// Outer loop: one iteration per row position of the input chunk.
for row_idx in 0..input.capacity() {
// Inner loop: one iteration per candidate output row; unbounded, left via `break`.
for projected_row_id in 0i64.. {
// SAFETY (NOTE(review): reconstructed from the code, confirm): the transmute only
// re-types the element lifetime of the buffer slice. Within one iteration slot 0 is
// assigned right below and every remaining slot is assigned in the zip loop before
// the row is read by `append_one_row`, so values left over from a previous
// iteration do not appear to be read.
let row: &mut [DatumRef<'_>] =
unsafe { std::mem::transmute(row.as_mut_slice()) };
row[0] = Some(projected_row_id.into());
// Becomes true once any set-returning item yields a value for this input row.
let mut valid = false;
// Fill one slot per select item.
for (item, value) in results.iter_mut().zip_eq_fast(&mut row[1..]) {
*value = match item {
// Set-returning item: take the peeked value only if it belongs to this input row.
Either::Left(state) => {
if let Some((i, value)) = state.peek()
&& i == row_idx
{
valid = true;
// `?` propagates an error carried by the peeked value.
value?
} else {
None
}
}
// Scalar item: same value for every output row of this input row.
Either::Right(array) => array.value_at(row_idx),
};
}
if !valid {
// No set-returning item has a value left for this input row: move to the next one.
break;
}
// The builder hands back a chunk when appending this row completes one.
if let Some(chunk) = builder.append_one_row(&*row) {
yield chunk;
}
// Advance every table-function iterator whose current item belongs to this input row.
for item in &mut results {
if let Either::Left(state) = item
&& matches!(state.peek(), Some((i, _)) if i == row_idx)
{
state.next().await?;
}
}
}
}
// Emit whatever is still buffered before pulling the next input chunk.
if let Some(chunk) = builder.consume_all() {
yield chunk;
}
}
}
}
#[async_trait::async_trait]
impl BoxedExecutorBuilder for ProjectSetExecutor {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [child]: [_; 1] = inputs.try_into().unwrap();
let project_set_node = try_match_expand!(
source.plan_node().get_node_body().unwrap(),
NodeBody::ProjectSet
)?;
let select_list: Vec<_> = project_set_node
.get_select_list()
.iter()
.map(|proto| {
ProjectSetSelectItem::from_prost(
proto,
source.context.get_config().developer.chunk_size,
)
})
.try_collect()?;
let mut fields = vec![Field::with_name(DataType::Int64, "projected_row_id")];
fields.extend(
select_list
.iter()
.map(|expr| Field::unnamed(expr.return_type())),
);
Ok(Box::new(Self {
select_list,
child,
schema: Schema { fields },
identity: source.plan_node().get_identity().clone(),
chunk_size: source.context.get_config().developer.chunk_size,
}))
}
}
/// One entry of a project-set select list: either a scalar expression or a table function.
#[derive(Debug)]
pub enum ProjectSetSelectItem {
/// A scalar expression; `eval` yields a single array for the input chunk.
Scalar(BoxedExpression),
/// A table function; `eval` yields a `TableFunctionOutputIter` over its output.
Set(BoxedTableFunction),
}
impl From<BoxedTableFunction> for ProjectSetSelectItem {
fn from(table_function: BoxedTableFunction) -> Self {
ProjectSetSelectItem::Set(table_function)
}
}
impl From<BoxedExpression> for ProjectSetSelectItem {
fn from(expr: BoxedExpression) -> Self {
ProjectSetSelectItem::Scalar(expr)
}
}
impl ProjectSetSelectItem {
    /// Builds a select item from its protobuf representation.
    ///
    /// `chunk_size` is forwarded to the table-function builder; it is not used for scalar
    /// expressions.
    ///
    /// # Errors
    /// Returns an error if the expression or table function cannot be built.
    ///
    /// # Panics
    /// Panics if `prost.select_item` is `None`.
    pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result<Self> {
        let item = match prost.select_item.as_ref().unwrap() {
            PbSelectItem::TableFunction(pb_tf) => {
                let set_fn = table_function::build_from_prost(pb_tf, chunk_size)?;
                Self::Set(set_fn)
            }
            PbSelectItem::Expr(pb_expr) => {
                let scalar = expr::build_from_prost(pb_expr)?;
                Self::Scalar(scalar)
            }
        };
        Ok(item)
    }

    /// Returns the data type of the wrapped expression or table function.
    pub fn return_type(&self) -> DataType {
        match self {
            Self::Set(set_fn) => set_fn.return_type(),
            Self::Scalar(scalar) => scalar.return_type(),
        }
    }

    /// Evaluates the item against `input`.
    ///
    /// A table function yields `Either::Left` with an iterator over its output; a scalar
    /// expression yields `Either::Right` with the evaluated array.
    ///
    /// # Errors
    /// Returns an error if creating the table-function output iterator or evaluating the
    /// scalar expression fails.
    pub async fn eval<'a>(
        &'a self,
        input: &'a DataChunk,
    ) -> Result<Either<TableFunctionOutputIter<'a>, ArrayRef>> {
        let output = match self {
            Self::Scalar(scalar) => Either::Right(scalar.eval(input).await?),
            Self::Set(set_fn) => {
                let stream = set_fn.eval(input).await;
                Either::Left(TableFunctionOutputIter::new(stream).await?)
            }
        };
        Ok(output)
    }
}
// Unit tests for `ProjectSetExecutor`, driven by mock / values child executors.
#[cfg(test)]
mod tests {
use futures::stream::StreamExt;
use futures_async_stream::for_await;
use risingwave_common::test_prelude::*;
use risingwave_expr::expr::{ExpressionBoxExt, InputRefExpression, LiteralExpression};
use risingwave_expr::table_function::repeat;
use super::*;
use crate::executor::test_utils::MockExecutor;
use crate::executor::ValuesExecutor;
use crate::*;
// Chunk size used by every executor built in these tests.
const CHUNK_SIZE: usize = 1024;
// One scalar column plus two `repeat` table functions (2 and 3 repetitions) over a
// five-row input: each input row is expected to expand to three output rows, with the
// shorter table function padded by NULL (`.`) on the third row.
#[tokio::test]
async fn test_project_set_executor() -> Result<()> {
let chunk = DataChunk::from_pretty(
"i i
1 7
2 8
33333 66666
4 4
5 3",
);
// Scalar item: reference to input column 0.
let expr1 = InputRefExpression::new(DataType::Int32, 0);
// Set item: literal 1, repeated 2 times.
let expr2 = repeat(
LiteralExpression::new(DataType::Int32, Some(1_i32.into())).boxed(),
2,
);
// Set item: literal 2, repeated 3 times.
let expr3 = repeat(
LiteralExpression::new(DataType::Int32, Some(2_i32.into())).boxed(),
3,
);
let select_list: Vec<ProjectSetSelectItem> =
vec![expr1.boxed().into(), expr2.into(), expr3.into()];
let schema = schema_unnamed! { DataType::Int32, DataType::Int32 };
let mut mock_executor = MockExecutor::new(schema);
mock_executor.add(chunk);
// NOTE(review): this schema is built from the select items only, so it lacks the leading
// `projected_row_id` field that `new_boxed_executor` prepends; the expected output below
// has four columns. Confirm whether this is intentional.
let fields = select_list
.iter()
.map(|expr| Field::unnamed(expr.return_type()))
.collect::<Vec<Field>>();
let proj_executor = Box::new(ProjectSetExecutor {
select_list,
child: Box::new(mock_executor),
schema: Schema { fields },
identity: "ProjectSetExecutor".to_string(),
chunk_size: CHUNK_SIZE,
});
let fields = &proj_executor.schema().fields;
assert_eq!(fields[0].data_type, DataType::Int32);
// Columns: projected_row_id, input column 0, repeat(1, 2), repeat(2, 3).
let expected = [DataChunk::from_pretty(
"I i i i
0 1 1 2
1 1 1 2
2 1 . 2
0 2 1 2
1 2 1 2
2 2 . 2
0 33333 1 2
1 33333 1 2
2 33333 . 2
0 4 1 2
1 4 1 2
2 4 . 2
0 5 1 2
1 5 1 2
2 5 . 2",
)];
// NOTE(review): the number of produced chunks is not asserted, so this loop would also
// pass if the executor yielded no chunk at all.
#[for_await]
for (i, result_chunk) in proj_executor.execute().enumerate() {
let result_chunk = result_chunk?;
assert_eq!(result_chunk, expected[i]);
}
Ok(())
}
// A child that produces a single row with no columns: the literal and a 2-row `repeat`
// table function should still yield two output rows.
#[tokio::test]
async fn test_project_set_dummy_chunk() {
let literal = LiteralExpression::new(DataType::Int32, Some(1_i32.into()));
let tf = repeat(
LiteralExpression::new(DataType::Int32, Some(2_i32.into())).boxed(),
2,
);
// `vec![vec![]]` is one row containing zero expressions, paired with an empty schema.
let values_executor2: Box<dyn Executor> = Box::new(ValuesExecutor::new(
vec![vec![]], Schema::default(),
"ValuesExecutor".to_string(),
CHUNK_SIZE,
));
let proj_executor = Box::new(ProjectSetExecutor {
select_list: vec![literal.boxed().into(), tf.into()],
child: values_executor2,
schema: schema_unnamed!(DataType::Int32, DataType::Int32),
identity: "ProjectSetExecutor2".to_string(),
chunk_size: CHUNK_SIZE,
});
let mut stream = proj_executor.execute();
// Only the first chunk is inspected.
let chunk = stream.next().await.unwrap().unwrap();
assert_eq!(
chunk,
DataChunk::from_pretty(
"I i i
0 1 2
1 1 2",
),
);
}
}