risingwave_stream/from_proto/
materialized_exprs.rs1use std::sync::Arc;
16
17use risingwave_expr::expr::build_non_strict_from_prost;
18use risingwave_pb::stream_plan::MaterializedExprsNode;
19
20use super::*;
21use crate::common::table::state_table::StateTable;
22use crate::executor::project::{MaterializedExprsArgs, MaterializedExprsExecutor};
23
/// Field-less builder type; its [`ExecutorBuilder`] implementation constructs a
/// [`MaterializedExprsExecutor`] from a [`MaterializedExprsNode`] plan node.
24pub struct MaterializedExprsExecutorBuilder;
25
26impl ExecutorBuilder for MaterializedExprsExecutorBuilder {
27 type Node = MaterializedExprsNode;
28
29 async fn new_boxed_executor(
30 params: ExecutorParams,
31 node: &Self::Node,
32 store: impl StateStore,
33 ) -> StreamResult<Executor> {
34 let [input]: [_; 1] = params.input.try_into().unwrap();
35
36 let exprs: Vec<_> = node
37 .get_exprs()
38 .iter()
39 .map(|e| build_non_strict_from_prost(e, params.eval_error_report.clone()))
40 .try_collect()?;
41
42 let vnodes = params.vnode_bitmap.map(Arc::new);
43 let state_table =
44 StateTable::from_table_catalog(node.get_state_table()?, store, vnodes).await;
45
46 let exec = MaterializedExprsExecutor::new(MaterializedExprsArgs {
47 actor_ctx: params.actor_context,
48 input,
49 exprs,
50 state_table,
51 state_clean_col_idx: node.state_clean_col_idx.map(|i| i as _),
52 });
53 Ok((params.info, exec).into())
54 }
55}