risingwave_stream/executor/iceberg_with_pk_index/
writer_impl.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use iceberg::table::Table;
16use iceberg::writer::PositionDeleteInput;
17use risingwave_common::array::DataChunk;
18use risingwave_connector::sink::SinkWriterParam;
19use risingwave_connector::sink::iceberg::{IcebergConfig, IcebergSinkWriterInner};
20use risingwave_pb::id::SinkId;
21
22use super::writer::{IcebergWriter, IcebergWriterFlushOutput};
23use crate::executor::{StreamExecutorError, StreamExecutorResult};
24
25pub struct IcebergWriterImpl {
26    inner: IcebergSinkWriterInner,
27    sink_id: SinkId,
28}
29
30impl IcebergWriterImpl {
31    pub fn build(
32        config: &IcebergConfig,
33        table: Table,
34        writer_param: &SinkWriterParam,
35    ) -> StreamExecutorResult<Self> {
36        let sink_id = writer_param.sink_id;
37        let inner = IcebergSinkWriterInner::build_append_only(config, table, writer_param)
38            .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
39
40        Ok(Self { inner, sink_id })
41    }
42}
43
44#[async_trait::async_trait]
45impl IcebergWriter for IcebergWriterImpl {
46    async fn write_chunk(
47        &mut self,
48        chunk: DataChunk,
49    ) -> StreamExecutorResult<Vec<PositionDeleteInput>> {
50        let positions = self
51            .inner
52            .write_batch_with_position(chunk.into())
53            .await
54            .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
55        Ok(positions)
56    }
57
58    async fn flush(&mut self) -> StreamExecutorResult<Option<IcebergWriterFlushOutput>> {
59        let Some(data_files) = self
60            .inner
61            .close()
62            .await
63            .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?
64        else {
65            return Ok(None);
66        };
67        let metadata = self
68            .inner
69            .generate_commit_metadata(data_files)
70            .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
71
72        Ok(Some(metadata))
73    }
74}