// risingwave_stream/from_proto/dml.rs
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_pb::stream_plan::DmlNode;
use risingwave_storage::StateStore;
use super::ExecutorBuilder;
use crate::error::StreamResult;
use crate::executor::dml::DmlExecutor;
use crate::executor::Executor;
use crate::task::ExecutorParams;
/// Stateless builder type whose [`ExecutorBuilder`] implementation turns a
/// [`DmlNode`] plan node into a [`DmlExecutor`].
pub struct DmlExecutorBuilder;
impl ExecutorBuilder for DmlExecutorBuilder {
    type Node = DmlNode;

    /// Builds a [`DmlExecutor`] from the given [`DmlNode`] and wraps it,
    /// together with `params.info`, into an [`Executor`].
    ///
    /// The state store argument is accepted to satisfy the trait signature but
    /// is not used by this builder.
    ///
    /// # Panics
    ///
    /// Panics if `params.input` cannot be converted into a one-element array,
    /// i.e. the node was not given exactly one upstream input.
    async fn new_boxed_executor(
        params: ExecutorParams,
        node: &Self::Node,
        _store: impl StateStore,
    ) -> StreamResult<Executor> {
        // A DML node is built on top of exactly one upstream executor.
        let inputs: [_; 1] = params.input.try_into().unwrap();
        let [upstream] = inputs;

        // Convert the protobuf column descriptors into the form the executor takes.
        let column_descs = node
            .column_descs
            .iter()
            .map(|desc| desc.into())
            .collect_vec();

        let dml_manager = params.env.dml_manager_ref();
        let chunk_size = params.env.config().developer.chunk_size;

        let dml_executor = DmlExecutor::new(
            upstream,
            dml_manager,
            TableId::new(node.table_id),
            node.table_version_id,
            column_descs,
            chunk_size,
        );

        let executor: Executor = (params.info, dml_executor).into();
        Ok(executor)
    }
}