risingwave_stream/from_proto/iceberg_with_pk_index/
writer.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector::sink::iceberg::{
20    ICEBERG_SINK, IcebergConfig, create_and_validate_table_impl,
21};
22use risingwave_connector::sink::{SinkMetaClient, SinkWriterParam};
23use risingwave_pb::id::SinkId;
24use risingwave_pb::stream_plan::IcebergWithPkIndexWriterNode;
25use risingwave_storage::StateStore;
26
27use super::super::sink::build_sink_param;
28use crate::common::table::state_table::StateTableBuilder;
29use crate::error::StreamResult;
30use crate::executor::{Executor, IcebergWriterImpl, StreamExecutorError, WriterExecutor};
31use crate::from_proto::ExecutorBuilder;
32use crate::task::ExecutorParams;
33
34pub struct IcebergWithPkIndexWriterExecutorBuilder;
35
36impl ExecutorBuilder for IcebergWithPkIndexWriterExecutorBuilder {
37    type Node = IcebergWithPkIndexWriterNode;
38
39    async fn new_boxed_executor(
40        params: ExecutorParams,
41        node: &Self::Node,
42        store: impl StateStore,
43    ) -> StreamResult<Executor> {
44        let [input]: [_; 1] = params.input.try_into().unwrap();
45
46        let sink_desc = node.sink_desc.as_ref().unwrap();
47        let sink_id: SinkId = sink_desc.get_id();
48        let sink_name = sink_desc.get_name().to_owned();
49
50        let properties_with_secret = LocalSecretManager::global().fill_secrets(
51            sink_desc.get_properties().clone(),
52            sink_desc.get_secret_refs().clone(),
53        )?;
54        let config = IcebergConfig::from_btreemap(properties_with_secret.clone())
55            .map_err(|err| StreamExecutorError::from((err, sink_id)))?;
56
57        let pk_indices = sink_desc
58            .downstream_pk
59            .iter()
60            .map(|&idx| idx as usize)
61            .collect::<Vec<_>>();
62        if pk_indices.is_empty() {
63            return Err(anyhow!("missing downstream pk in iceberg sink desc").into());
64        }
65
66        let (sink_param, _columns) =
67            build_sink_param(sink_desc, properties_with_secret, ICEBERG_SINK)?;
68
69        let table = create_and_validate_table_impl(&config, &sink_param)
70            .await
71            .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
72
73        let pk_index_state_table = StateTableBuilder::new(
74            node.get_pk_index_table()?,
75            store,
76            params.vnode_bitmap.clone().map(Arc::new),
77        )
78        .enable_preload_all_rows_by_config(&params.config)
79        .build()
80        .await;
81
82        let meta_client = params
83            .env
84            .meta_client()
85            .ok_or_else(|| anyhow!("meta client is required for Iceberg writer"))?;
86        let meta_client = SinkMetaClient::MetaClient(meta_client);
87
88        let writer_param = SinkWriterParam {
89            executor_id: params.executor_id,
90            vnode_bitmap: params.vnode_bitmap.clone(),
91            meta_client: Some(meta_client),
92            extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize),
93            actor_id: params.actor_context.id,
94            sink_id,
95            sink_name,
96            connector: ICEBERG_SINK.to_owned(),
97            streaming_config: params.config.as_ref().clone(),
98        };
99        let writer = IcebergWriterImpl::build(&config, table, &writer_param)?;
100        let pk_matched = params
101            .info
102            .stream_key
103            .iter()
104            .all(|i| pk_indices.contains(i));
105
106        let exec = WriterExecutor::new(
107            params.actor_context,
108            input,
109            pk_indices,
110            pk_index_state_table,
111            writer,
112            params.config.developer.chunk_size,
113            pk_matched,
114        );
115        Ok((params.info, exec).into())
116    }
117}