risingwave_stream/from_proto/
dml.rs1use itertools::Itertools;
16use risingwave_common::catalog::TableId;
17use risingwave_pb::stream_plan::DmlNode;
18use risingwave_storage::StateStore;
19
20use super::ExecutorBuilder;
21use crate::error::StreamResult;
22use crate::executor::Executor;
23use crate::executor::dml::DmlExecutor;
24use crate::task::ExecutorParams;
25
/// Stateless marker type on which the `ExecutorBuilder` implementation for
/// DML plan nodes is defined; it carries no data of its own.
pub struct DmlExecutorBuilder;
27
28impl ExecutorBuilder for DmlExecutorBuilder {
29 type Node = DmlNode;
30
31 async fn new_boxed_executor(
32 params: ExecutorParams,
33 node: &Self::Node,
34 _store: impl StateStore,
35 ) -> StreamResult<Executor> {
36 let [upstream]: [_; 1] = params.input.try_into().unwrap();
37 let table_id = TableId::new(node.table_id);
38 let column_descs = node.column_descs.iter().map(Into::into).collect_vec();
39
40 let exec = DmlExecutor::new(
41 params.actor_context.clone(),
42 upstream,
43 params.env.dml_manager_ref(),
44 table_id,
45 node.table_version_id,
46 column_descs,
47 params.env.config().developer.chunk_size,
48 node.rate_limit.into(),
49 );
50 Ok((params.info, exec).into())
51 }
52}