risingwave_stream/from_proto/iceberg_with_pk_index/
dv_merger.rs1use risingwave_common::secret::LocalSecretManager;
16use risingwave_connector::sink::iceberg::IcebergConfig;
17use risingwave_pb::id::SinkId;
18use risingwave_pb::stream_plan::IcebergWithPkIndexDvMergerNode;
19use risingwave_storage::StateStore;
20
21use crate::error::StreamResult;
22use crate::executor::{DvHandlerImpl, DvMergerExecutor, Executor, StreamExecutorError};
23use crate::from_proto::ExecutorBuilder;
24use crate::task::ExecutorParams;
25
26pub struct IcebergWithPkIndexDvMergerExecutorBuilder;
27
28impl_stream_node_body!(IcebergWithPkIndexDvMerger(IcebergWithPkIndexDvMergerNode) => IcebergWithPkIndexDvMergerExecutorBuilder);
29
30impl ExecutorBuilder for IcebergWithPkIndexDvMergerExecutorBuilder {
31 type Node = IcebergWithPkIndexDvMergerNode;
32
33 async fn new_boxed_executor(
34 params: ExecutorParams,
35 node: &Self::Node,
36 _store: impl StateStore,
37 ) -> StreamResult<Executor> {
38 let [input]: [_; 1] = params.input.try_into().unwrap();
39
40 let sink_desc = node.sink_desc.as_ref().unwrap();
41 let sink_id: SinkId = sink_desc.get_id();
42
43 let properties_with_secret = LocalSecretManager::global().fill_secrets(
44 sink_desc.get_properties().clone(),
45 sink_desc.get_secret_refs().clone(),
46 )?;
47 let config = IcebergConfig::from_btreemap(properties_with_secret.clone())
48 .map_err(|err| StreamExecutorError::from((err, sink_id)))?;
49 let handler = DvHandlerImpl::new(config, params.actor_context.id, sink_id).await?;
50
51 let exec = DvMergerExecutor::new(params.actor_context, input, handler);
52 Ok((params.info, exec).into())
53 }
54}