risingwave_connector/sink/iceberg/
writer.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use await_tree::InstrumentAwait;
20use iceberg::arrow::{
21    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
22};
23use iceberg::spec::{
24    DataFile, FormatVersion, PartitionSpecRef, SchemaRef as IcebergSchemaRef, SerializedDataFile,
25};
26use iceberg::table::Table;
27use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
28use iceberg::writer::base_writer::deletion_vector_writer::{
29    DeletionVectorWriter, DeletionVectorWriterBuilder,
30};
31use iceberg::writer::base_writer::equality_delete_writer::{
32    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
33};
34use iceberg::writer::base_writer::position_delete_file_writer::{
35    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
36    PositionDeleteInput,
37};
38use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
39use iceberg::writer::file_writer::ParquetWriterBuilder;
40use iceberg::writer::file_writer::location_generator::{
41    DefaultFileNameGenerator, DefaultLocationGenerator,
42};
43use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
44use iceberg::writer::task_writer::TaskWriter;
45use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
46use itertools::Itertools;
47use parquet::file::properties::WriterProperties;
48use risingwave_common::array::arrow::IcebergArrowConvert;
49use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
50use risingwave_common::array::arrow::arrow_schema_iceberg::{
51    DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef,
52};
53use risingwave_common::array::{Op, StreamChunk};
54use risingwave_common::bitmap::Bitmap;
55use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
56use risingwave_common_estimate_size::EstimateSize;
57use risingwave_pb::connector_service::SinkMetadata;
58use uuid::Uuid;
59
60use super::prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
61use super::{
62    GLOBAL_SINK_METRICS, IcebergCommitResult, IcebergConfig, PARQUET_CREATED_BY, SinkError,
63    SinkWriterParam, create_and_validate_table_impl,
64};
65use crate::sink::writer::SinkWriter;
66use crate::sink::{Result, SinkParam};
67
/// Lazily resolved column projection used to drop the extra partition column from a chunk.
///
/// `None` means no projection is needed.
/// `Prepare` stores the extra partition column index.
/// `Done` stores the finalized projection indices.
///
/// `ProjectIdxVec` is initialized lazily. When we first encounter `Prepare`, we
/// derive the projection indices from the current chunk schema and cache them in
/// `Done`, so later chunks reuse the cached indices instead of recomputing them.
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}
80
/// Data file writer builder specialized to the Parquet writer and the default
/// location / file-name generators.
type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
/// Position delete file writer builder specialized to the Parquet writer and the default
/// location / file-name generators.
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
/// Position delete file writer using the same type parameters as
/// `PositionDeleteFileWriterBuilderType`.
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
/// Deletion vector writer builder specialized to the default location / file-name generators.
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
/// Deletion vector writer using the same type parameters as `DeletionVectorWriterBuilderType`.
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
/// Equality delete file writer builder specialized to the Parquet writer and the default
/// location / file-name generators.
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
102
/// Builder for the position-level delete writer used by the upsert path.
///
/// `build_upsert` selects `DeletionVector` when the table format version is at least V3 and
/// `PositionDelete` otherwise.
#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    /// Builds a Parquet position delete file writer.
    PositionDelete(PositionDeleteFileWriterBuilderType),
    /// Builds a deletion vector writer.
    DeletionVector(DeletionVectorWriterBuilderType),
}
108
/// Writer produced by `PositionDeleteWriterBuilderType`; each variant wraps the concrete writer
/// built by the builder variant of the same name.
enum PositionDeleteWriterType {
    /// Writes position deletes through a position delete file writer.
    PositionDelete(PositionDeleteFileWriterType),
    /// Writes position deletes through a deletion vector writer.
    DeletionVector(DeletionVectorWriterType),
}
113
114#[async_trait]
115impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
116    type R = PositionDeleteWriterType;
117
118    async fn build(
119        &self,
120        partition_key: Option<iceberg::spec::PartitionKey>,
121    ) -> iceberg::Result<Self::R> {
122        match self {
123            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
124                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
125            ),
126            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
127                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
128            ),
129        }
130    }
131}
132
133#[async_trait]
134impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
135    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
136        match self {
137            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
138            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
139        }
140    }
141
142    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
143        match self {
144            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
145            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
146        }
147    }
148}
149
/// Writer builder handle that shares one underlying builder through an `Arc`, so cloning the
/// handle only clones the pointer rather than the builder itself.
#[derive(Clone)]
struct SharedIcebergWriterBuilder<B>(Arc<B>);
152
153#[async_trait]
154impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
155    type R = B::R;
156
157    async fn build(
158        &self,
159        partition_key: Option<iceberg::spec::PartitionKey>,
160    ) -> iceberg::Result<Self::R> {
161        self.0.build(partition_key).await
162    }
163}
164
/// Bundles everything needed to construct a `TaskWriter` on demand, so a fresh writer can be
/// created each time the previous one has been closed.
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    // Underlying writer builder, shared with every `TaskWriter` built from this wrapper.
    inner: Arc<B>,
    // Forwarded to `TaskWriter::new_with_partition_splitter`.
    fanout_enabled: bool,
    // Iceberg schema handed to the task writer and the partition splitter.
    schema: IcebergSchemaRef,
    // Partition spec handed to the task writer and the partition splitter.
    partition_spec: PartitionSpecRef,
    // For partitioned tables: `true` builds the splitter with
    // `try_new_with_computed_values`, `false` with `try_new_with_precomputed_values`.
    compute_partition: bool,
}
173
174impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
175    fn new(
176        inner: B,
177        fanout_enabled: bool,
178        schema: IcebergSchemaRef,
179        partition_spec: PartitionSpecRef,
180        compute_partition: bool,
181    ) -> Self {
182        Self {
183            inner: Arc::new(inner),
184            fanout_enabled,
185            schema,
186            partition_spec,
187            compute_partition,
188        }
189    }
190
191    fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
192        let partition_splitter = match (
193            self.partition_spec.is_unpartitioned(),
194            self.compute_partition,
195        ) {
196            (true, _) => None,
197            (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
198                self.schema.clone(),
199                self.partition_spec.clone(),
200            )?),
201            (false, false) => Some(
202                RecordBatchPartitionSplitter::try_new_with_precomputed_values(
203                    self.schema.clone(),
204                    self.partition_spec.clone(),
205                )?,
206            ),
207        };
208
209        Ok(TaskWriter::new_with_partition_splitter(
210            SharedIcebergWriterBuilder(self.inner.clone()),
211            self.fanout_enabled,
212            self.schema.clone(),
213            self.partition_spec.clone(),
214            partition_splitter,
215        ))
216    }
217}
218
/// Iceberg sink writer with deferred initialization.
///
/// The writer starts in `Created`, holding only its construction arguments. `begin_epoch`
/// loads the table and replaces the state with `Initialized`.
pub enum IcebergSinkWriter {
    /// Not yet initialized; stores the arguments needed to build the inner writer.
    Created(IcebergSinkWriterArgs),
    /// Fully initialized writer.
    Initialized(IcebergSinkWriterInner),
}
223
/// Arguments captured by `IcebergSinkWriter::new` and consumed when the writer is initialized.
pub struct IcebergSinkWriterArgs {
    // Sink configuration; used to load the table and to configure the file writers.
    config: IcebergConfig,
    // Sink parameters; passed to `create_and_validate_table_impl`.
    sink_param: SinkParam,
    // Per-writer parameters (actor id, sink id, sink name, extra partition column index, ...).
    writer_param: SinkWriterParam,
    // `Some` selects the upsert writer (ids are forwarded to `build_upsert`);
    // `None` selects the append-only writer.
    unique_column_ids: Option<Vec<usize>>,
}
230
/// Initialized state of the Iceberg sink writer.
pub struct IcebergSinkWriterInner {
    // Mode-specific (append-only or upsert) writer together with the builder used to recreate it.
    writer: IcebergWriterDispatch,
    // Arrow schema converted from the table's current Iceberg schema; used to convert chunks
    // into record batches.
    arrow_schema: SchemaRef,
    // Metric handles. See the notes on `IcebergWriterMetrics` for why some of them are kept
    // even though they are not used directly.
    metrics: IcebergWriterMetrics,
    // State of iceberg table for this writer
    table: Table,
    // For chunk with extra partition column, we should remove this column before write.
    // This project index vec is cached to avoid recreating the projection indices each time.
    project_idx_vec: ProjectIdxVec,
}
242
/// The active writer for one of the two sink modes, paired with the builder that can
/// recreate it.
///
/// `writer` is `None` when no writer is currently available (for example after a close whose
/// rebuild failed); `prepare_writer` builds a new one from `writer_builder` in that case.
enum IcebergWriterDispatch {
    /// Append-only mode: writes data files only.
    Append {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    /// Upsert mode: writes through a delta writer combining data, position delete and
    /// equality delete writers.
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        // Table arrow schema with one extra trailing non-nullable `Int32` column named "op".
        arrow_schema_with_op_column: SchemaRef,
    },
}
263
264impl IcebergWriterDispatch {
265    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
266        match self {
267            IcebergWriterDispatch::Append { writer, .. }
268            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
269        }
270    }
271}
272
/// Metric handles owned by an initialized Iceberg sink writer.
pub struct IcebergWriterMetrics {
    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
    // They are actually used in `PrometheusWriterBuilder`:
    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // We keep them here so that the guards clean the labels from the metrics registry when
    // dropped.
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    // Incremented by the estimated size of each chunk after it has been written.
    write_bytes: LabelGuardedIntCounter,
}
282
283impl IcebergSinkWriter {
284    pub fn new(
285        config: IcebergConfig,
286        sink_param: SinkParam,
287        writer_param: SinkWriterParam,
288        unique_column_ids: Option<Vec<usize>>,
289    ) -> Self {
290        Self::Created(IcebergSinkWriterArgs {
291            config,
292            sink_param,
293            writer_param,
294            unique_column_ids,
295        })
296    }
297}
298
299impl IcebergSinkWriterInner {
300    pub fn build_append_only(
301        config: &IcebergConfig,
302        table: Table,
303        writer_param: &SinkWriterParam,
304    ) -> Result<Self> {
305        let SinkWriterParam {
306            extra_partition_col_idx,
307            actor_id,
308            sink_id,
309            sink_name,
310            ..
311        } = writer_param;
312        let metrics_labels = [
313            &actor_id.to_string(),
314            &sink_id.to_string(),
315            sink_name.as_str(),
316        ];
317
318        // Metrics
319        let write_qps = GLOBAL_SINK_METRICS
320            .iceberg_write_qps
321            .with_guarded_label_values(&metrics_labels);
322        let write_latency = GLOBAL_SINK_METRICS
323            .iceberg_write_latency
324            .with_guarded_label_values(&metrics_labels);
325        // # TODO
326        // Unused. Add this metrics later.
327        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
328            .iceberg_rolling_unflushed_data_file
329            .with_guarded_label_values(&metrics_labels);
330        let write_bytes = GLOBAL_SINK_METRICS
331            .iceberg_write_bytes
332            .with_guarded_label_values(&metrics_labels);
333
334        let schema = table.metadata().current_schema();
335        let partition_spec = table.metadata().default_partition_spec();
336        let fanout_enabled = !partition_spec.fields().is_empty();
337        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
338        let unique_uuid_suffix = Uuid::now_v7();
339
340        let parquet_writer_properties = WriterProperties::builder()
341            .set_compression(config.get_parquet_compression())
342            .set_max_row_group_bytes(config.write_parquet_max_row_group_bytes())
343            .set_created_by(PARQUET_CREATED_BY.to_owned())
344            .build();
345
346        let parquet_writer_builder =
347            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
348        let rolling_builder = RollingFileWriterBuilder::new(
349            parquet_writer_builder,
350            (config.target_file_size_mb() * 1024 * 1024) as usize,
351            table.file_io().clone(),
352            DefaultLocationGenerator::new(table.metadata().clone())
353                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
354            DefaultFileNameGenerator::new(
355                writer_param.actor_id.to_string(),
356                Some(unique_uuid_suffix.to_string()),
357                iceberg::spec::DataFileFormat::Parquet,
358            ),
359        );
360        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
361        let monitored_builder = MonitoredGeneralWriterBuilder::new(
362            data_file_builder,
363            write_qps.clone(),
364            write_latency.clone(),
365        );
366        let writer_builder = TaskWriterBuilderWrapper::new(
367            monitored_builder,
368            fanout_enabled,
369            schema.clone(),
370            partition_spec.clone(),
371            true,
372        );
373        let inner_writer = Some(Box::new(
374            writer_builder
375                .build()
376                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
377        ) as Box<dyn IcebergWriter>);
378        Ok(Self {
379            arrow_schema: Arc::new(
380                schema_to_arrow_schema(table.metadata().current_schema())
381                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
382            ),
383            metrics: IcebergWriterMetrics {
384                _write_qps: write_qps,
385                _write_latency: write_latency,
386                write_bytes,
387            },
388            writer: IcebergWriterDispatch::Append {
389                writer: inner_writer,
390                writer_builder,
391            },
392            table,
393            project_idx_vec: {
394                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
395                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
396                } else {
397                    ProjectIdxVec::None
398                }
399            },
400        })
401    }
402
403    pub fn build_upsert(
404        config: &IcebergConfig,
405        table: Table,
406        unique_column_ids: Vec<usize>,
407        writer_param: &SinkWriterParam,
408    ) -> Result<Self> {
409        let SinkWriterParam {
410            extra_partition_col_idx,
411            actor_id,
412            sink_id,
413            sink_name,
414            ..
415        } = writer_param;
416        let metrics_labels = [
417            &actor_id.to_string(),
418            &sink_id.to_string(),
419            sink_name.as_str(),
420        ];
421        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
422
423        // Metrics
424        let write_qps = GLOBAL_SINK_METRICS
425            .iceberg_write_qps
426            .with_guarded_label_values(&metrics_labels);
427        let write_latency = GLOBAL_SINK_METRICS
428            .iceberg_write_latency
429            .with_guarded_label_values(&metrics_labels);
430        // # TODO
431        // Unused. Add this metrics later.
432        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
433            .iceberg_rolling_unflushed_data_file
434            .with_guarded_label_values(&metrics_labels);
435        let write_bytes = GLOBAL_SINK_METRICS
436            .iceberg_write_bytes
437            .with_guarded_label_values(&metrics_labels);
438
439        // Determine the schema id and partition spec id
440        let schema = table.metadata().current_schema();
441        let partition_spec = table.metadata().default_partition_spec();
442        let fanout_enabled = !partition_spec.fields().is_empty();
443        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;
444
445        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
446        let unique_uuid_suffix = Uuid::now_v7();
447
448        let parquet_writer_properties = WriterProperties::builder()
449            .set_compression(config.get_parquet_compression())
450            .set_max_row_group_bytes(config.write_parquet_max_row_group_bytes())
451            .set_created_by(PARQUET_CREATED_BY.to_owned())
452            .build();
453
454        let data_file_builder = {
455            let parquet_writer_builder =
456                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
457            let rolling_writer_builder = RollingFileWriterBuilder::new(
458                parquet_writer_builder,
459                (config.target_file_size_mb() * 1024 * 1024) as usize,
460                table.file_io().clone(),
461                DefaultLocationGenerator::new(table.metadata().clone())
462                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
463                DefaultFileNameGenerator::new(
464                    writer_param.actor_id.to_string(),
465                    Some(unique_uuid_suffix.to_string()),
466                    iceberg::spec::DataFileFormat::Parquet,
467                ),
468            );
469            DataFileWriterBuilder::new(rolling_writer_builder)
470        };
471        let position_delete_builder = if use_deletion_vectors {
472            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
473                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
474            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
475                table.file_io().clone(),
476                location_generator,
477                DefaultFileNameGenerator::new(
478                    writer_param.actor_id.to_string(),
479                    Some(format!("delvec-{}", unique_uuid_suffix)),
480                    iceberg::spec::DataFileFormat::Puffin,
481                ),
482            ))
483        } else {
484            let parquet_writer_builder = ParquetWriterBuilder::new(
485                parquet_writer_properties.clone(),
486                POSITION_DELETE_SCHEMA.clone().into(),
487            );
488            let rolling_writer_builder = RollingFileWriterBuilder::new(
489                parquet_writer_builder,
490                (config.target_file_size_mb() * 1024 * 1024) as usize,
491                table.file_io().clone(),
492                DefaultLocationGenerator::new(table.metadata().clone())
493                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
494                DefaultFileNameGenerator::new(
495                    writer_param.actor_id.to_string(),
496                    Some(format!("pos-del-{}", unique_uuid_suffix)),
497                    iceberg::spec::DataFileFormat::Parquet,
498                ),
499            );
500            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
501                rolling_writer_builder,
502            ))
503        };
504        let equality_delete_builder = {
505            let eq_del_config = EqualityDeleteWriterConfig::new(
506                unique_column_ids.clone(),
507                table.metadata().current_schema().clone(),
508            )
509            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
510            let parquet_writer_builder = ParquetWriterBuilder::new(
511                parquet_writer_properties,
512                Arc::new(
513                    arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
514                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
515                ),
516            );
517            let rolling_writer_builder = RollingFileWriterBuilder::new(
518                parquet_writer_builder,
519                (config.target_file_size_mb() * 1024 * 1024) as usize,
520                table.file_io().clone(),
521                DefaultLocationGenerator::new(table.metadata().clone())
522                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
523                DefaultFileNameGenerator::new(
524                    writer_param.actor_id.to_string(),
525                    Some(format!("eq-del-{}", unique_uuid_suffix)),
526                    iceberg::spec::DataFileFormat::Parquet,
527                ),
528            );
529
530            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
531        };
532        let delta_builder = DeltaWriterBuilder::new(
533            data_file_builder,
534            position_delete_builder,
535            equality_delete_builder,
536            unique_column_ids,
537            schema.clone(),
538        );
539        let original_arrow_schema = Arc::new(
540            schema_to_arrow_schema(table.metadata().current_schema())
541                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
542        );
543        let schema_with_extra_op_column = {
544            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
545            new_fields.push(Arc::new(ArrowField::new(
546                "op".to_owned(),
547                ArrowDataType::Int32,
548                false,
549            )));
550            Arc::new(ArrowSchema::new(new_fields))
551        };
552        let writer_builder = TaskWriterBuilderWrapper::new(
553            MonitoredGeneralWriterBuilder::new(
554                delta_builder,
555                write_qps.clone(),
556                write_latency.clone(),
557            ),
558            fanout_enabled,
559            schema.clone(),
560            partition_spec.clone(),
561            true,
562        );
563        let inner_writer = Some(Box::new(
564            writer_builder
565                .build()
566                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
567        ) as Box<dyn IcebergWriter>);
568        Ok(Self {
569            arrow_schema: original_arrow_schema,
570            metrics: IcebergWriterMetrics {
571                _write_qps: write_qps,
572                _write_latency: write_latency,
573                write_bytes,
574            },
575            table,
576            writer: IcebergWriterDispatch::Upsert {
577                writer: inner_writer,
578                writer_builder,
579                arrow_schema_with_op_column: schema_with_extra_op_column,
580            },
581            project_idx_vec: {
582                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
583                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
584                } else {
585                    ProjectIdxVec::None
586                }
587            },
588        })
589    }
590
591    fn prepare_writer(&mut self) -> Result<()> {
592        match &mut self.writer {
593            IcebergWriterDispatch::Append {
594                writer,
595                writer_builder,
596            } => {
597                if writer.is_none() {
598                    *writer = Some(Box::new(
599                        writer_builder
600                            .build()
601                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
602                    ));
603                }
604            }
605            IcebergWriterDispatch::Upsert {
606                writer,
607                writer_builder,
608                ..
609            } => {
610                if writer.is_none() {
611                    *writer = Some(Box::new(
612                        writer_builder
613                            .build()
614                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
615                    ));
616                }
617            }
618        };
619        Ok(())
620    }
621
622    fn process_chunk(&mut self, chunk: StreamChunk) -> Result<Option<(RecordBatch, usize)>> {
623        let (mut chunk, ops) = chunk.compact_vis().into_parts();
624        match &mut self.project_idx_vec {
625            ProjectIdxVec::None => {}
626            ProjectIdxVec::Prepare(idx) => {
627                if *idx >= chunk.columns().len() {
628                    return Err(SinkError::Iceberg(anyhow!(
629                        "invalid extra partition column index {}",
630                        idx
631                    )));
632                }
633                let project_idx_vec = (0..*idx)
634                    .chain(*idx + 1..chunk.columns().len())
635                    .collect_vec();
636                chunk = chunk.project(&project_idx_vec);
637                self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
638            }
639            ProjectIdxVec::Done(idx_vec) => {
640                chunk = chunk.project(idx_vec);
641            }
642        }
643        if ops.is_empty() {
644            return Ok(None);
645        }
646        let write_batch_size = chunk.estimated_heap_size();
647        let batch = match &self.writer {
648            IcebergWriterDispatch::Append { .. } => {
649                // separate out insert chunk
650                let filters =
651                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
652                chunk.set_visibility(filters);
653                IcebergArrowConvert
654                    .to_record_batch(self.arrow_schema.clone(), &chunk.compact_vis())
655                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
656            }
657            IcebergWriterDispatch::Upsert {
658                arrow_schema_with_op_column,
659                ..
660            } => {
661                let chunk = IcebergArrowConvert
662                    .to_record_batch(self.arrow_schema.clone(), &chunk)
663                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
664                let ops = Arc::new(Int32Array::from(
665                    ops.iter()
666                        .map(|op| match op {
667                            Op::UpdateInsert | Op::Insert => INSERT_OP,
668                            Op::UpdateDelete | Op::Delete => DELETE_OP,
669                        })
670                        .collect_vec(),
671                ));
672                let mut columns = chunk.columns().to_vec();
673                columns.push(ops);
674                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
675                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
676            }
677        };
678        Ok(Some((batch, write_batch_size)))
679    }
680
681    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
682        self.prepare_writer()?;
683        let Some((batch, write_batch_size)) = self.process_chunk(chunk)? else {
684            return Ok(());
685        };
686
687        let writer = self.writer.get_writer().unwrap();
688        writer
689            .write(batch)
690            .instrument_await("iceberg_write")
691            .await?;
692        self.metrics.write_bytes.inc_by(write_batch_size as _);
693        Ok(())
694    }
695
696    pub async fn write_batch_with_position(
697        &mut self,
698        chunk: StreamChunk,
699    ) -> Result<Vec<PositionDeleteInput>> {
700        debug_assert!(
701            matches!(self.writer, IcebergWriterDispatch::Append { .. }),
702            "write_batch_with_position should only be called for append-only writer"
703        );
704
705        self.prepare_writer()?;
706        let Some((batch, write_batch_size)) = self.process_chunk(chunk)? else {
707            return Ok(vec![]);
708        };
709
710        let writer = self.writer.get_writer().unwrap();
711        let positions = writer
712            .write_with_position(batch)
713            .instrument_await("iceberg_write")
714            .await?;
715        self.metrics.write_bytes.inc_by(write_batch_size as _);
716        Ok(positions)
717    }
718
719    pub async fn close(&mut self) -> Result<Option<Vec<DataFile>>> {
720        let res = match &mut self.writer {
721            IcebergWriterDispatch::Append {
722                writer,
723                writer_builder,
724            } => {
725                let close_result = match writer.take() {
726                    Some(mut writer) => {
727                        Some(writer.close().instrument_await("iceberg_close").await?)
728                    }
729                    _ => None,
730                };
731                match writer_builder.build() {
732                    Ok(new_writer) => {
733                        *writer = Some(Box::new(new_writer));
734                    }
735                    _ => {
736                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
737                        // here because current writer may close successfully. So we just log the error.
738                        tracing::warn!("Failed to build new writer after close");
739                    }
740                }
741                close_result
742            }
743            IcebergWriterDispatch::Upsert {
744                writer,
745                writer_builder,
746                ..
747            } => {
748                let close_result = match writer.take() {
749                    Some(mut writer) => {
750                        Some(writer.close().instrument_await("iceberg_close").await?)
751                    }
752                    _ => None,
753                };
754                match writer_builder.build() {
755                    Ok(new_writer) => {
756                        *writer = Some(Box::new(new_writer));
757                    }
758                    _ => {
759                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
760                        // here because current writer may close successfully. So we just log the error.
761                        tracing::warn!("Failed to build new writer after close");
762                    }
763                }
764                close_result
765            }
766        };
767        Ok(res)
768    }
769
770    pub fn generate_commit_metadata(&self, data_files: Vec<DataFile>) -> Result<SinkMetadata> {
771        let format_version = self.table.metadata().format_version();
772        let partition_type = self.table.metadata().default_partition_type();
773        let serialized_data_files = data_files
774            .into_iter()
775            .map(|f| {
776                // Truncate large column statistics BEFORE serialization
777                let truncated = truncate_datafile(f);
778                let res = SerializedDataFile::try_from(truncated, partition_type, format_version)?;
779                Ok(res)
780            })
781            .collect::<Result<Vec<_>>>()?;
782        SinkMetadata::try_from(&IcebergCommitResult {
783            data_files: serialized_data_files,
784            schema_id: self.table.metadata().current_schema_id(),
785            partition_spec_id: self.table.metadata().default_partition_spec_id(),
786        })
787    }
788
789    pub fn partition_type(&self) -> &iceberg::spec::StructType {
790        self.table.metadata().default_partition_type()
791    }
792}
793
794#[async_trait]
795impl SinkWriter for IcebergSinkWriter {
796    type CommitMetadata = Option<SinkMetadata>;
797
798    /// Begin a new epoch
799    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
800        let Self::Created(args) = self else {
801            return Ok(());
802        };
803
804        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
805        let inner = match &args.unique_column_ids {
806            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
807                &args.config,
808                table,
809                unique_column_ids.clone(),
810                &args.writer_param,
811            )?,
812            None => {
813                IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
814            }
815        };
816
817        *self = IcebergSinkWriter::Initialized(inner);
818        Ok(())
819    }
820
821    /// Write a stream chunk to sink
822    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
823        let Self::Initialized(inner) = self else {
824            unreachable!("IcebergSinkWriter should be initialized before barrier");
825        };
826        inner.write_batch(chunk).await
827    }
828
829    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
830    /// writer should commit the current epoch.
831    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
832        let Self::Initialized(inner) = self else {
833            unreachable!("IcebergSinkWriter should be initialized before barrier");
834        };
835
836        // Skip it if not checkpoint
837        if !is_checkpoint {
838            return Ok(None);
839        }
840
841        let data_files = inner
842            .close()
843            .await?
844            .ok_or_else(|| anyhow!("No writer to close"))?;
845        let res = inner.generate_commit_metadata(data_files)?;
846        Ok(Some(res))
847    }
848}
849
/// Upper limit, in bytes, on the binary size of a single column statistic (a min/max bound)
/// that is kept in `DataFile` metadata.
///
/// Bounds larger than this limit are dropped from the `DataFile`'s public `lower_bounds` and
/// `upper_bounds` maps before the file is serialized. This guards against the metadata bloat
/// (up to gigabytes, and the resulting OOM errors) that large JSONB, TEXT, or BINARY values
/// would otherwise cause, while small fields keep the statistics that query optimization
/// benefits from.
const MAX_COLUMN_STAT_SIZE: usize = 10 * 1024; // 10KB
862
863/// Truncate large column statistics from `DataFile` BEFORE serialization.
864///
865/// This function directly modifies `DataFile`'s `lower_bounds` and `upper_bounds`
866/// to remove entries that exceed `MAX_COLUMN_STAT_SIZE`.
867///
868/// # Arguments
869/// * `data_file` - A `DataFile` to process
870///
871/// # Returns
872/// The modified `DataFile` with large statistics truncated
873pub fn truncate_datafile(mut data_file: DataFile) -> DataFile {
874    // Process lower_bounds - remove entries with large values
875    data_file.lower_bounds.retain(|field_id, datum| {
876        // Use to_bytes() to get the actual binary size without JSON serialization overhead
877        let size = match datum.to_bytes() {
878            Ok(bytes) => bytes.len(),
879            Err(_) => 0,
880        };
881
882        if size > MAX_COLUMN_STAT_SIZE {
883            tracing::debug!(
884                field_id = field_id,
885                size = size,
886                "Truncating large lower_bound statistic"
887            );
888            return false;
889        }
890        true
891    });
892
893    // Process upper_bounds - remove entries with large values
894    data_file.upper_bounds.retain(|field_id, datum| {
895        // Use to_bytes() to get the actual binary size without JSON serialization overhead
896        let size = match datum.to_bytes() {
897            Ok(bytes) => bytes.len(),
898            Err(_) => 0,
899        };
900
901        if size > MAX_COLUMN_STAT_SIZE {
902            tracing::debug!(
903                field_id = field_id,
904                size = size,
905                "Truncating large upper_bound statistic"
906            );
907            return false;
908        }
909        true
910    });
911
912    data_file
913}