risingwave_expr/table_function/
repeat.rs1use risingwave_common::array::I32ArrayBuilder;
16
17use super::*;
18
19pub fn repeat(expr: BoxedExpression, n: usize) -> BoxedTableFunction {
23    RepeatN { expr, n }.boxed()
24}
25
26#[derive(Debug)]
27struct RepeatN {
28    expr: BoxedExpression,
29    n: usize,
30}
31
32#[async_trait::async_trait]
33impl TableFunction for RepeatN {
34    fn return_type(&self) -> DataType {
35        self.expr.return_type()
36    }
37
38    async fn eval<'a>(&'a self, input: &'a DataChunk) -> BoxStream<'a, Result<DataChunk>> {
39        self.eval_inner(input)
40    }
41}
42
43impl RepeatN {
44    #[try_stream(boxed, ok = DataChunk, error = ExprError)]
45    async fn eval_inner<'a>(&'a self, input: &'a DataChunk) {
46        let array = self.expr.eval(input).await?;
47
48        let mut index_builder = I32ArrayBuilder::new(0x100);
49        let mut value_builder = self.return_type().create_array_builder(0x100);
50        for (i, value) in array.iter().enumerate() {
51            index_builder.append_n(self.n, Some(i as i32));
52            value_builder.append_n(self.n, value);
53        }
54        let len = index_builder.len();
55        let index_array: ArrayImpl = index_builder.finish().into();
56        let value_array = value_builder.finish();
57        yield DataChunk::new(vec![index_array.into(), value_array.into()], len);
58    }
59}