risingwave_stream/from_proto/
append_only_dedup.rs1use std::sync::Arc;
16
17use risingwave_pb::stream_plan::DedupNode;
18use risingwave_storage::StateStore;
19
20use super::ExecutorBuilder;
21use crate::common::table::state_table::StateTableBuilder;
22use crate::error::StreamResult;
23use crate::executor::{AppendOnlyDedupExecutor, Executor};
24use crate::task::ExecutorParams;
25
26pub struct AppendOnlyDedupExecutorBuilder;
27
28impl ExecutorBuilder for AppendOnlyDedupExecutorBuilder {
29 type Node = DedupNode;
30
31 async fn new_boxed_executor(
32 params: ExecutorParams,
33 node: &Self::Node,
34 store: impl StateStore,
35 ) -> StreamResult<Executor> {
36 let [input]: [_; 1] = params.input.try_into().unwrap();
37 let table = node.get_state_table()?;
38 let vnodes = params.vnode_bitmap.map(Arc::new);
39 let state_table = StateTableBuilder::new(table, store, vnodes)
40 .enable_preload_all_rows_by_config(¶ms.actor_context.streaming_config)
41 .build()
42 .await;
43 let exec = AppendOnlyDedupExecutor::new(
44 params.actor_context,
45 input,
46 params.info.pk_indices.clone(), state_table,
48 params.watermark_epoch,
49 params.executor_stats.clone(),
50 );
51 Ok((params.info, exec).into())
52 }
53}