risingwave_batch_executors/executor/
project_set.rs1use either::Either;
16use futures_async_stream::try_stream;
17use itertools::Itertools;
18use risingwave_common::array::{ArrayRef, DataChunk};
19use risingwave_common::catalog::{Field, PROJECTED_ROW_ID_COLUMN_NAME, Schema};
20use risingwave_common::types::{DataType, DatumRef};
21use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
22use risingwave_common::util::iter_util::ZipEqFast;
23use risingwave_expr::expr::{self, BoxedExpression};
24use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter};
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26use risingwave_pb::expr::PbProjectSetSelectItem;
27use risingwave_pb::expr::project_set_select_item::PbSelectItem;
28
29use crate::error::{BatchError, Result};
30use crate::executor::{
31 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
32};
33
/// Batch executor for `ProjectSet`: evaluates a list of scalar expressions and table
/// functions against each input row and emits one output row for every value the table
/// functions produce for that row.
///
/// When built through `new_boxed_executor`, the output schema is a leading `Int64`
/// `projected_row_id` column followed by one column per select item.
pub struct ProjectSetExecutor {
    /// Scalar expressions and table functions evaluated against each input chunk.
    select_list: Vec<ProjectSetSelectItem>,
    /// Upstream executor supplying the input chunks.
    child: BoxedExecutor,
    /// Schema returned by `Executor::schema`.
    schema: Schema,
    /// Identity string returned by `Executor::identity`.
    identity: String,
    /// Row capacity of the output `DataChunkBuilder`; full chunks of this size are
    /// yielded, and the last chunk produced for each input chunk may be smaller.
    chunk_size: usize,
}
41
42impl Executor for ProjectSetExecutor {
43 fn schema(&self) -> &Schema {
44 &self.schema
45 }
46
47 fn identity(&self) -> &str {
48 &self.identity
49 }
50
51 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
52 self.do_execute()
53 }
54}
55
impl ProjectSetExecutor {
    /// Streams the `ProjectSet` output for every chunk produced by `child`.
    ///
    /// For each input row, output rows are generated until no table function in
    /// `select_list` has another value for that row. Each output row consists of:
    /// - column 0: `projected_row_id`, the 0-based index of the output row within the
    ///   current input row;
    /// - one column per select item: for a table function, its next value for that input
    ///   row (NULL once that function is exhausted while another one is not); for a
    ///   scalar expression, its value at that input row, repeated on every output row.
    ///
    /// Output rows are accumulated in a `DataChunkBuilder` sized by `chunk_size`. Full
    /// chunks are yielded as they fill up, and any remainder is flushed after each input
    /// chunk.
    ///
    /// # Errors
    /// Propagates errors from the child stream, from evaluating select items, from
    /// individual table-function values, and from advancing table-function iterators.
    ///
    /// # Panics
    /// Panics if `select_list` is empty.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        assert!(!self.select_list.is_empty());

