risingwave_expr/table_function/mod.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use futures_util::StreamExt;
17use futures_util::stream::BoxStream;
18use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, DataChunk};
19use risingwave_common::types::{DataType, DatumRef};
20use risingwave_pb::expr::PbTableFunction;
21use risingwave_pb::expr::table_function::PbType;
22
23use super::{ExprError, Result};
24use crate::expr::{BoxedExpression, build_from_prost as expr_build_from_prost};
25
26mod empty;
27mod repeat;
28mod user_defined;
29
30pub use self::empty::*;
31pub use self::repeat::*;
32use self::user_defined::*;
33
/// A table function takes a row as input and returns multiple rows as output.
///
/// It is also known as Set-Returning Function.
#[async_trait::async_trait]
pub trait TableFunction: std::fmt::Debug + Sync + Send {
    /// The data type of the output.
    fn return_type(&self) -> DataType;

    /// Evaluates the table function on `input` and returns the results as a stream of chunks.
    ///
    /// # Contract of the output
    ///
    /// The returned `DataChunk` contains two or three columns:
    /// - The first column is an I32Array containing row indices of input chunk. It should be
    ///   monotonically increasing.
    /// - The second column is the output values. The data type of the column is `return_type`.
    /// - (Optional) If any error occurs, the error message is stored in the third column.
    ///
    /// i.e., for the `i`-th input row, the output rows are `(i, output_1)`, `(i, output_2)`, ...
    ///
    /// How the output is split into the `Stream` is arbitrary. It's usually done by a
    /// `DataChunkBuilder`.
    ///
    /// ## Example
    ///
    /// ```text
    /// select generate_series(1, x) from t(x);
    ///
    /// # input chunk     output chunks
    /// 1 --------------> 0 1
    /// 2 --------------> 1 1
    /// 3 ----┐           ---
    ///       │           1 2
    ///       └---------> 2 1
    ///                   ---
    ///                   2 2
    ///                   2 3
    ///          row idx--^ ^--values
    /// ```
    ///
    /// # Relationship with `ProjectSet` executor
    ///
    /// (You don't need to understand this section to implement a `TableFunction`)
    ///
    /// The output of the `TableFunction` is different from the output of the `ProjectSet` executor.
    /// `ProjectSet` executor uses the row indices to stitch multiple table functions and produces
    /// `projected_row_id`.
    ///
    /// ## Example
    ///
    /// ```text
    /// select generate_series(1, x) from t(x);
    ///
    /// # input chunk     output chunks (TableFunction)  output chunks (ProjectSet)
    /// 1 --------------> 0 1 -------------------------> 0 1
    /// 2 --------------> 1 1 -------------------------> 0 1
    /// 3 ----┐           ---                            ---
    ///       │           1 2                            1 2
    ///       └---------> 2 1 -------------------------> 0 1
    ///                   ---                            ---
    ///                   2 2                            1 2
    ///                   2 3                            2 3
    ///          row idx--^ ^--values  projected_row_id--^ ^--values
    /// ```
    async fn eval<'a>(&'a self, input: &'a DataChunk) -> BoxStream<'a, Result<DataChunk>>;

