risingwave_stream/from_proto/
values.rs1use itertools::Itertools;
16use risingwave_expr::expr::build_non_strict_from_prost;
17use risingwave_pb::stream_plan::ValuesNode;
18use risingwave_storage::StateStore;
19
20use super::ExecutorBuilder;
21use crate::error::StreamResult;
22use crate::executor::{Executor, ValuesExecutor};
23use crate::task::ExecutorParams;
24
25pub struct ValuesExecutorBuilder;
28
29impl ExecutorBuilder for ValuesExecutorBuilder {
30 type Node = ValuesNode;
31
32 async fn new_boxed_executor(
33 params: ExecutorParams,
34 node: &ValuesNode,
35 _store: impl StateStore,
36 ) -> StreamResult<Executor> {
37 let barrier_receiver = params
38 .local_barrier_manager
39 .subscribe_barrier(params.actor_context.id);
40 let progress = params
41 .local_barrier_manager
42 .register_create_mview_progress(params.actor_context.id);
43 let rows = node
44 .get_tuples()
45 .iter()
46 .map(|tuple| {
47 tuple
48 .get_cells()
49 .iter()
50 .map(|node| {
51 build_non_strict_from_prost(node, params.eval_error_report.clone()).unwrap()
52 })
53 .collect_vec()
54 })
55 .collect_vec();
56 let exec = ValuesExecutor::new(
57 params.actor_context,
58 params.info.schema.clone(),
59 progress,
60 rows,
61 barrier_receiver,
62 );
63 Ok((params.info, exec).into())
64 }
65}