        // The first output column is `projected_row_id`; the remaining columns follow the
        // select list in order.
        let mut builder = DataChunkBuilder::new(
            std::iter::once(DataType::Int64)
                .chain(self.select_list.iter().map(|i| i.return_type()))
                .collect(),
            self.chunk_size,
        );
        // Reusable scratch buffer holding the datums of the output row being assembled.
        let mut row = vec![None as DatumRef<'_>; builder.num_columns()];

        #[for_await]
        for input in self.child.execute() {
            let input = input?;

            // Evaluate every select item once against the whole input chunk. Table
            // functions yield an output iterator (`Either::Left`); scalars yield an array
            // (`Either::Right`).
            let mut results = Vec::with_capacity(self.select_list.len());
            for select_item in &self.select_list {
                let result = select_item.eval(&input).await?;
                results.push(result);
            }

            // For each input row ...
            for row_idx in 0..input.capacity() {
                // ... emit output rows until no table function has a value left for it.
                for projected_row_id in 0i64.. {
                    // SAFETY: `row` is only a write-before-read scratch buffer. Every slot
                    // (index 0 and each slot in `row[1..]`, one per select item) is written
                    // in this same iteration before `append_one_row` reads it. Datums left
                    // over from an earlier input chunk, which borrow from that chunk's
                    // already-dropped `results`, are therefore never read. The `transmute`
                    // only re-binds the slice's lifetime to bypass the borrow checker; the
                    // element type is unchanged.
                    let row: &mut [DatumRef<'_>] =
                        unsafe { std::mem::transmute(row.as_mut_slice()) };
                    row[0] = Some(projected_row_id.into());
                    // Set to true if at least one table function still has a value for
                    // `row_idx`.
                    let mut valid = false;
                    // Fill one output column per select item.
                    for (item, value) in results.iter_mut().zip_eq_fast(&mut row[1..]) {
                        *value = match item {
                            Either::Left(state) => {
                                // The iterator's pending entry belongs to this input row
                                // only if its row index matches. Otherwise this function
                                // has no more values for `row_idx` and contributes NULL.
                                if let Some((i, value)) = state.peek()
                                    && i == row_idx
                                {
                                    valid = true;
                                    value?
                                } else {
                                    None
                                }
                            }
                            Either::Right(array) => array.value_at(row_idx),
                        };
                    }
                    if !valid {
                        // All table functions are exhausted for this input row.
                        break;
                    }
                    if let Some(chunk) = builder.append_one_row(&*row) {
                        yield chunk;
                    }
                    // Advance only the iterators whose pending entry was consumed above.
                    for item in &mut results {
                        if let Either::Left(state) = item
                            && matches!(state.peek(), Some((i, _)) if i == row_idx)
                        {
                            state.next().await?;
                        }
                    }
                }
            }
            // Flush any rows still buffered at the end of each input chunk.
            if let Some(chunk) = builder.consume_all() {
                yield chunk;
            }
        }
    }
}
133
134impl BoxedExecutorBuilder for ProjectSetExecutor {
135 async fn new_boxed_executor(
136 source: &ExecutorBuilder<'_>,
137 inputs: Vec<BoxedExecutor>,
138 ) -> Result<BoxedExecutor> {
139 let [child]: [_; 1] = inputs.try_into().unwrap();
140
141 let project_set_node = try_match_expand!(
142 source.plan_node().get_node_body().unwrap(),
143 NodeBody::ProjectSet
144 )?;
145
146 let select_list: Vec<_> = project_set_node
147 .get_select_list()
148 .iter()
149 .map(|proto| {
150 ProjectSetSelectItem::from_prost(
151 proto,
152 source.context().get_config().developer.chunk_size,
153 )
154 })
155 .try_collect()?;
156
157 let mut fields = vec![Field::with_name(
158 DataType::Int64,
159 PROJECTED_ROW_ID_COLUMN_NAME,
160 )];
161 fields.extend(
162 select_list
163 .iter()
164 .map(|expr| Field::unnamed(expr.return_type())),
165 );
166
167 Ok(Box::new(Self {
168 select_list,
169 child,
170 schema: Schema { fields },
171 identity: source.plan_node().get_identity().clone(),
172 chunk_size: source.context().get_config().developer.chunk_size,
173 }))
174 }
175}
176
/// One entry of a `ProjectSet` select list.
///
/// Each item contributes exactly one output column of the executor.
#[derive(Debug)]
pub enum ProjectSetSelectItem {
    /// A scalar expression, evaluated to one value per input row.
    Scalar(BoxedExpression),
    /// A table function, which may produce several (or no) values per input row.
    Set(BoxedTableFunction),
}
185
186impl From<BoxedTableFunction> for ProjectSetSelectItem {
187 fn from(table_function: BoxedTableFunction) -> Self {
188 ProjectSetSelectItem::Set(table_function)
189 }
190}
191
192impl From<BoxedExpression> for ProjectSetSelectItem {
193 fn from(expr: BoxedExpression) -> Self {
194 ProjectSetSelectItem::Scalar(expr)
195 }
196}
197
198impl ProjectSetSelectItem {
199 pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result<Self> {
200 Ok(match prost.select_item.as_ref().unwrap() {
201 PbSelectItem::Expr(expr) => Self::Scalar(expr::build_from_prost(expr)?),
202 PbSelectItem::TableFunction(tf) => {
203 Self::Set(table_function::build_from_prost(tf, chunk_size)?)
204 }
205 })
206 }
207
208 pub fn return_type(&self) -> DataType {
209 match self {
210 ProjectSetSelectItem::Scalar(expr) => expr.return_type(),
211 ProjectSetSelectItem::Set(tf) => tf.return_type(),
212 }
213 }
214
215 pub async fn eval<'a>(
216 &'a self,
217 input: &'a DataChunk,
218 ) -> Result<Either<TableFunctionOutputIter<'a>, ArrayRef>> {
219 match self {
220 Self::Set(tf) => Ok(Either::Left(
221 TableFunctionOutputIter::new(tf.eval(input).await).await?,
222 )),
223 Self::Scalar(expr) => Ok(Either::Right(expr.eval(input).await?)),
224 }
225 }
226}
227
#[cfg(test)]
mod tests {
    use futures::stream::StreamExt;
    use futures_async_stream::for_await;
    use risingwave_common::test_prelude::*;
    use risingwave_expr::expr::{ExpressionBoxExt, InputRefExpression, LiteralExpression};
    use risingwave_expr::table_function::repeat;

