risingwave_stream/executor/iceberg_with_pk_index/
writer_impl.rs1use iceberg::table::Table;
16use iceberg::writer::PositionDeleteInput;
17use risingwave_common::array::DataChunk;
18use risingwave_connector::sink::SinkWriterParam;
19use risingwave_connector::sink::iceberg::{IcebergConfig, IcebergSinkWriterInner};
20use risingwave_pb::id::SinkId;
21
22use super::writer::{IcebergWriter, IcebergWriterFlushOutput};
23use crate::executor::{StreamExecutorError, StreamExecutorResult};
24
25pub struct IcebergWriterImpl {
26 inner: IcebergSinkWriterInner,
27 sink_id: SinkId,
28}
29
30impl IcebergWriterImpl {
31 pub fn build(
32 config: &IcebergConfig,
33 table: Table,
34 writer_param: &SinkWriterParam,
35 ) -> StreamExecutorResult<Self> {
36 let sink_id = writer_param.sink_id;
37 let inner = IcebergSinkWriterInner::build_append_only(config, table, writer_param)
38 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
39
40 Ok(Self { inner, sink_id })
41 }
42}
43
44#[async_trait::async_trait]
45impl IcebergWriter for IcebergWriterImpl {
46 async fn write_chunk(
47 &mut self,
48 chunk: DataChunk,
49 ) -> StreamExecutorResult<Vec<PositionDeleteInput>> {
50 let positions = self
51 .inner
52 .write_batch_with_position(chunk.into())
53 .await
54 .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
55 Ok(positions)
56 }
57
58 async fn flush(&mut self) -> StreamExecutorResult<Option<IcebergWriterFlushOutput>> {
59 let Some(data_files) = self
60 .inner
61 .close()
62 .await
63 .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?
64 else {
65 return Ok(None);
66 };
67 let metadata = self
68 .inner
69 .generate_commit_metadata(data_files)
70 .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
71
72 Ok(Some(metadata))
73 }
74}