risingwave_expr/table_function/
repeat.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::I32ArrayBuilder;
16
17use super::*;
18
19/// Repeat an expression n times.
20///
21/// Should only be used for tests now.
22pub fn repeat(expr: BoxedExpression, n: usize) -> BoxedTableFunction {
23    RepeatN { expr, n }.boxed()
24}
25
26#[derive(Debug)]
27struct RepeatN {
28    expr: BoxedExpression,
29    n: usize,
30}
31
32#[async_trait::async_trait]
33impl TableFunction for RepeatN {
34    fn return_type(&self) -> DataType {
35        self.expr.return_type()
36    }
37
38    async fn eval<'a>(&'a self, input: &'a DataChunk) -> BoxStream<'a, Result<DataChunk>> {
39        self.eval_inner(input)
40    }
41}
42
43impl RepeatN {
44    #[try_stream(boxed, ok = DataChunk, error = ExprError)]
45    async fn eval_inner<'a>(&'a self, input: &'a DataChunk) {
46        let array = self.expr.eval(input).await?;
47
48        let mut index_builder = I32ArrayBuilder::new(0x100);
49        let mut value_builder = self.return_type().create_array_builder(0x100);
50        for (i, value) in array.iter().enumerate() {
51            index_builder.append_n(self.n, Some(i as i32));
52            value_builder.append_n(self.n, value);
53        }
54        let len = index_builder.len();
55        let index_array: ArrayImpl = index_builder.finish().into();
56        let value_array = value_builder.finish();
57        yield DataChunk::new(vec![index_array.into(), value_array.into()], len);
58    }
59}