risingwave_batch_executors/executor/
table_function.rs1use futures_async_stream::try_stream;
16use risingwave_common::array::{ArrayImpl, DataChunk};
17use risingwave_common::catalog::{Field, Schema};
18use risingwave_common::types::DataType;
19use risingwave_expr::table_function::{BoxedTableFunction, build_from_prost, check_error};
20use risingwave_pb::batch_plan::plan_node::NodeBody;
21
22use super::{BoxedExecutor, BoxedExecutorBuilder};
23use crate::error::{BatchError, Result};
24use crate::executor::{BoxedDataChunkStream, Executor, ExecutorBuilder};
25
26pub struct TableFunctionExecutor {
27 schema: Schema,
28 identity: String,
29 table_function: BoxedTableFunction,
30 #[expect(dead_code)]
31 chunk_size: usize,
32}
33
34impl Executor for TableFunctionExecutor {
35 fn schema(&self) -> &Schema {
36 &self.schema
37 }
38
39 fn identity(&self) -> &str {
40 &self.identity
41 }
42
43 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
44 self.do_execute()
45 }
46}
47
48impl TableFunctionExecutor {
49 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
50 async fn do_execute(self: Box<Self>) {
51 let dummy_chunk = DataChunk::new_dummy(1);
52
53 #[for_await]
54 for chunk in self.table_function.eval(&dummy_chunk).await {
55 let chunk = chunk?;
56 check_error(&chunk)?;
57 yield match chunk.column_at(1).as_ref() {
59 ArrayImpl::Struct(struct_array) => struct_array.into(),
60 _ => chunk.split_column_at(1).1,
61 };
62 }
63 }
64}
65
/// Stateless builder type for [`TableFunctionExecutor`].
///
/// All construction logic lives in its [`BoxedExecutorBuilder`] implementation; the type
/// itself carries no data and needs no inherent methods.
pub struct TableFunctionExecutorBuilder {}
69
70impl BoxedExecutorBuilder for TableFunctionExecutorBuilder {
71 async fn new_boxed_executor(
72 source: &ExecutorBuilder<'_>,
73 inputs: Vec<BoxedExecutor>,
74 ) -> Result<BoxedExecutor> {
75 ensure!(
76 inputs.is_empty(),
77 "GenerateSeriesExecutor should not have child!"
78 );
79 let node = try_match_expand!(
80 source.plan_node().get_node_body().unwrap(),
81 NodeBody::TableFunction
82 )?;
83
84 let identity = source.plan_node().get_identity().clone();
85
86 let chunk_size = source.context().get_config().developer.chunk_size;
87
88 let table_function = build_from_prost(node.table_function.as_ref().unwrap(), chunk_size)?;
89
90 let schema = if let DataType::Struct(fields) = table_function.return_type() {
91 (&fields).into()
92 } else {
93 Schema {
94 fields: vec![Field::unnamed(table_function.return_type())],
96 }
97 };
98
99 Ok(Box::new(TableFunctionExecutor {
100 schema,
101 identity,
102 table_function,
103 chunk_size,
104 }))
105 }
106}