    /// Moves `self` into a `Box`, producing a [`BoxedTableFunction`] trait object.
    fn boxed(self) -> BoxedTableFunction
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}
105
/// An owned, dynamically dispatched [`TableFunction`].
pub type BoxedTableFunction = Box<dyn TableFunction>;
107
108pub fn build_from_prost(prost: &PbTableFunction, chunk_size: usize) -> Result<BoxedTableFunction> {
109 if prost.get_function_type().unwrap() == PbType::UserDefined {
110 return new_user_defined(prost, chunk_size);
111 }
112
113 build(
114 prost.get_function_type().unwrap(),
115 prost.get_return_type()?.into(),
116 chunk_size,
117 prost.args.iter().map(expr_build_from_prost).try_collect()?,
118 )
119}
120
121/// Build a table function.
122pub fn build(
123 func: PbType,
124 return_type: DataType,
125 chunk_size: usize,
126 children: Vec<BoxedExpression>,
127) -> Result<BoxedTableFunction> {
128 use itertools::Itertools;
129 let args = children.iter().map(|t| t.return_type()).collect_vec();
130 let desc = crate::sig::FUNCTION_REGISTRY.get(func, &args, &return_type)?;
131 desc.build_table(return_type, chunk_size, children)
132}
133
/// A wrapper over the output of table function that allows iteration by rows.
///
/// If the table function returns multiple columns, the output will be struct values.
///
/// Note that to get datum reference for efficiency, this iterator doesn't follow the standard
/// `Stream` API. Instead, it provides a `peek` method to get the next row without consuming it,
/// and a `next` method to consume the next row.
///
/// ```
/// # use futures_util::StreamExt;
/// # use risingwave_common::array::{DataChunk, DataChunkTestExt};
/// # use risingwave_expr::table_function::TableFunctionOutputIter;
/// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
/// let mut iter = TableFunctionOutputIter::new(
///     futures_util::stream::iter([
///         DataChunk::from_pretty(
///             "i I
///              0 0
///              1 1",
///         ),
///         DataChunk::from_pretty(
///             "i I
///              2 2
///              3 3",
///         ),
///     ])
///     .map(Ok)
///     .boxed(),
/// )
/// .await.unwrap();
///
/// for i in 0..4 {
///     let (index, value) = iter.peek().unwrap();
///     assert_eq!(index, i);
///     assert_eq!(value.unwrap(), Some((i as i64).into()));
///     iter.next().await.unwrap();
/// }
/// assert!(iter.peek().is_none());
/// # });
/// ```
pub struct TableFunctionOutputIter<'a> {
    /// The stream of output chunks that have not been fetched yet.
    stream: BoxStream<'a, Result<DataChunk>>,
    /// The chunk the iterator is currently positioned in, or `None` when no chunk is loaded
    /// (i.e. the stream returned nothing on the last fetch).
    chunk: Option<DataChunk>,
    /// The position of the current row within `chunk`.
    index: usize,
}
179
180impl<'a> TableFunctionOutputIter<'a> {
181 pub async fn new(
182 stream: BoxStream<'a, Result<DataChunk>>,
183 ) -> Result<TableFunctionOutputIter<'a>> {
184 let mut iter = Self {
185 stream,
186 chunk: None,
187 index: 0,
188 };
189 iter.pop_from_stream().await?;
190 Ok(iter)
191 }
192
193 /// Gets the current row.
194 pub fn peek(&'a self) -> Option<(usize, Result<DatumRef<'a>>)> {
195 let chunk = self.chunk.as_ref()?;
196 let index = chunk.column_at(0).as_int32().value_at(self.index).unwrap() as usize;
197 let result = if let Some(msg) = chunk
198 .columns()
199 .get(2)
200 .and_then(|errors| errors.as_utf8().value_at(self.index))
201 {
202 Err(ExprError::Custom(msg.into()))
203 } else {
204 Ok(chunk.column_at(1).value_at(self.index))
205 };
206 Some((index, result))
207 }
208
209 /// Moves to the next row.
210 ///
211 /// This method is cancellation safe.
212 pub async fn next(&mut self) -> Result<()> {
213 let Some(chunk) = &self.chunk else {
214 return Ok(());
215 };
216 if self.index + 1 == chunk.capacity() {
217 // note: for cancellation safety, do not mutate self before await.
218 self.pop_from_stream().await?;
219 self.index = 0;
220 } else {
221 self.index += 1;
222 }
223 Ok(())
224 }
225
226 /// Gets the next chunk from stream.
227 async fn pop_from_stream(&mut self) -> Result<()> {
228 self.chunk = self.stream.next().await.transpose()?;
229 Ok(())
230 }
231}
232
233/// Checks if the output chunk returned by `TableFunction::eval` contains any error.
234pub fn check_error(chunk: &DataChunk) -> Result<()> {
235 if let Some(errors) = chunk.columns().get(2) {
236 if errors.null_bitmap().any() {
237 return Err(ExprError::Custom(
238 errors
239 .as_utf8()
240 .iter()
241 .find_map(|s| s)
242 .expect("no error message")
243 .into(),
244 ));
245 }
246 }
247 Ok(())
248}