risingwave_stream/from_proto/iceberg_with_pk_index/
writer.rs1use std::sync::Arc;
16
17use anyhow::anyhow;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector::sink::iceberg::{
20 ICEBERG_SINK, IcebergConfig, create_and_validate_table_impl,
21};
22use risingwave_connector::sink::{SinkMetaClient, SinkWriterParam};
23use risingwave_pb::id::SinkId;
24use risingwave_pb::stream_plan::IcebergWithPkIndexWriterNode;
25use risingwave_storage::StateStore;
26
27use super::super::sink::build_sink_param;
28use crate::common::table::state_table::StateTableBuilder;
29use crate::error::StreamResult;
30use crate::executor::{Executor, IcebergWriterImpl, StreamExecutorError, WriterExecutor};
31use crate::from_proto::ExecutorBuilder;
32use crate::task::ExecutorParams;
33
34pub struct IcebergWithPkIndexWriterExecutorBuilder;
35
36impl_stream_node_body!(IcebergWithPkIndexWriter(IcebergWithPkIndexWriterNode) => IcebergWithPkIndexWriterExecutorBuilder);
37
38impl ExecutorBuilder for IcebergWithPkIndexWriterExecutorBuilder {
39 type Node = IcebergWithPkIndexWriterNode;
40
41 async fn new_boxed_executor(
42 params: ExecutorParams,
43 node: &Self::Node,
44 store: impl StateStore,
45 ) -> StreamResult<Executor> {
46 let [input]: [_; 1] = params.input.try_into().unwrap();
47
48 let sink_desc = node.sink_desc.as_ref().unwrap();
49 let sink_id: SinkId = sink_desc.get_id();
50 let sink_name = sink_desc.get_name().to_owned();
51
52 let properties_with_secret = LocalSecretManager::global().fill_secrets(
53 sink_desc.get_properties().clone(),
54 sink_desc.get_secret_refs().clone(),
55 )?;
56 let config = IcebergConfig::from_btreemap(properties_with_secret.clone())
57 .map_err(|err| StreamExecutorError::from((err, sink_id)))?;
58
59 let pk_indices = sink_desc
60 .downstream_pk
61 .iter()
62 .map(|&idx| idx as usize)
63 .collect::<Vec<_>>();
64 if pk_indices.is_empty() {
65 return Err(anyhow!("missing downstream pk in iceberg sink desc").into());
66 }
67
68 let (sink_param, _columns) =
69 build_sink_param(sink_desc, properties_with_secret, ICEBERG_SINK)?;
70
71 let table = create_and_validate_table_impl(&config, &sink_param)
72 .await
73 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
74
75 let pk_index_state_table = StateTableBuilder::new(
76 node.get_pk_index_table()?,
77 store,
78 params.vnode_bitmap.clone().map(Arc::new),
79 )
80 .enable_preload_all_rows_by_config(¶ms.config)
81 .build()
82 .await;
83
84 let meta_client = params
85 .env
86 .meta_client()
87 .ok_or_else(|| anyhow!("meta client is required for Iceberg writer"))?;
88 let meta_client = SinkMetaClient::MetaClient(meta_client);
89
90 let writer_param = SinkWriterParam {
91 executor_id: params.executor_id,
92 vnode_bitmap: params.vnode_bitmap.clone(),
93 meta_client: Some(meta_client),
94 extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize),
95 actor_id: params.actor_context.id,
96 sink_id,
97 sink_name,
98 connector: ICEBERG_SINK.to_owned(),
99 streaming_config: params.config.as_ref().clone(),
100 time_zone: params.actor_context.time_zone,
101 };
102 let writer = IcebergWriterImpl::build(&config, table, &writer_param)?;
103 let pk_matched = params
104 .info
105 .stream_key
106 .iter()
107 .all(|i| pk_indices.contains(i));
108
109 let exec = WriterExecutor::new(
110 params.actor_context,
111 input,
112 pk_indices,
113 pk_index_state_table,
114 writer,
115 params.config.developer.chunk_size,
116 pk_matched,
117 );
118 Ok((params.info, exec).into())
119 }
120}