risingwave_stream/from_proto/
filter.rs1use risingwave_expr::expr::build_non_strict_from_prost;
16use risingwave_pb::stream_plan::FilterNode;
17
18use super::*;
19use crate::executor::FilterExecutor;
20
/// Builder that turns a [`FilterNode`] from the stream plan into a [`FilterExecutor`].
///
/// The type carries no state; it only serves as the implementor of `ExecutorBuilder`.
pub struct FilterExecutorBuilder;
22
23impl ExecutorBuilder for FilterExecutorBuilder {
24 type Node = FilterNode;
25
26 async fn new_boxed_executor(
27 params: ExecutorParams,
28 node: &Self::Node,
29 _store: impl StateStore,
30 ) -> StreamResult<Executor> {
31 let [input]: [_; 1] = params.input.try_into().unwrap();
32 let search_condition =
33 build_non_strict_from_prost(node.get_search_condition()?, params.eval_error_report)?;
34
35 let exec = FilterExecutor::new(params.actor_context, input, search_condition);
36 Ok((params.info, exec).into())
37 }
38}