risingwave_stream/from_proto/
append_only_dedup.rs1use std::sync::Arc;
16
17use risingwave_pb::stream_plan::DedupNode;
18use risingwave_storage::StateStore;
19
20use super::ExecutorBuilder;
21use crate::common::table::state_table::StateTable;
22use crate::error::StreamResult;
23use crate::executor::{AppendOnlyDedupExecutor, Executor};
24use crate::task::ExecutorParams;
25
26pub struct AppendOnlyDedupExecutorBuilder;
27
28impl ExecutorBuilder for AppendOnlyDedupExecutorBuilder {
29 type Node = DedupNode;
30
31 async fn new_boxed_executor(
32 params: ExecutorParams,
33 node: &Self::Node,
34 store: impl StateStore,
35 ) -> StreamResult<Executor> {
36 let [input]: [_; 1] = params.input.try_into().unwrap();
37 let table = node.get_state_table()?;
38 let vnodes = params.vnode_bitmap.map(Arc::new);
39 let state_table = StateTable::from_table_catalog(table, store, vnodes).await;
40 let exec = AppendOnlyDedupExecutor::new(
41 params.actor_context,
42 input,
43 params.info.pk_indices.clone(), state_table,
45 params.watermark_epoch,
46 params.executor_stats.clone(),
47 );
48 Ok((params.info, exec).into())
49 }
50}