risingwave_stream/from_proto/iceberg_with_pk_index/
dv_merger.rs1use risingwave_common::secret::LocalSecretManager;
16use risingwave_connector::sink::iceberg::IcebergConfig;
17use risingwave_pb::id::SinkId;
18use risingwave_pb::stream_plan::IcebergWithPkIndexDvMergerNode;
19use risingwave_storage::StateStore;
20
21use crate::error::StreamResult;
22use crate::executor::{DvHandlerImpl, DvMergerExecutor, Executor, StreamExecutorError};
23use crate::from_proto::ExecutorBuilder;
24use crate::task::ExecutorParams;
25
26pub struct IcebergWithPkIndexDvMergerExecutorBuilder;
27
28impl ExecutorBuilder for IcebergWithPkIndexDvMergerExecutorBuilder {
29 type Node = IcebergWithPkIndexDvMergerNode;
30
31 async fn new_boxed_executor(
32 params: ExecutorParams,
33 node: &Self::Node,
34 _store: impl StateStore,
35 ) -> StreamResult<Executor> {
36 let [input]: [_; 1] = params.input.try_into().unwrap();
37
38 let sink_desc = node.sink_desc.as_ref().unwrap();
39 let sink_id: SinkId = sink_desc.get_id();
40
41 let properties_with_secret = LocalSecretManager::global().fill_secrets(
42 sink_desc.get_properties().clone(),
43 sink_desc.get_secret_refs().clone(),
44 )?;
45 let config = IcebergConfig::from_btreemap(properties_with_secret.clone())
46 .map_err(|err| StreamExecutorError::from((err, sink_id)))?;
47 let handler = DvHandlerImpl::new(config, params.actor_context.id, sink_id).await?;
48
49 let exec = DvMergerExecutor::new(params.actor_context, input, handler);
50 Ok((params.info, exec).into())
51 }
52}