risingwave_expr/table_function/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use futures_util::StreamExt;
17use futures_util::stream::BoxStream;
18use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, DataChunk};
19use risingwave_common::types::{DataType, DatumRef};
20use risingwave_pb::expr::PbTableFunction;
21use risingwave_pb::expr::table_function::PbType;
22
23use super::{ExprError, Result};
24use crate::expr::{BoxedExpression, build_from_prost as expr_build_from_prost};
25
26mod empty;
27mod repeat;
28mod user_defined;
29
30pub use self::empty::*;
31pub use self::repeat::*;
32use self::user_defined::*;
33
/// A table function takes a row as input and returns multiple rows as output.
///
/// It is also known as a Set-Returning Function (SRF).
#[async_trait::async_trait]
pub trait TableFunction: std::fmt::Debug + Sync + Send {
    /// The data type of the output values (the second column of the chunks yielded by `eval`).
    fn return_type(&self) -> DataType;

    /// Evaluates the table function on every row of `input` and returns the output rows as a
    /// stream of chunks.
    ///
    /// # Contract of the output
    ///
    /// The returned `DataChunk` contains two or three columns:
    /// - The first column is an I32Array containing row indices of the input chunk. It should be
    ///   monotonically non-decreasing: an input row that produces several output rows repeats its
    ///   index, as shown in the example below.
    /// - The second column is the output values. The data type of the column is `return_type`.
    /// - (Optional) If any error occurs, the error message is stored in the third column. A
    ///   non-null entry in this column marks the corresponding output row as failed.
    ///
    /// i.e., for the `i`-th input row, the output rows are `(i, output_1)`, `(i, output_2)`, ...
    ///
    /// How the output is split into the `Stream` is arbitrary. It's usually done by a
    /// `DataChunkBuilder`.
    ///
    /// ## Example
    ///
    /// ```text
    /// select generate_series(1, x) from t(x);
    ///
    /// # input chunk     output chunks
    /// 1 --------------> 0 1
    /// 2 --------------> 1 1
    /// 3 ----┐           ---
    ///       │           1 2
    ///       └---------> 2 1
    ///                   ---
    ///                   2 2
    ///                   2 3
    ///          row idx--^ ^--values
    /// ```
    ///
    /// # Relationship with `ProjectSet` executor
    ///
    /// (You don't need to understand this section to implement a `TableFunction`)
    ///
    /// The output of the `TableFunction` is different from the output of the `ProjectSet` executor.
    /// `ProjectSet` executor uses the row indices to stitch multiple table functions and produces
    /// `projected_row_id`.
    ///
    /// ## Example
    ///
    /// ```text
    /// select generate_series(1, x) from t(x);
    ///
    /// # input chunk     output chunks (TableFunction)  output chunks (ProjectSet)
    /// 1 --------------> 0 1 -------------------------> 0 1
    /// 2 --------------> 1 1 -------------------------> 0 1
    /// 3 ----┐           ---                            ---
    ///       │           1 2                            1 2
    ///       └---------> 2 1 -------------------------> 0 1
    ///                   ---                            ---
    ///                   2 2                            1 2
    ///                   2 3                            2 3
    ///          row idx--^ ^--values  projected_row_id--^ ^--values
    /// ```
    async fn eval<'a>(&'a self, input: &'a DataChunk) -> BoxStream<'a, Result<DataChunk>>;

