// risingwave_stream/from_proto/materialized_exprs.rs
use std::sync::Arc;
16
17use risingwave_expr::expr::build_non_strict_from_prost;
18use risingwave_pb::stream_plan::MaterializedExprsNode;
19
20use super::*;
21use crate::common::table::state_table::StateTableBuilder;
22use crate::executor::project::{MaterializedExprsArgs, MaterializedExprsExecutor};
23
/// Stateless builder type that turns a `MaterializedExprsNode` plan node into a
/// `MaterializedExprsExecutor` through its `ExecutorBuilder` implementation.
///
/// The type carries no data; it exists only as the implementor of the builder
/// trait.
pub struct MaterializedExprsExecutorBuilder;
25
26impl_stream_node_body!(MaterializedExprs(MaterializedExprsNode) => MaterializedExprsExecutorBuilder);
27
28impl ExecutorBuilder for MaterializedExprsExecutorBuilder {
29 type Node = MaterializedExprsNode;
30
31 async fn new_boxed_executor(
32 params: ExecutorParams,
33 node: &Self::Node,
34 store: impl StateStore,
35 ) -> StreamResult<Executor> {
36 let [input]: [_; 1] = params.input.try_into().unwrap();
37
38 let exprs: Vec<_> = node
39 .get_exprs()
40 .iter()
41 .map(|e| build_non_strict_from_prost(e, params.eval_error_report.clone()))
42 .try_collect()?;
43
44 let vnodes = params.vnode_bitmap.map(Arc::new);
45 let state_table = StateTableBuilder::new(node.get_state_table()?, store, vnodes)
46 .enable_preload_all_rows_by_config(¶ms.config)
47 .build()
48 .await;
49
50 let exec = MaterializedExprsExecutor::new(MaterializedExprsArgs {
51 actor_ctx: params.actor_context,
52 input,
53 exprs,
54 state_table,
55 state_clean_col_idx: node.state_clean_col_idx.map(|i| i as _),
56 watermark_epoch: params.watermark_epoch,
57 });
58 Ok((params.info, exec).into())
59 }
60}