Skip to main content

risingwave_stream/from_proto/iceberg_with_pk_index/
writer.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector::sink::iceberg::{
20    ICEBERG_SINK, IcebergConfig, create_and_validate_table_impl,
21};
22use risingwave_connector::sink::{SinkMetaClient, SinkWriterParam};
23use risingwave_pb::id::SinkId;
24use risingwave_pb::stream_plan::IcebergWithPkIndexWriterNode;
25use risingwave_storage::StateStore;
26
27use super::super::sink::build_sink_param;
28use crate::common::table::state_table::StateTableBuilder;
29use crate::error::StreamResult;
30use crate::executor::{Executor, IcebergWriterImpl, StreamExecutorError, WriterExecutor};
31use crate::from_proto::ExecutorBuilder;
32use crate::task::ExecutorParams;
33
34pub struct IcebergWithPkIndexWriterExecutorBuilder;
35
36impl_stream_node_body!(IcebergWithPkIndexWriter(IcebergWithPkIndexWriterNode) => IcebergWithPkIndexWriterExecutorBuilder);
37
38impl ExecutorBuilder for IcebergWithPkIndexWriterExecutorBuilder {
39    type Node = IcebergWithPkIndexWriterNode;
40
41    async fn new_boxed_executor(
42        params: ExecutorParams,
43        node: &Self::Node,
44        store: impl StateStore,
45    ) -> StreamResult<Executor> {
46        let [input]: [_; 1] = params.input.try_into().unwrap();
47
48        let sink_desc = node.sink_desc.as_ref().unwrap();
49        let sink_id: SinkId = sink_desc.get_id();
50        let sink_name = sink_desc.get_name().to_owned();
51
52        let properties_with_secret = LocalSecretManager::global().fill_secrets(
53            sink_desc.get_properties().clone(),
54            sink_desc.get_secret_refs().clone(),
55        )?;
56        let config = IcebergConfig::from_btreemap(properties_with_secret.clone())
57            .map_err(|err| StreamExecutorError::from((err, sink_id)))?;
58
59        let pk_indices = sink_desc
60            .downstream_pk
61            .iter()
62            .map(|&idx| idx as usize)
63            .collect::<Vec<_>>();
64        if pk_indices.is_empty() {
65            return Err(anyhow!("missing downstream pk in iceberg sink desc").into());
66        }
67
68        let (sink_param, _columns) =
69            build_sink_param(sink_desc, properties_with_secret, ICEBERG_SINK)?;
70
71        let table = create_and_validate_table_impl(&config, &sink_param)
72            .await
73            .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
74
75        let pk_index_state_table = StateTableBuilder::new(
76            node.get_pk_index_table()?,
77            store,
78            params.vnode_bitmap.clone().map(Arc::new),
79        )
80        .enable_preload_all_rows_by_config(&params.config)
81        .build()
82        .await;
83
84        let meta_client = params
85            .env
86            .meta_client()
87            .ok_or_else(|| anyhow!("meta client is required for Iceberg writer"))?;
88        let meta_client = SinkMetaClient::MetaClient(meta_client);
89
90        let writer_param = SinkWriterParam {
91            executor_id: params.executor_id,
92            vnode_bitmap: params.vnode_bitmap.clone(),
93            meta_client: Some(meta_client),
94            extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize),
95            actor_id: params.actor_context.id,
96            sink_id,
97            sink_name,
98            connector: ICEBERG_SINK.to_owned(),
99            streaming_config: params.config.as_ref().clone(),
100            time_zone: params.actor_context.time_zone,
101        };
102        let writer = IcebergWriterImpl::build(&config, table, &writer_param)?;
103        let pk_matched = params
104            .info
105            .stream_key
106            .iter()
107            .all(|i| pk_indices.contains(i));
108
109        let exec = WriterExecutor::new(
110            params.actor_context,
111            input,
112            pk_indices,
113            pk_index_state_table,
114            writer,
115            params.config.developer.chunk_size,
116            pk_matched,
117        );
118        Ok((params.info, exec).into())
119    }
120}