risingwave_expr/table_function/
repeat.rs1use risingwave_common::array::I32ArrayBuilder;
16
17use super::*;
18
19pub fn repeat(expr: BoxedExpression, n: usize) -> BoxedTableFunction {
23 RepeatN { expr, n }.boxed()
24}
25
/// Table function built by `repeat`: an expression together with a repetition count.
#[derive(Debug)]
struct RepeatN {
    /// Expression evaluated against each input chunk.
    expr: BoxedExpression,
    /// How many times each evaluated value is appended to the output.
    n: usize,
}
31
#[async_trait::async_trait]
impl TableFunction for RepeatN {
    /// Returns the return type of the wrapped expression.
    fn return_type(&self) -> DataType {
        self.expr.return_type()
    }

    /// Produces the output stream for `input` by delegating to `eval_inner`.
    async fn eval<'a>(&'a self, input: &'a DataChunk) -> BoxStream<'a, Result<DataChunk>> {
        self.eval_inner(input)
    }
}
42
43impl RepeatN {
44 #[try_stream(boxed, ok = DataChunk, error = ExprError)]
45 async fn eval_inner<'a>(&'a self, input: &'a DataChunk) {
46 let array = self.expr.eval(input).await?;
47
48 let mut index_builder = I32ArrayBuilder::new(0x100);
49 let mut value_builder = self.return_type().create_array_builder(0x100);
50 for (i, value) in array.iter().enumerate() {
51 index_builder.append_n(self.n, Some(i as i32));
52 value_builder.append_n(self.n, value);
53 }
54 let len = index_builder.len();
55 let index_array: ArrayImpl = index_builder.finish().into();
56 let value_array = value_builder.finish();
57 yield DataChunk::new(vec![index_array.into(), value_array.into()], len);
58 }
59}