risingwave_connector/sink/iceberg/writer.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use iceberg::arrow::{
    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
};
use iceberg::spec::{
    DataFile, FormatVersion, PartitionSpecRef, SchemaRef as IcebergSchemaRef, SerializedDataFile,
};
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::deletion_vector_writer::{
    DeletionVectorWriter, DeletionVectorWriterBuilder,
};
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_file_writer::{
    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
    PositionDeleteInput,
};
use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::task_writer::TaskWriter;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
use uuid::Uuid;

use super::prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use super::{
    GLOBAL_SINK_METRICS, IcebergCommitResult, IcebergConfig, PARQUET_CREATED_BY, SinkError,
    SinkWriterParam, create_and_validate_table_impl,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkParam};

/// `None` means no projection.
/// `Prepare` holds the index of the extra partition column.
/// `Done` holds the computed projection index vector.
///
/// `ProjectIdxVec` is evaluated lazily: the first time we see the `Prepare`
/// state, we use the data chunk schema to build the projection index vector.
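///
/// For example, `Prepare(2)` on a five-column chunk yields the projection
/// `[0, 1, 3, 4]`, i.e. every column except the extra partition column.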
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}

type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;

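/// Position deletes are encoded differently depending on the table's format
/// version: classic position delete files for v2 tables, and deletion vectors
/// (Puffin files) for v3+ tables. See `build_upsert` for how the variant is
/// chosen.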
#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    PositionDelete(PositionDeleteFileWriterBuilderType),
    DeletionVector(DeletionVectorWriterBuilderType),
}

enum PositionDeleteWriterType {
    PositionDelete(PositionDeleteFileWriterType),
    DeletionVector(DeletionVectorWriterType),
}

#[async_trait]
impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
    type R = PositionDeleteWriterType;

    async fn build(
        &self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        match self {
            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
            ),
            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
            ),
        }
    }
}

#[async_trait]
impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
        }
    }

    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
        }
    }
}

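/// Wraps a writer builder in an `Arc` so a single builder instance can be
/// handed to each `TaskWriter` without cloning the underlying builder.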
#[derive(Clone)]
struct SharedIcebergWriterBuilder<B>(Arc<B>);

#[async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
    type R = B::R;

    async fn build(
        &self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        self.0.build(partition_key).await
    }
}

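/// Builds `TaskWriter`s on demand from a shared inner builder, choosing the
/// partition-splitting strategy from the table's partition spec: an
/// unpartitioned table gets no splitter, while a partitioned table either
/// computes partition values from the data (`compute_partition = true`) or
/// reuses precomputed partition values carried in the record batch.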
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    inner: Arc<B>,
    fanout_enabled: bool,
    schema: IcebergSchemaRef,
    partition_spec: PartitionSpecRef,
    compute_partition: bool,
}

impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
    fn new(
        inner: B,
        fanout_enabled: bool,
        schema: IcebergSchemaRef,
        partition_spec: PartitionSpecRef,
        compute_partition: bool,
    ) -> Self {
        Self {
            inner: Arc::new(inner),
            fanout_enabled,
            schema,
            partition_spec,
            compute_partition,
        }
    }

    fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
        let partition_splitter = match (
            self.partition_spec.is_unpartitioned(),
            self.compute_partition,
        ) {
            (true, _) => None,
            (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
            (false, false) => Some(
                RecordBatchPartitionSplitter::try_new_with_precomputed_values(
                    self.schema.clone(),
                    self.partition_spec.clone(),
                )?,
            ),
        };

        Ok(TaskWriter::new_with_partition_splitter(
            SharedIcebergWriterBuilder(self.inner.clone()),
            self.fanout_enabled,
            self.schema.clone(),
            self.partition_spec.clone(),
            partition_splitter,
        ))
    }
}

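/// The sink writer starts in the `Created` state and transitions to
/// `Initialized` on the first `begin_epoch`, which loads and validates the
/// Iceberg table before building the actual writer.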
pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}

pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    unique_column_ids: Option<Vec<usize>>,
}

pub struct IcebergSinkWriterInner {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    // See the comments on `IcebergWriterMetrics` below.
    metrics: IcebergWriterMetrics,
    // State of the iceberg table for this writer.
    table: Table,
    // For chunks with an extra partition column, we must remove that column before
    // writing. The projection index vector is cached here so it is not rebuilt for
    // every chunk.
    project_idx_vec: ProjectIdxVec,
    commit_checkpoint_size_threshold_bytes: Option<u64>,
    uncommitted_write_bytes: u64,
}

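/// Dispatches between the append-only data file writer and the upsert (delta)
/// writer, which additionally produces position and equality deletes and
/// expects an extra `op` column on each record batch.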
#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    Append {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        arrow_schema_with_op_column: SchemaRef,
    },
}

impl IcebergWriterDispatch {
    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
        match self {
            IcebergWriterDispatch::Append { writer, .. }
            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
        }
    }
}

pub struct IcebergWriterMetrics {
    // NOTE: These two metrics are not used directly by us; they are kept only for
    // lifecycle management. They are actually used in `PrometheusWriterBuilder`:
    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // We keep them here so that the guards clean their labels out of the metrics
    // registry when dropped.
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}

impl IcebergSinkWriter {
    pub fn new(
        config: IcebergConfig,
        sink_param: SinkParam,
        writer_param: SinkWriterParam,
        unique_column_ids: Option<Vec<usize>>,
    ) -> Self {
        Self::Created(IcebergSinkWriterArgs {
            config,
            sink_param,
            writer_param,
            unique_column_ids,
        })
    }
}

impl IcebergSinkWriterInner {
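    /// Whether enough uncommitted data has accumulated to force a commit at
    /// the next checkpoint. For example, with a 64 MiB threshold this returns
    /// true once `uncommitted_write_bytes >= 64 * 1024 * 1024`; with no
    /// threshold configured it always returns false.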
    fn should_commit_on_checkpoint(&self) -> bool {
        self.commit_checkpoint_size_threshold_bytes
            .is_some_and(|threshold| {
                self.uncommitted_write_bytes > 0 && self.uncommitted_write_bytes >= threshold
            })
    }

    fn build_append_only(
        config: &IcebergConfig,
        table: Table,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        // Metrics
        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        // TODO: currently unused; wire this metric up later.
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
        // To avoid duplicate file names, each sink instance generates a unique UUID
        // as a file name suffix.
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        let rolling_builder = RollingFileWriterBuilder::new(
            parquet_writer_builder,
            (config.target_file_size_mb() * 1024 * 1024) as usize,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
            uncommitted_write_bytes: 0,
        })
    }

    fn build_upsert(
        config: &IcebergConfig,
        table: Table,
        unique_column_ids: Vec<usize>,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];
        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();

        // Metrics
        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        // TODO: currently unused; wire this metric up later.
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        // Determine the current schema and the default partition spec.
        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
        // Format v3+ tables encode position deletes as deletion vectors.
        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;

        // To avoid duplicate file names, each sink instance generates a unique UUID
        // as a file name suffix.
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let data_file_builder = {
            let parquet_writer_builder =
                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(unique_uuid_suffix.to_string()),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            DataFileWriterBuilder::new(rolling_writer_builder)
        };
        let position_delete_builder = if use_deletion_vectors {
            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
                table.file_io().clone(),
                location_generator,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("delvec-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Puffin,
                ),
            ))
        } else {
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties.clone(),
                POSITION_DELETE_SCHEMA.clone().into(),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("pos-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
                rolling_writer_builder,
            ))
        };
        let equality_delete_builder = {
            let eq_del_config = EqualityDeleteWriterConfig::new(
                unique_column_ids.clone(),
                table.metadata().current_schema().clone(),
            )
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties,
                Arc::new(
                    arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("eq-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );

            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
        };
        let delta_builder = DeltaWriterBuilder::new(
            data_file_builder,
            position_delete_builder,
            equality_delete_builder,
            unique_column_ids,
            schema.clone(),
        );
        let original_arrow_schema = Arc::new(
            schema_to_arrow_schema(table.metadata().current_schema())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        );
        let schema_with_extra_op_column = {
            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
            new_fields.push(Arc::new(ArrowField::new(
                "op".to_owned(),
                ArrowDataType::Int32,
                false,
            )));
            Arc::new(ArrowSchema::new(new_fields))
        };
        let writer_builder = TaskWriterBuilderWrapper::new(
            MonitoredGeneralWriterBuilder::new(
                delta_builder,
                write_qps.clone(),
                write_latency.clone(),
            ),
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: original_arrow_schema,
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            table,
            writer: IcebergWriterDispatch::Upsert {
                writer: inner_writer,
                writer_builder,
                arrow_schema_with_op_column: schema_with_extra_op_column,
            },
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
            uncommitted_write_bytes: 0,
        })
    }
}

#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    /// Begin a new epoch. On the first call this loads and validates the
    /// Iceberg table and builds the inner writer (append-only or upsert);
    /// subsequent calls are no-ops.
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        let Self::Created(args) = self else {
            return Ok(());
        };

        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
        let inner = match &args.unique_column_ids {
            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
                &args.config,
                table,
                unique_column_ids.clone(),
                &args.writer_param,
            )?,
            None => {
                IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
            }
        };

        *self = IcebergSinkWriter::Initialized(inner);
        Ok(())
    }

    /// Write a stream chunk to the sink.
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before write_batch");
        };

        // Rebuild the writer if it has been taken (e.g. closed at the last checkpoint).
        match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
        };

        // Process the chunk.
        let (mut chunk, ops) = chunk.compact_vis().into_parts();
        match &mut inner.project_idx_vec {
            ProjectIdxVec::None => {}
            ProjectIdxVec::Prepare(idx) => {
                if *idx >= chunk.columns().len() {
                    return Err(SinkError::Iceberg(anyhow!(
                        "invalid extra partition column index {}",
                        idx
                    )));
                }
                let project_idx_vec = (0..*idx)
                    .chain(*idx + 1..chunk.columns().len())
                    .collect_vec();
                chunk = chunk.project(&project_idx_vec);
                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
            }
            ProjectIdxVec::Done(idx_vec) => {
                chunk = chunk.project(idx_vec);
            }
        }
        if ops.is_empty() {
            return Ok(());
        }
        let write_batch_size = chunk.estimated_heap_size();
        let batch = match &inner.writer {
            IcebergWriterDispatch::Append { .. } => {
                // Keep only the insert rows; the append-only writer ignores deletes.
                let filters =
                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
                chunk.set_visibility(filters);
                IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
            IcebergWriterDispatch::Upsert {
                arrow_schema_with_op_column,
                ..
            } => {
                let chunk = IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
                // Append an extra `op` column so the delta writer can tell inserts
                // from deletes; updates are mapped to a delete plus an insert.
                let ops = Arc::new(Int32Array::from(
                    ops.iter()
                        .map(|op| match op {
                            Op::UpdateInsert | Op::Insert => INSERT_OP,
                            Op::UpdateDelete | Op::Delete => DELETE_OP,
                        })
                        .collect_vec(),
                ));
                let mut columns = chunk.columns().to_vec();
                columns.push(ops);
                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
        };

        let writer = inner.writer.get_writer().unwrap();
        writer
            .write(batch)
            .instrument_await("iceberg_write")
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        inner.metrics.write_bytes.inc_by(write_batch_size as _);
        inner.uncommitted_write_bytes = inner
            .uncommitted_write_bytes
            .saturating_add(write_batch_size as u64);
        Ok(())
    }

    fn should_commit_on_checkpoint(&self) -> bool {
        match self {
            Self::Initialized(inner) => inner.should_commit_on_checkpoint(),
            Self::Created(_) => false,
        }
    }

    /// Receive a barrier and mark the end of the current epoch. When `is_checkpoint`
    /// is true, the sink writer should commit the current epoch.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        // Skip if this is not a checkpoint barrier.
        if !is_checkpoint {
            return Ok(None);
        }

        let close_result = match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    Err(e) => {
                        // The old writer is closed and we failed to build a new one. We
                        // can't return the error here because the close itself may have
                        // succeeded, so we just log it.
                        tracing::warn!(error = %e, "Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    Err(e) => {
                        // The old writer is closed and we failed to build a new one. We
                        // can't return the error here because the close itself may have
                        // succeeded, so we just log it.
                        tracing::warn!(error = %e, "Failed to build new writer after close");
                    }
                }
                close_result
            }
        };

        match close_result {
            Some(Ok(result)) => {
                inner.uncommitted_write_bytes = 0;
                let format_version = inner.table.metadata().format_version();
                let partition_type = inner.table.metadata().default_partition_type();
                let data_files = result
                    .into_iter()
                    .map(|f| {
                        // Truncate large column statistics BEFORE serialization.
                        let truncated = truncate_datafile(f);
                        SerializedDataFile::try_from(truncated, partition_type, format_version)
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
                    data_files,
                    schema_id: inner.table.metadata().current_schema_id(),
                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
                })?))
            }
            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
        }
    }
}

/// Maximum size for column statistics (min/max values) in bytes.
/// Column statistics larger than this will be dropped to avoid metadata bloat.
/// This is especially important for large fields like JSONB, TEXT, BINARY, etc.
///
/// This fixes large column statistics in `DataFile` metadata that could otherwise
/// cause OOM errors. We truncate at the `DataFile` level (before serialization) by
/// directly modifying the public `lower_bounds` and `upper_bounds` fields.
///
/// This prevents metadata from ballooning to gigabytes when dealing with large
/// JSONB, TEXT, or BINARY fields, while still preserving statistics for small
/// fields that benefit from query optimization.
const MAX_COLUMN_STAT_SIZE: usize = 10240; // 10 KiB

/// Truncate large column statistics from a `DataFile` BEFORE serialization.
///
/// This function directly modifies the `DataFile`'s `lower_bounds` and `upper_bounds`
/// to remove entries that exceed `MAX_COLUMN_STAT_SIZE`.
///
/// # Arguments
/// * `data_file` - A `DataFile` to process
///
/// # Returns
/// The modified `DataFile` with large statistics removed
fn truncate_datafile(mut data_file: DataFile) -> DataFile {
    // Process lower_bounds: remove entries with large values.
    data_file.lower_bounds.retain(|field_id, datum| {
        // Use to_bytes() to get the actual binary size without JSON serialization overhead.
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large lower_bound statistic"
            );
            return false;
        }
        true
    });

    // Process upper_bounds: remove entries with large values.
    data_file.upper_bounds.retain(|field_id, datum| {
        // Use to_bytes() to get the actual binary size without JSON serialization overhead.
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large upper_bound statistic"
            );
            return false;
        }
        true
    });

    data_file
}
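
#[cfg(test)]
mod tests {
    use iceberg::spec::Datum;

    use super::MAX_COLUMN_STAT_SIZE;

    // A minimal sketch of the per-datum size check that `truncate_datafile`
    // applies to each bound. Constructing a full `DataFile` requires table
    // metadata, so only the predicate itself is exercised here; it assumes the
    // `Datum::string` constructor and `Datum::to_bytes` from the iceberg crate
    // version this file already builds against.
    #[test]
    fn string_datum_size_check() {
        let small = Datum::string("hello");
        let large = Datum::string("x".repeat(MAX_COLUMN_STAT_SIZE + 1));

        // Mirror the size computation used in `truncate_datafile`.
        let size = |d: &Datum| d.to_bytes().map(|b| b.len()).unwrap_or(0);

        assert!(size(&small) <= MAX_COLUMN_STAT_SIZE);
        assert!(size(&large) > MAX_COLUMN_STAT_SIZE);
    }
}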