Skip to main content

risingwave_connector/sink/iceberg/
writer.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use await_tree::InstrumentAwait;
20use iceberg::arrow::{
21    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
22};
23use iceberg::spec::{
24    DataFile, FormatVersion, PartitionSpecRef, SchemaRef as IcebergSchemaRef, SerializedDataFile,
25};
26use iceberg::table::Table;
27use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
28use iceberg::writer::base_writer::deletion_vector_writer::{
29    DeletionVectorWriter, DeletionVectorWriterBuilder,
30};
31use iceberg::writer::base_writer::equality_delete_writer::{
32    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
33};
34use iceberg::writer::base_writer::position_delete_file_writer::{
35    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
36    PositionDeleteInput,
37};
38use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
39use iceberg::writer::file_writer::ParquetWriterBuilder;
40use iceberg::writer::file_writer::location_generator::{
41    DefaultFileNameGenerator, DefaultLocationGenerator,
42};
43use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
44use iceberg::writer::task_writer::TaskWriter;
45use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
46use itertools::Itertools;
47use parquet::file::properties::WriterProperties;
48use risingwave_common::array::arrow::IcebergArrowConvert;
49use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
50use risingwave_common::array::arrow::arrow_schema_iceberg::{
51    DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef,
52};
53use risingwave_common::array::{Op, StreamChunk};
54use risingwave_common::bitmap::Bitmap;
55use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
56use risingwave_common_estimate_size::EstimateSize;
57use risingwave_pb::connector_service::SinkMetadata;
58use uuid::Uuid;
59
60use super::prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
61use super::{
62    GLOBAL_SINK_METRICS, IcebergCommitResult, IcebergConfig, PARQUET_CREATED_BY, SinkError,
63    SinkWriterParam, create_and_validate_table_impl,
64};
65use crate::sink::writer::SinkWriter;
66use crate::sink::{Result, SinkParam};
67
/// Column projection applied to each incoming chunk before it is written.
///
/// Some chunks carry an extra partition column that must be dropped before the data is
/// converted to Arrow. The projection is resolved lazily:
///
/// - `None`: chunks are written unchanged.
/// - `Prepare(idx)`: chunks carry an extra column at `idx`. The projection has not been derived
///   yet; that happens on the first chunk, using that chunk's column count.
/// - `Done(indices)`: the cached list of column indices to keep, reused for every later chunk.
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}
80
81type DataFileWriterBuilderType =
82    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
83type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
84    ParquetWriterBuilder,
85    DefaultLocationGenerator,
86    DefaultFileNameGenerator,
87>;
88type PositionDeleteFileWriterType = PositionDeleteFileWriter<
89    ParquetWriterBuilder,
90    DefaultLocationGenerator,
91    DefaultFileNameGenerator,
92>;
93type DeletionVectorWriterBuilderType =
94    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
95type DeletionVectorWriterType =
96    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
97type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
98    ParquetWriterBuilder,
99    DefaultLocationGenerator,
100    DefaultFileNameGenerator,
101>;
102
103#[derive(Clone)]
104enum PositionDeleteWriterBuilderType {
105    PositionDelete(PositionDeleteFileWriterBuilderType),
106    DeletionVector(DeletionVectorWriterBuilderType),
107}
108
109enum PositionDeleteWriterType {
110    PositionDelete(PositionDeleteFileWriterType),
111    DeletionVector(DeletionVectorWriterType),
112}
113
114#[async_trait]
115impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
116    type R = PositionDeleteWriterType;
117
118    async fn build(
119        &self,
120        partition_key: Option<iceberg::spec::PartitionKey>,
121    ) -> iceberg::Result<Self::R> {
122        match self {
123            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
124                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
125            ),
126            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
127                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
128            ),
129        }
130    }
131}
132
133#[async_trait]
134impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
135    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
136        match self {
137            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
138            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
139        }
140    }
141
142    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
143        match self {
144            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
145            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
146        }
147    }
148}
149
150#[derive(Clone)]
151struct SharedIcebergWriterBuilder<B>(Arc<B>);
152
153#[async_trait]
154impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
155    type R = B::R;
156
157    async fn build(
158        &self,
159        partition_key: Option<iceberg::spec::PartitionKey>,
160    ) -> iceberg::Result<Self::R> {
161        self.0.build(partition_key).await
162    }
163}
164
165#[derive(Clone)]
166struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
167    inner: Arc<B>,
168    fanout_enabled: bool,
169    schema: IcebergSchemaRef,
170    partition_spec: PartitionSpecRef,
171    compute_partition: bool,
172}
173
174impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
175    fn new(
176        inner: B,
177        fanout_enabled: bool,
178        schema: IcebergSchemaRef,
179        partition_spec: PartitionSpecRef,
180        compute_partition: bool,
181    ) -> Self {
182        Self {
183            inner: Arc::new(inner),
184            fanout_enabled,
185            schema,
186            partition_spec,
187            compute_partition,
188        }
189    }
190
191    fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
192        let partition_splitter = match (
193            self.partition_spec.is_unpartitioned(),
194            self.compute_partition,
195        ) {
196            (true, _) => None,
197            (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
198                self.schema.clone(),
199                self.partition_spec.clone(),
200            )?),
201            (false, false) => Some(
202                RecordBatchPartitionSplitter::try_new_with_precomputed_values(
203                    self.schema.clone(),
204                    self.partition_spec.clone(),
205                )?,
206            ),
207        };
208
209        Ok(TaskWriter::new_with_partition_splitter(
210            SharedIcebergWriterBuilder(self.inner.clone()),
211            self.fanout_enabled,
212            self.schema.clone(),
213            self.partition_spec.clone(),
214            partition_splitter,
215        ))
216    }
217}
218
219pub enum IcebergSinkWriter {
220    Created(IcebergSinkWriterArgs),
221    Initialized(IcebergSinkWriterInner),
222}
223
224pub struct IcebergSinkWriterArgs {
225    config: IcebergConfig,
226    sink_param: SinkParam,
227    writer_param: SinkWriterParam,
228    upsert_primary_key_column_names: Option<Vec<String>>,
229}
230
231pub struct IcebergSinkWriterInner {
232    writer: IcebergWriterDispatch,
233    arrow_schema: SchemaRef,
234    // See comments below
235    metrics: IcebergWriterMetrics,
236    // State of iceberg table for this writer
237    table: Table,
238    // For chunk with extra partition column, we should remove this column before write.
239    // This project index vec is used to avoid create project idx each time.
240    project_idx_vec: ProjectIdxVec,
241}
242
243enum IcebergWriterDispatch {
244    Append {
245        writer: Option<Box<dyn IcebergWriter>>,
246        writer_builder:
247            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
248    },
249    Upsert {
250        writer: Option<Box<dyn IcebergWriter>>,
251        writer_builder: TaskWriterBuilderWrapper<
252            MonitoredGeneralWriterBuilder<
253                DeltaWriterBuilder<
254                    DataFileWriterBuilderType,
255                    PositionDeleteWriterBuilderType,
256                    EqualityDeleteFileWriterBuilderType,
257                >,
258            >,
259        >,
260        arrow_schema_with_op_column: SchemaRef,
261    },
262}
263
264impl IcebergWriterDispatch {
265    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
266        match self {
267            IcebergWriterDispatch::Append { writer, .. }
268            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
269        }
270    }
271}
272
273pub struct IcebergWriterMetrics {
274    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
275    // They are actually used in `PrometheusWriterBuilder`:
276    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
277    // We keep them here to let the guard cleans the labels from metrics registry when dropped
278    _write_qps: LabelGuardedIntCounter,
279    _write_latency: LabelGuardedHistogram,
280    write_bytes: LabelGuardedIntCounter,
281}
282
283impl IcebergSinkWriter {
284    pub fn new(
285        config: IcebergConfig,
286        sink_param: SinkParam,
287        writer_param: SinkWriterParam,
288        upsert_primary_key_column_names: Option<Vec<String>>,
289    ) -> Self {
290        Self::Created(IcebergSinkWriterArgs {
291            config,
292            sink_param,
293            writer_param,
294            upsert_primary_key_column_names,
295        })
296    }
297}
298
299pub(super) fn resolve_equality_delete_field_ids(
300    primary_key_column_names: &[String],
301    iceberg_schema: &iceberg::spec::Schema,
302) -> Result<Vec<i32>> {
303    primary_key_column_names
304        .iter()
305        .map(|column_name| {
306            iceberg_schema.field_id_by_name(column_name).ok_or_else(|| {
307                SinkError::Config(anyhow!(
308                    "Primary key column {} not found in Iceberg schema",
309                    column_name
310                ))
311            })
312        })
313        .collect()
314}
315
316impl IcebergSinkWriterInner {
317    pub fn build_append_only(
318        config: &IcebergConfig,
319        table: Table,
320        writer_param: &SinkWriterParam,
321    ) -> Result<Self> {
322        let SinkWriterParam {
323            extra_partition_col_idx,
324            actor_id,
325            sink_id,
326            sink_name,
327            ..
328        } = writer_param;
329        let metrics_labels = [
330            &actor_id.to_string(),
331            &sink_id.to_string(),
332            sink_name.as_str(),
333        ];
334
335        // Metrics
336        let write_qps = GLOBAL_SINK_METRICS
337            .iceberg_write_qps
338            .with_guarded_label_values(&metrics_labels);
339        let write_latency = GLOBAL_SINK_METRICS
340            .iceberg_write_latency
341            .with_guarded_label_values(&metrics_labels);
342        // # TODO
343        // Unused. Add this metrics later.
344        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
345            .iceberg_rolling_unflushed_data_file
346            .with_guarded_label_values(&metrics_labels);
347        let write_bytes = GLOBAL_SINK_METRICS
348            .iceberg_write_bytes
349            .with_guarded_label_values(&metrics_labels);
350
351        let schema = table.metadata().current_schema();
352        let partition_spec = table.metadata().default_partition_spec();
353        let fanout_enabled = !partition_spec.fields().is_empty();
354        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
355        let unique_uuid_suffix = Uuid::now_v7();
356
357        let parquet_writer_properties = WriterProperties::builder()
358            .set_compression(config.get_parquet_compression())
359            .set_max_row_group_bytes(config.write_parquet_max_row_group_bytes())
360            .set_created_by(PARQUET_CREATED_BY.to_owned())
361            .build();
362
363        let parquet_writer_builder =
364            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
365        let rolling_builder = RollingFileWriterBuilder::new(
366            parquet_writer_builder,
367            (config.target_file_size_mb() * 1024 * 1024) as usize,
368            table.file_io().clone(),
369            DefaultLocationGenerator::new(table.metadata().clone())
370                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
371            DefaultFileNameGenerator::new(
372                writer_param.actor_id.to_string(),
373                Some(unique_uuid_suffix.to_string()),
374                iceberg::spec::DataFileFormat::Parquet,
375            ),
376        );
377        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
378        let monitored_builder = MonitoredGeneralWriterBuilder::new(
379            data_file_builder,
380            write_qps.clone(),
381            write_latency.clone(),
382        );
383        let writer_builder = TaskWriterBuilderWrapper::new(
384            monitored_builder,
385            fanout_enabled,
386            schema.clone(),
387            partition_spec.clone(),
388            true,
389        );
390        let inner_writer = Some(Box::new(
391            writer_builder
392                .build()
393                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
394        ) as Box<dyn IcebergWriter>);
395        Ok(Self {
396            arrow_schema: Arc::new(
397                schema_to_arrow_schema(table.metadata().current_schema())
398                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
399            ),
400            metrics: IcebergWriterMetrics {
401                _write_qps: write_qps,
402                _write_latency: write_latency,
403                write_bytes,
404            },
405            writer: IcebergWriterDispatch::Append {
406                writer: inner_writer,
407                writer_builder,
408            },
409            table,
410            project_idx_vec: {
411                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
412                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
413                } else {
414                    ProjectIdxVec::None
415                }
416            },
417        })
418    }
419
420    pub fn build_upsert(
421        config: &IcebergConfig,
422        table: Table,
423        primary_key_column_names: Vec<String>,
424        writer_param: &SinkWriterParam,
425    ) -> Result<Self> {
426        let SinkWriterParam {
427            extra_partition_col_idx,
428            actor_id,
429            sink_id,
430            sink_name,
431            ..
432        } = writer_param;
433        let metrics_labels = [
434            &actor_id.to_string(),
435            &sink_id.to_string(),
436            sink_name.as_str(),
437        ];
438
439        // Metrics
440        let write_qps = GLOBAL_SINK_METRICS
441            .iceberg_write_qps
442            .with_guarded_label_values(&metrics_labels);
443        let write_latency = GLOBAL_SINK_METRICS
444            .iceberg_write_latency
445            .with_guarded_label_values(&metrics_labels);
446        // # TODO
447        // Unused. Add this metrics later.
448        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
449            .iceberg_rolling_unflushed_data_file
450            .with_guarded_label_values(&metrics_labels);
451        let write_bytes = GLOBAL_SINK_METRICS
452            .iceberg_write_bytes
453            .with_guarded_label_values(&metrics_labels);
454
455        // Determine the schema id and partition spec id
456        let schema = table.metadata().current_schema();
457        let unique_column_ids =
458            resolve_equality_delete_field_ids(&primary_key_column_names, schema)?;
459        let partition_spec = table.metadata().default_partition_spec();
460        let fanout_enabled = !partition_spec.fields().is_empty();
461        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;
462
463        // To avoid duplicate file name, each time the sink created will generate a unique uuid as file name suffix.
464        let unique_uuid_suffix = Uuid::now_v7();
465
466        let parquet_writer_properties = WriterProperties::builder()
467            .set_compression(config.get_parquet_compression())
468            .set_max_row_group_bytes(config.write_parquet_max_row_group_bytes())
469            .set_created_by(PARQUET_CREATED_BY.to_owned())
470            .build();
471
472        let data_file_builder = {
473            let parquet_writer_builder =
474                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
475            let rolling_writer_builder = RollingFileWriterBuilder::new(
476                parquet_writer_builder,
477                (config.target_file_size_mb() * 1024 * 1024) as usize,
478                table.file_io().clone(),
479                DefaultLocationGenerator::new(table.metadata().clone())
480                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
481                DefaultFileNameGenerator::new(
482                    writer_param.actor_id.to_string(),
483                    Some(unique_uuid_suffix.to_string()),
484                    iceberg::spec::DataFileFormat::Parquet,
485                ),
486            );
487            DataFileWriterBuilder::new(rolling_writer_builder)
488        };
489        let position_delete_builder = if use_deletion_vectors {
490            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
491                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
492            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
493                table.file_io().clone(),
494                location_generator,
495                DefaultFileNameGenerator::new(
496                    writer_param.actor_id.to_string(),
497                    Some(format!("delvec-{}", unique_uuid_suffix)),
498                    iceberg::spec::DataFileFormat::Puffin,
499                ),
500            ))
501        } else {
502            let parquet_writer_builder = ParquetWriterBuilder::new(
503                parquet_writer_properties.clone(),
504                POSITION_DELETE_SCHEMA.clone().into(),
505            );
506            let rolling_writer_builder = RollingFileWriterBuilder::new(
507                parquet_writer_builder,
508                (config.target_file_size_mb() * 1024 * 1024) as usize,
509                table.file_io().clone(),
510                DefaultLocationGenerator::new(table.metadata().clone())
511                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
512                DefaultFileNameGenerator::new(
513                    writer_param.actor_id.to_string(),
514                    Some(format!("pos-del-{}", unique_uuid_suffix)),
515                    iceberg::spec::DataFileFormat::Parquet,
516                ),
517            );
518            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
519                rolling_writer_builder,
520            ))
521        };
522        let equality_delete_builder = {
523            let eq_del_config = EqualityDeleteWriterConfig::new(
524                unique_column_ids.clone(),
525                table.metadata().current_schema().clone(),
526            )
527            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
528            let parquet_writer_builder = ParquetWriterBuilder::new(
529                parquet_writer_properties,
530                Arc::new(
531                    arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
532                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
533                ),
534            );
535            let rolling_writer_builder = RollingFileWriterBuilder::new(
536                parquet_writer_builder,
537                (config.target_file_size_mb() * 1024 * 1024) as usize,
538                table.file_io().clone(),
539                DefaultLocationGenerator::new(table.metadata().clone())
540                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
541                DefaultFileNameGenerator::new(
542                    writer_param.actor_id.to_string(),
543                    Some(format!("eq-del-{}", unique_uuid_suffix)),
544                    iceberg::spec::DataFileFormat::Parquet,
545                ),
546            );
547
548            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
549        };
550        let delta_builder = DeltaWriterBuilder::new(
551            data_file_builder,
552            position_delete_builder,
553            equality_delete_builder,
554            unique_column_ids,
555            schema.clone(),
556        );
557        let original_arrow_schema = Arc::new(
558            schema_to_arrow_schema(table.metadata().current_schema())
559                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
560        );
561        let schema_with_extra_op_column = {
562            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
563            new_fields.push(Arc::new(ArrowField::new(
564                "op".to_owned(),
565                ArrowDataType::Int32,
566                false,
567            )));
568            Arc::new(ArrowSchema::new(new_fields))
569        };
570        let writer_builder = TaskWriterBuilderWrapper::new(
571            MonitoredGeneralWriterBuilder::new(
572                delta_builder,
573                write_qps.clone(),
574                write_latency.clone(),
575            ),
576            fanout_enabled,
577            schema.clone(),
578            partition_spec.clone(),
579            true,
580        );
581        let inner_writer = Some(Box::new(
582            writer_builder
583                .build()
584                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
585        ) as Box<dyn IcebergWriter>);
586        Ok(Self {
587            arrow_schema: original_arrow_schema,
588            metrics: IcebergWriterMetrics {
589                _write_qps: write_qps,
590                _write_latency: write_latency,
591                write_bytes,
592            },
593            table,
594            writer: IcebergWriterDispatch::Upsert {
595                writer: inner_writer,
596                writer_builder,
597                arrow_schema_with_op_column: schema_with_extra_op_column,
598            },
599            project_idx_vec: {
600                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
601                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
602                } else {
603                    ProjectIdxVec::None
604                }
605            },
606        })
607    }
608
609    fn prepare_writer(&mut self) -> Result<()> {
610        match &mut self.writer {
611            IcebergWriterDispatch::Append {
612                writer,
613                writer_builder,
614            } => {
615                if writer.is_none() {
616                    *writer = Some(Box::new(
617                        writer_builder
618                            .build()
619                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
620                    ));
621                }
622            }
623            IcebergWriterDispatch::Upsert {
624                writer,
625                writer_builder,
626                ..
627            } => {
628                if writer.is_none() {
629                    *writer = Some(Box::new(
630                        writer_builder
631                            .build()
632                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
633                    ));
634                }
635            }
636        };
637        Ok(())
638    }
639
640    fn process_chunk(&mut self, chunk: StreamChunk) -> Result<Option<(RecordBatch, usize)>> {
641        let (mut chunk, ops) = chunk.compact_vis().into_parts();
642        match &mut self.project_idx_vec {
643            ProjectIdxVec::None => {}
644            ProjectIdxVec::Prepare(idx) => {
645                if *idx >= chunk.columns().len() {
646                    return Err(SinkError::Iceberg(anyhow!(
647                        "invalid extra partition column index {}",
648                        idx
649                    )));
650                }
651                let project_idx_vec = (0..*idx)
652                    .chain(*idx + 1..chunk.columns().len())
653                    .collect_vec();
654                chunk = chunk.project(&project_idx_vec);
655                self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
656            }
657            ProjectIdxVec::Done(idx_vec) => {
658                chunk = chunk.project(idx_vec);
659            }
660        }
661        if ops.is_empty() {
662            return Ok(None);
663        }
664        let write_batch_size = chunk.estimated_heap_size();
665        let batch = match &self.writer {
666            IcebergWriterDispatch::Append { .. } => {
667                // separate out insert chunk
668                let filters =
669                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
670                chunk.set_visibility(filters);
671                IcebergArrowConvert
672                    .to_record_batch(self.arrow_schema.clone(), &chunk.compact_vis())
673                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
674            }
675            IcebergWriterDispatch::Upsert {
676                arrow_schema_with_op_column,
677                ..
678            } => {
679                let chunk = IcebergArrowConvert
680                    .to_record_batch(self.arrow_schema.clone(), &chunk)
681                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
682                let ops = Arc::new(Int32Array::from(
683                    ops.iter()
684                        .map(|op| match op {
685                            Op::UpdateInsert | Op::Insert => INSERT_OP,
686                            Op::UpdateDelete | Op::Delete => DELETE_OP,
687                        })
688                        .collect_vec(),
689                ));
690                let mut columns = chunk.columns().to_vec();
691                columns.push(ops);
692                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
693                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
694            }
695        };
696        Ok(Some((batch, write_batch_size)))
697    }
698
699    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
700        self.prepare_writer()?;
701        let Some((batch, write_batch_size)) = self.process_chunk(chunk)? else {
702            return Ok(());
703        };
704
705        let writer = self.writer.get_writer().unwrap();
706        writer
707            .write(batch)
708            .instrument_await("iceberg_write")
709            .await?;
710        self.metrics.write_bytes.inc_by(write_batch_size as _);
711        Ok(())
712    }
713
714    pub async fn write_batch_with_position(
715        &mut self,
716        chunk: StreamChunk,
717    ) -> Result<Vec<PositionDeleteInput>> {
718        debug_assert!(
719            matches!(self.writer, IcebergWriterDispatch::Append { .. }),
720            "write_batch_with_position should only be called for append-only writer"
721        );
722
723        self.prepare_writer()?;
724        let Some((batch, write_batch_size)) = self.process_chunk(chunk)? else {
725            return Ok(vec![]);
726        };
727
728        let writer = self.writer.get_writer().unwrap();
729        let positions = writer
730            .write_with_position(batch)
731            .instrument_await("iceberg_write")
732            .await?;
733        self.metrics.write_bytes.inc_by(write_batch_size as _);
734        Ok(positions)
735    }
736
737    pub async fn close(&mut self) -> Result<Option<Vec<DataFile>>> {
738        let res = match &mut self.writer {
739            IcebergWriterDispatch::Append {
740                writer,
741                writer_builder,
742            } => {
743                let close_result = match writer.take() {
744                    Some(mut writer) => {
745                        Some(writer.close().instrument_await("iceberg_close").await?)
746                    }
747                    _ => None,
748                };
749                match writer_builder.build() {
750                    Ok(new_writer) => {
751                        *writer = Some(Box::new(new_writer));
752                    }
753                    _ => {
754                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
755                        // here because current writer may close successfully. So we just log the error.
756                        tracing::warn!("Failed to build new writer after close");
757                    }
758                }
759                close_result
760            }
761            IcebergWriterDispatch::Upsert {
762                writer,
763                writer_builder,
764                ..
765            } => {
766                let close_result = match writer.take() {
767                    Some(mut writer) => {
768                        Some(writer.close().instrument_await("iceberg_close").await?)
769                    }
770                    _ => None,
771                };
772                match writer_builder.build() {
773                    Ok(new_writer) => {
774                        *writer = Some(Box::new(new_writer));
775                    }
776                    _ => {
777                        // In this case, the writer is closed and we can't build a new writer. But we can't return the error
778                        // here because current writer may close successfully. So we just log the error.
779                        tracing::warn!("Failed to build new writer after close");
780                    }
781                }
782                close_result
783            }
784        };
785        Ok(res)
786    }
787
788    pub fn generate_commit_metadata(&self, data_files: Vec<DataFile>) -> Result<SinkMetadata> {
789        let format_version = self.table.metadata().format_version();
790        let partition_type = self.table.metadata().default_partition_type();
791        let serialized_data_files = data_files
792            .into_iter()
793            .map(|f| {
794                // Truncate large column statistics BEFORE serialization
795                let truncated = truncate_datafile(f);
796                let res = SerializedDataFile::try_from(truncated, partition_type, format_version)?;
797                Ok(res)
798            })
799            .collect::<Result<Vec<_>>>()?;
800        SinkMetadata::try_from(&IcebergCommitResult {
801            data_files: serialized_data_files,
802            schema_id: self.table.metadata().current_schema_id(),
803            partition_spec_id: self.table.metadata().default_partition_spec_id(),
804        })
805    }
806
807    pub fn partition_type(&self) -> &iceberg::spec::StructType {
808        self.table.metadata().default_partition_type()
809    }
810}
811
812#[async_trait]
813impl SinkWriter for IcebergSinkWriter {
814    type CommitMetadata = Option<SinkMetadata>;
815
816    /// Begin a new epoch
817    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
818        let Self::Created(args) = self else {
819            return Ok(());
820        };
821
822        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
823        let inner = match &args.upsert_primary_key_column_names {
824            Some(primary_key_column_names) => IcebergSinkWriterInner::build_upsert(
825                &args.config,
826                table,
827                primary_key_column_names.clone(),
828                &args.writer_param,
829            )?,
830            None => {
831                IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
832            }
833        };
834
835        *self = IcebergSinkWriter::Initialized(inner);
836        Ok(())
837    }
838
839    /// Write a stream chunk to sink
840    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
841        let Self::Initialized(inner) = self else {
842            unreachable!("IcebergSinkWriter should be initialized before barrier");
843        };
844        inner.write_batch(chunk).await
845    }
846
847    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
848    /// writer should commit the current epoch.
849    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
850        let Self::Initialized(inner) = self else {
851            unreachable!("IcebergSinkWriter should be initialized before barrier");
852        };
853
854        // Skip it if not checkpoint
855        if !is_checkpoint {
856            return Ok(None);
857        }
858
859        let data_files = inner
860            .close()
861            .await?
862            .ok_or_else(|| anyhow!("No writer to close"))?;
863        let res = inner.generate_commit_metadata(data_files)?;
864        Ok(Some(res))
865    }
866}
867
/// Largest encoded size, in bytes, of a single column statistic (a min or max bound) that is
/// kept in `DataFile` metadata.
///
/// `truncate_datafile` removes any `lower_bounds` / `upper_bounds` entry whose encoded size
/// is strictly greater than this value. It does so before data files are serialized into
/// commit metadata.
///
/// Wide columns such as JSONB, TEXT or BINARY can otherwise produce enormous bounds. These
/// bloat the metadata and can lead to OOM errors. Bounds of small columns are still kept,
/// because they remain useful for query optimization.
const MAX_COLUMN_STAT_SIZE: usize = 10 * 1024; // 10 KiB
880
881/// Truncate large column statistics from `DataFile` BEFORE serialization.
882///
883/// This function directly modifies `DataFile`'s `lower_bounds` and `upper_bounds`
884/// to remove entries that exceed `MAX_COLUMN_STAT_SIZE`.
885///
886/// # Arguments
887/// * `data_file` - A `DataFile` to process
888///
889/// # Returns
890/// The modified `DataFile` with large statistics truncated
891pub fn truncate_datafile(mut data_file: DataFile) -> DataFile {
892    // Process lower_bounds - remove entries with large values
893    data_file.lower_bounds.retain(|field_id, datum| {
894        // Use to_bytes() to get the actual binary size without JSON serialization overhead
895        let size = match datum.to_bytes() {
896            Ok(bytes) => bytes.len(),
897            Err(_) => 0,
898        };
899
900        if size > MAX_COLUMN_STAT_SIZE {
901            tracing::debug!(
902                field_id = field_id,
903                size = size,
904                "Truncating large lower_bound statistic"
905            );
906            return false;
907        }
908        true
909    });
910
911    // Process upper_bounds - remove entries with large values
912    data_file.upper_bounds.retain(|field_id, datum| {
913        // Use to_bytes() to get the actual binary size without JSON serialization overhead
914        let size = match datum.to_bytes() {
915            Ok(bytes) => bytes.len(),
916            Err(_) => 0,
917        };
918
919        if size > MAX_COLUMN_STAT_SIZE {
920            tracing::debug!(
921                field_id = field_id,
922                size = size,
923                "Truncating large upper_bound statistic"
924            );
925            return false;
926        }
927        true
928    });
929
930    data_file
931}