// risingwave_batch/executor/aggregation/projection.rs
use std::ops::Range;
use risingwave_common::array::StreamChunk;
use risingwave_common::types::{DataType, Datum};
use risingwave_expr::aggregate::{AggregateFunction, AggregateState, BoxedAggregateFunction};
use risingwave_expr::Result;
/// An aggregate function wrapper that projects its input chunk before
/// handing it to a wrapped aggregate function.
pub struct Projection {
    /// Indices passed to `StreamChunk::project` on every update call.
    /// NOTE(review): presumably column positions in the incoming chunk — confirm.
    indices: Vec<usize>,
    /// The wrapped aggregate function that receives the projected chunk.
    inner: BoxedAggregateFunction,
}
impl Projection {
pub fn new(indices: Vec<usize>, inner: BoxedAggregateFunction) -> Self {
Self { inner, indices }
}
}
#[async_trait::async_trait]
impl AggregateFunction for Projection {
    /// Reports the return type of the wrapped aggregate function.
    fn return_type(&self) -> DataType {
        self.inner.return_type()
    }

    /// Creates a fresh state by delegating to the wrapped aggregate function.
    fn create_state(&self) -> Result<AggregateState> {
        self.inner.create_state()
    }

    /// Projects `input` with the stored indices, then updates `state` through
    /// the wrapped aggregate function.
    async fn update(&self, state: &mut AggregateState, input: &StreamChunk) -> Result<()> {
        let projected = input.project(&self.indices);
        self.inner.update(state, &projected).await
    }

    /// Same as [`update`](Self::update), but forwards `range` unchanged to the
    /// wrapped function's `update_range`.
    async fn update_range(
        &self,
        state: &mut AggregateState,
        input: &StreamChunk,
        range: Range<usize>,
    ) -> Result<()> {
        let projected = input.project(&self.indices);
        self.inner.update_range(state, &projected, range).await
    }

    /// Reads the result for `state` from the wrapped aggregate function.
    async fn get_result(&self, state: &AggregateState) -> Result<Datum> {
        self.inner.get_result(state).await
    }
}