    /// Moves `self` into a `Box` and returns it as a [`BoxedTableFunction`] trait object.
    fn boxed(self) -> BoxedTableFunction
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}
105
106pub type BoxedTableFunction = Box<dyn TableFunction>;
107
108pub fn build_from_prost(prost: &PbTableFunction, chunk_size: usize) -> Result<BoxedTableFunction> {
109    if prost.get_function_type().unwrap() == PbType::UserDefined {
110        return new_user_defined(prost, chunk_size);
111    }
112
113    build(
114        prost.get_function_type().unwrap(),
115        prost.get_return_type()?.into(),
116        chunk_size,
117        prost.args.iter().map(expr_build_from_prost).try_collect()?,
118    )
119}
120
121/// Build a table function.
122pub fn build(
123    func: PbType,
124    return_type: DataType,
125    chunk_size: usize,
126    children: Vec<BoxedExpression>,
127) -> Result<BoxedTableFunction> {
128    use itertools::Itertools;
129    let args = children.iter().map(|t| t.return_type()).collect_vec();
130    let desc = crate::sig::FUNCTION_REGISTRY.get(func, &args, &return_type)?;
131    desc.build_table(return_type, chunk_size, children)
132}
133
/// A wrapper over the output of a table function that allows iteration by rows.
///
/// If the table function returns multiple columns, the output will be struct values.
///
/// Note that, in order to return datum references for efficiency, this iterator doesn't follow
/// the standard `Stream` API. Instead, it provides a `peek` method to get the next row without
/// consuming it, and a `next` method to consume the next row.
///
/// ```
/// # use futures_util::StreamExt;
/// # use risingwave_common::array::{DataChunk, DataChunkTestExt};
/// # use risingwave_expr::table_function::TableFunctionOutputIter;
/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
/// let mut iter = TableFunctionOutputIter::new(
///     futures_util::stream::iter([
///         DataChunk::from_pretty(
///             "i I
///              0 0
///              1 1",
///         ),
///         DataChunk::from_pretty(
///             "i I
///              2 2
///              3 3",
///         ),
///     ])
///     .map(Ok)
///     .boxed(),
/// )
/// .await.unwrap();
///
/// for i in 0..4 {
///     let (index, value) = iter.peek().unwrap();
///     assert_eq!(index, i);
///     assert_eq!(value.unwrap(), Some((i as i64).into()));
///     iter.next().await.unwrap();
/// }
/// assert!(iter.peek().is_none());
/// # });
/// ```
pub struct TableFunctionOutputIter<'a> {
    /// The stream of output chunks that rows are pulled from.
    stream: BoxStream<'a, Result<DataChunk>>,
    /// The chunk currently being iterated; `None` once the stream has yielded no further chunk.
    chunk: Option<DataChunk>,
    /// Position of the current row within `chunk`.
    index: usize,
}
179
180impl<'a> TableFunctionOutputIter<'a> {
181    pub async fn new(
182        stream: BoxStream<'a, Result<DataChunk>>,
183    ) -> Result<TableFunctionOutputIter<'a>> {
184        let mut iter = Self {
185            stream,
186            chunk: None,
187            index: 0,
188        };
189        iter.pop_from_stream().await?;
190        Ok(iter)
191    }
192
193    /// Gets the current row.
194    pub fn peek(&'a self) -> Option<(usize, Result<DatumRef<'a>>)> {
195        let chunk = self.chunk.as_ref()?;
196        let index = chunk.column_at(0).as_int32().value_at(self.index).unwrap() as usize;
197        let result = if let Some(msg) = chunk
198            .columns()
199            .get(2)
200            .and_then(|errors| errors.as_utf8().value_at(self.index))
201        {
202            Err(ExprError::Custom(msg.into()))
203        } else {
204            Ok(chunk.column_at(1).value_at(self.index))
205        };
206        Some((index, result))
207    }
208
209    /// Moves to the next row.
210    ///
211    /// This method is cancellation safe.
212    pub async fn next(&mut self) -> Result<()> {
213        let Some(chunk) = &self.chunk else {
214            return Ok(());
215        };
216        if self.index + 1 == chunk.capacity() {
217            // note: for cancellation safety, do not mutate self before await.
218            self.pop_from_stream().await?;
219            self.index = 0;
220        } else {
221            self.index += 1;
222        }
223        Ok(())
224    }
225
226    /// Gets the next chunk from stream.
227    async fn pop_from_stream(&mut self) -> Result<()> {
228        self.chunk = self.stream.next().await.transpose()?;
229        Ok(())
230    }
231}
232
233/// Checks if the output chunk returned by `TableFunction::eval` contains any error.
234pub fn check_error(chunk: &DataChunk) -> Result<()> {
235    if let Some(errors) = chunk.columns().get(2) {
236        if errors.null_bitmap().any() {
237            return Err(ExprError::Custom(
238                errors
239                    .as_utf8()
240                    .iter()
241                    .find_map(|s| s)
242                    .expect("no error message")
243                    .into(),
244            ));
245        }
246    }
247    Ok(())
248}