risingwave_stream/from_proto/iceberg_with_pk_index/
writer.rs1use std::sync::Arc;
16
17use anyhow::anyhow;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector::sink::iceberg::{
20 ICEBERG_SINK, IcebergConfig, create_and_validate_table_impl,
21};
22use risingwave_connector::sink::{SinkMetaClient, SinkWriterParam};
23use risingwave_pb::id::SinkId;
24use risingwave_pb::stream_plan::IcebergWithPkIndexWriterNode;
25use risingwave_storage::StateStore;
26
27use super::super::sink::build_sink_param;
28use crate::common::table::state_table::StateTableBuilder;
29use crate::error::StreamResult;
30use crate::executor::{Executor, IcebergWriterImpl, StreamExecutorError, WriterExecutor};
31use crate::from_proto::ExecutorBuilder;
32use crate::task::ExecutorParams;
33
34pub struct IcebergWithPkIndexWriterExecutorBuilder;
35
36impl ExecutorBuilder for IcebergWithPkIndexWriterExecutorBuilder {
37 type Node = IcebergWithPkIndexWriterNode;
38
39 async fn new_boxed_executor(
40 params: ExecutorParams,
41 node: &Self::Node,
42 store: impl StateStore,
43 ) -> StreamResult<Executor> {
44 let [input]: [_; 1] = params.input.try_into().unwrap();
45
46 let sink_desc = node.sink_desc.as_ref().unwrap();
47 let sink_id: SinkId = sink_desc.get_id();
48 let sink_name = sink_desc.get_name().to_owned();
49
50 let properties_with_secret = LocalSecretManager::global().fill_secrets(
51 sink_desc.get_properties().clone(),
52 sink_desc.get_secret_refs().clone(),
53 )?;
54 let config = IcebergConfig::from_btreemap(properties_with_secret.clone())
55 .map_err(|err| StreamExecutorError::from((err, sink_id)))?;
56
57 let pk_indices = sink_desc
58 .downstream_pk
59 .iter()
60 .map(|&idx| idx as usize)
61 .collect::<Vec<_>>();
62 if pk_indices.is_empty() {
63 return Err(anyhow!("missing downstream pk in iceberg sink desc").into());
64 }
65
66 let (sink_param, _columns) =
67 build_sink_param(sink_desc, properties_with_secret, ICEBERG_SINK)?;
68
69 let table = create_and_validate_table_impl(&config, &sink_param)
70 .await
71 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
72
73 let pk_index_state_table = StateTableBuilder::new(
74 node.get_pk_index_table()?,
75 store,
76 params.vnode_bitmap.clone().map(Arc::new),
77 )
78 .enable_preload_all_rows_by_config(¶ms.config)
79 .build()
80 .await;
81
82 let meta_client = params
83 .env
84 .meta_client()
85 .ok_or_else(|| anyhow!("meta client is required for Iceberg writer"))?;
86 let meta_client = SinkMetaClient::MetaClient(meta_client);
87
88 let writer_param = SinkWriterParam {
89 executor_id: params.executor_id,
90 vnode_bitmap: params.vnode_bitmap.clone(),
91 meta_client: Some(meta_client),
92 extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize),
93 actor_id: params.actor_context.id,
94 sink_id,
95 sink_name,
96 connector: ICEBERG_SINK.to_owned(),
97 streaming_config: params.config.as_ref().clone(),
98 };
99 let writer = IcebergWriterImpl::build(&config, table, &writer_param)?;
100 let pk_matched = params
101 .info
102 .stream_key
103 .iter()
104 .all(|i| pk_indices.contains(i));
105
106 let exec = WriterExecutor::new(
107 params.actor_context,
108 input,
109 pk_indices,
110 pk_index_state_table,
111 writer,
112 params.config.developer.chunk_size,
113 pk_matched,
114 );
115 Ok((params.info, exec).into())
116 }
117}