risingwave_stream/from_proto/
append_only_dedup.rs1use std::sync::Arc;
16
17use risingwave_pb::stream_plan::DedupNode;
18use risingwave_storage::StateStore;
19
20use super::ExecutorBuilder;
21use crate::common::table::state_table::StateTableBuilder;
22use crate::error::StreamResult;
23use crate::executor::{AppendOnlyDedupExecutor, Executor};
24use crate::task::ExecutorParams;
25
26pub struct AppendOnlyDedupExecutorBuilder;
27
28impl_stream_node_body!(AppendOnlyDedup(DedupNode) => AppendOnlyDedupExecutorBuilder);
29
30impl ExecutorBuilder for AppendOnlyDedupExecutorBuilder {
31 type Node = DedupNode;
32
33 async fn new_boxed_executor(
34 params: ExecutorParams,
35 node: &Self::Node,
36 store: impl StateStore,
37 ) -> StreamResult<Executor> {
38 let [input]: [_; 1] = params.input.try_into().unwrap();
39 let table = node.get_state_table()?;
40 let vnodes = params.vnode_bitmap.map(Arc::new);
41 let state_table = StateTableBuilder::new(table, store, vnodes)
42 .enable_preload_all_rows_by_config(¶ms.config)
43 .build()
44 .await;
45 let exec = AppendOnlyDedupExecutor::new(
46 params.actor_context,
47 input,
48 params.info.stream_key.clone(), state_table,
50 params.watermark_epoch,
51 params.executor_stats.clone(),
52 );
53 Ok((params.info, exec).into())
54 }
55}