// risingwave_expr/table_function/repeat.rs
use risingwave_common::array::I32ArrayBuilder;
use super::*;
/// Wraps `expr` in a boxed table function that repeats each evaluated value
/// `n` times.
///
/// `expr` is evaluated against every input chunk; `n` is the number of
/// consecutive copies emitted for each row of the evaluation result.
pub fn repeat(expr: BoxedExpression, n: usize) -> BoxedTableFunction {
    let table_function = RepeatN { expr, n };
    table_function.boxed()
}
/// Table function state backing [`repeat`]: an expression together with the
/// number of times each of its output rows is emitted.
#[derive(Debug)]
struct RepeatN {
    /// Expression evaluated on each input chunk to produce the values to repeat.
    expr: BoxedExpression,
    /// How many consecutive copies of each value are appended to the output.
    n: usize,
}
#[async_trait::async_trait]
impl TableFunction for RepeatN {
    /// Reports the return type of the wrapped expression.
    fn return_type(&self) -> DataType {
        self.expr.return_type()
    }

    /// Produces the output stream for `input` by delegating to
    /// [`RepeatN::eval_inner`].
    async fn eval<'a>(&'a self, input: &'a DataChunk) -> BoxStream<'a, Result<DataChunk>> {
        Self::eval_inner(self, input)
    }
}
impl RepeatN {
    /// Evaluates the wrapped expression on `input` and yields a single chunk in
    /// which every row of the evaluation result appears `n` times in a row.
    ///
    /// The yielded chunk has two columns: the position of the source row within
    /// the evaluated array (as `i32`), followed by that row's value. Evaluation
    /// errors are propagated through the stream.
    #[try_stream(boxed, ok = DataChunk, error = ExprError)]
    async fn eval_inner<'a>(&'a self, input: &'a DataChunk) {
        // Capacity handed to both builders on construction.
        const INITIAL_CAPACITY: usize = 0x100;

        let evaluated = self.expr.eval(input).await?;
        let copies = self.n;

        let mut row_indices = I32ArrayBuilder::new(INITIAL_CAPACITY);
        let mut row_values = self.return_type().create_array_builder(INITIAL_CAPACITY);

        // Append `copies` entries per source row to both builders in lockstep so
        // the two output columns always have the same length.
        for (row_idx, datum) in evaluated.iter().enumerate() {
            row_indices.append_n(copies, Some(row_idx as i32));
            row_values.append_n(copies, datum);
        }

        // Read the row count before `finish` consumes the index builder.
        let cardinality = row_indices.len();
        let index_column: ArrayImpl = row_indices.finish().into();
        let value_column = row_values.finish();

        yield DataChunk::new(vec![index_column.into(), value_column.into()], cardinality);
    }
}