    use super::*;
    use crate::executor::ValuesExecutor;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    const CHUNK_SIZE: usize = 1024;

    /// Builds the schema the executor actually emits, mirroring `new_boxed_executor`:
    /// the `Int64` `projected_row_id` column followed by one column per select item.
    fn output_schema(select_list: &[ProjectSetSelectItem]) -> Schema {
        let mut fields = vec![Field::with_name(
            DataType::Int64,
            PROJECTED_ROW_ID_COLUMN_NAME,
        )];
        fields.extend(
            select_list
                .iter()
                .map(|item| Field::unnamed(item.return_type())),
        );
        Schema { fields }
    }

    #[tokio::test]
    async fn test_project_set_executor() -> Result<()> {
        let chunk = DataChunk::from_pretty(
            "i i
             1 7
             2 8
             33333 66666
             4 4
             5 3",
        );

        let expr1 = InputRefExpression::new(DataType::Int32, 0);
        let expr2 = repeat(
            LiteralExpression::new(DataType::Int32, Some(1_i32.into())).boxed(),
            2,
        );
        let expr3 = repeat(
            LiteralExpression::new(DataType::Int32, Some(2_i32.into())).boxed(),
            3,
        );
        let select_list: Vec<ProjectSetSelectItem> =
            vec![expr1.boxed().into(), expr2.into(), expr3.into()];

        let schema = schema_unnamed! { DataType::Int32, DataType::Int32 };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(chunk);

        let proj_schema = output_schema(&select_list);

        let proj_executor = Box::new(ProjectSetExecutor {
            select_list,
            child: Box::new(mock_executor),
            schema: proj_schema,
            identity: "ProjectSetExecutor".to_owned(),
            chunk_size: CHUNK_SIZE,
        });

        // The schema must describe the emitted chunks: `projected_row_id` comes first.
        let fields = &proj_executor.schema().fields;
        assert_eq!(fields.len(), 4);
        assert_eq!(fields[0].data_type, DataType::Int64);
        assert_eq!(fields[1].data_type, DataType::Int32);

        let expected = [DataChunk::from_pretty(
            "I i i i
             0 1 1 2
             1 1 1 2
             2 1 . 2
             0 2 1 2
             1 2 1 2
             2 2 . 2
             0 33333 1 2
             1 33333 1 2
             2 33333 . 2
             0 4 1 2
             1 4 1 2
             2 4 . 2
             0 5 1 2
             1 5 1 2
             2 5 . 2",
        )];

        // Count chunks so the test cannot pass vacuously when nothing is produced.
        let mut num_chunks = 0;
        #[for_await]
        for (i, result_chunk) in proj_executor.execute().enumerate() {
            let result_chunk = result_chunk?;
            assert_eq!(result_chunk, expected[i]);
            num_chunks += 1;
        }
        assert_eq!(num_chunks, expected.len());
        Ok(())
    }

    #[tokio::test]
    async fn test_project_set_dummy_chunk() {
        let literal = LiteralExpression::new(DataType::Int32, Some(1_i32.into()));
        let tf = repeat(
            LiteralExpression::new(DataType::Int32, Some(2_i32.into())).boxed(),
            2,
        );

        let values_executor2: Box<dyn Executor> = Box::new(ValuesExecutor::new(
            vec![vec![]], // a single row with no columns
            Schema::default(),
            "ValuesExecutor".to_owned(),
            CHUNK_SIZE,
        ));

        let select_list: Vec<ProjectSetSelectItem> = vec![literal.boxed().into(), tf.into()];
        let proj_schema = output_schema(&select_list);

        let proj_executor = Box::new(ProjectSetExecutor {
            select_list,
            child: values_executor2,
            schema: proj_schema,
            identity: "ProjectSetExecutor2".to_owned(),
            chunk_size: CHUNK_SIZE,
        });
        let mut stream = proj_executor.execute();
        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(
            chunk,
            DataChunk::from_pretty(
                "I i i
                 0 1 2
                 1 1 2",
            ),
        );
        // The single input row must produce exactly one output chunk.
        assert!(stream.next().await.is_none());
    }
}