risingwave_connector/sink/iceberg/
writer.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use iceberg::arrow::{
    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
};
use iceberg::spec::{
    DataFile, FormatVersion, PartitionSpecRef, SchemaRef as IcebergSchemaRef, SerializedDataFile,
};
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::deletion_vector_writer::{
    DeletionVectorWriter, DeletionVectorWriterBuilder,
};
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_file_writer::{
    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
    PositionDeleteInput,
};
use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::task_writer::TaskWriter;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
use uuid::Uuid;

use super::prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use super::{
    GLOBAL_SINK_METRICS, IcebergCommitResult, IcebergConfig, PARQUET_CREATED_BY, SinkError,
    SinkWriterParam, create_and_validate_table_impl,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkParam};
/// `None` means no projection is needed.
/// `Prepare` holds the index of the extra partition column.
/// `Done` holds the computed projection index vector.
///
/// `ProjectIdxVec` is evaluated lazily: the first time we see the `Prepare` state, we use the
/// data chunk schema to build the projection index vector.
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}

type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;

#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    PositionDelete(PositionDeleteFileWriterBuilderType),
    DeletionVector(DeletionVectorWriterBuilderType),
}

enum PositionDeleteWriterType {
    PositionDelete(PositionDeleteFileWriterType),
    DeletionVector(DeletionVectorWriterType),
}

#[async_trait]
impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
    type R = PositionDeleteWriterType;

    async fn build(
        &self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        match self {
            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
            ),
            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
            ),
        }
    }
}

#[async_trait]
impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
        }
    }

    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
        }
    }
}

#[derive(Clone)]
struct SharedIcebergWriterBuilder<B>(Arc<B>);

#[async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
    type R = B::R;

    async fn build(
        &self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        self.0.build(partition_key).await
    }
}

#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    inner: Arc<B>,
    fanout_enabled: bool,
    schema: IcebergSchemaRef,
    partition_spec: PartitionSpecRef,
    compute_partition: bool,
}

impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
    fn new(
        inner: B,
        fanout_enabled: bool,
        schema: IcebergSchemaRef,
        partition_spec: PartitionSpecRef,
        compute_partition: bool,
    ) -> Self {
        Self {
            inner: Arc::new(inner),
            fanout_enabled,
            schema,
            partition_spec,
            compute_partition,
        }
    }

    fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
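        // For partitioned tables, attach a splitter that groups each record batch by partition
        // value. `compute_partition` controls whether partition values are computed from the
        // batch columns or taken from precomputed values.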
        let partition_splitter = match (
            self.partition_spec.is_unpartitioned(),
            self.compute_partition,
        ) {
            (true, _) => None,
            (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
            (false, false) => Some(
                RecordBatchPartitionSplitter::try_new_with_precomputed_values(
                    self.schema.clone(),
                    self.partition_spec.clone(),
                )?,
            ),
        };

        Ok(TaskWriter::new_with_partition_splitter(
            SharedIcebergWriterBuilder(self.inner.clone()),
            self.fanout_enabled,
            self.schema.clone(),
            self.partition_spec.clone(),
            partition_splitter,
        ))
    }
}

pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}

pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    unique_column_ids: Option<Vec<usize>>,
}

pub struct IcebergSinkWriterInner {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    // See the comments on `IcebergWriterMetrics` below.
    metrics: IcebergWriterMetrics,
    // State of the Iceberg table for this writer.
    table: Table,
    // For chunks with an extra partition column, that column must be removed before writing.
    // The projection index vector is cached here so it is not recomputed for every chunk.
    project_idx_vec: ProjectIdxVec,
    commit_checkpoint_size_threshold_bytes: Option<u64>,
    uncommitted_write_bytes: u64,
}

enum IcebergWriterDispatch {
    Append {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        arrow_schema_with_op_column: SchemaRef,
    },
}

impl IcebergWriterDispatch {
    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
        match self {
            IcebergWriterDispatch::Append { writer, .. }
            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
        }
    }
}

pub struct IcebergWriterMetrics {
    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
    // They are actually used in `PrometheusWriterBuilder`:
    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // We keep them here so that the guards clean up the labels from the metrics registry when dropped.
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}

impl IcebergSinkWriter {
    pub fn new(
        config: IcebergConfig,
        sink_param: SinkParam,
        writer_param: SinkWriterParam,
        unique_column_ids: Option<Vec<usize>>,
    ) -> Self {
        Self::Created(IcebergSinkWriterArgs {
            config,
            sink_param,
            writer_param,
            unique_column_ids,
        })
    }
}

impl IcebergSinkWriterInner {
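    /// Returns true only when a commit size threshold is configured and the bytes written since
    /// the last commit have reached it.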
    fn should_commit_on_checkpoint(&self) -> bool {
        self.commit_checkpoint_size_threshold_bytes
            .is_some_and(|threshold| {
                self.uncommitted_write_bytes > 0 && self.uncommitted_write_bytes >= threshold
            })
    }

    fn build_append_only(
        config: &IcebergConfig,
        table: Table,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        // Metrics
        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        // TODO: Unused. Add this metric later.
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
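        // Fanout keeps a writer open for each partition value seen, so input does not need to be
        // clustered by partition; it is only needed when the table is partitioned.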
        let fanout_enabled = !partition_spec.fields().is_empty();
        // To avoid duplicate file names, a unique UUID is generated as the file name suffix each
        // time the sink is created.
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_bytes(config.write_parquet_max_row_group_bytes())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        let rolling_builder = RollingFileWriterBuilder::new(
            parquet_writer_builder,
            (config.target_file_size_mb() * 1024 * 1024) as usize,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
            uncommitted_write_bytes: 0,
        })
    }

    fn build_upsert(
        config: &IcebergConfig,
        table: Table,
        unique_column_ids: Vec<usize>,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];
        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();

        // Metrics
        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        // TODO: Unused. Add this metric later.
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        // Determine the current schema and default partition spec.
        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
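        // Format version 3 (and above) tables record positional deletes as Puffin deletion
        // vectors instead of position-delete Parquet files.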
        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;

        // To avoid duplicate file names, a unique UUID is generated as the file name suffix each
        // time the sink is created.
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_bytes(config.write_parquet_max_row_group_bytes())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let data_file_builder = {
            let parquet_writer_builder =
                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(unique_uuid_suffix.to_string()),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            DataFileWriterBuilder::new(rolling_writer_builder)
        };
        let position_delete_builder = if use_deletion_vectors {
            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
                table.file_io().clone(),
                location_generator,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("delvec-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Puffin,
                ),
            ))
        } else {
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties.clone(),
                POSITION_DELETE_SCHEMA.clone().into(),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("pos-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
                rolling_writer_builder,
            ))
        };
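        // The equality delete writer only emits the columns named by `unique_column_ids`, so its
        // Parquet schema is the table schema projected onto those fields.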
        let equality_delete_builder = {
            let eq_del_config = EqualityDeleteWriterConfig::new(
                unique_column_ids.clone(),
                table.metadata().current_schema().clone(),
            )
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties,
                Arc::new(
                    arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("eq-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );

            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
        };
        let delta_builder = DeltaWriterBuilder::new(
            data_file_builder,
            position_delete_builder,
            equality_delete_builder,
            unique_column_ids,
            schema.clone(),
        );
        let original_arrow_schema = Arc::new(
            schema_to_arrow_schema(table.metadata().current_schema())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        );
        let schema_with_extra_op_column = {
            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
            new_fields.push(Arc::new(ArrowField::new(
                "op".to_owned(),
                ArrowDataType::Int32,
                false,
            )));
            Arc::new(ArrowSchema::new(new_fields))
        };
        let writer_builder = TaskWriterBuilderWrapper::new(
            MonitoredGeneralWriterBuilder::new(
                delta_builder,
                write_qps.clone(),
                write_latency.clone(),
            ),
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: original_arrow_schema,
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            table,
            writer: IcebergWriterDispatch::Upsert {
                writer: inner_writer,
                writer_builder,
                arrow_schema_with_op_column: schema_with_extra_op_column,
            },
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
            uncommitted_write_bytes: 0,
        })
    }
}

#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    /// Begin a new epoch.
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        let Self::Created(args) = self else {
            return Ok(());
        };

        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
        let inner = match &args.unique_column_ids {
            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
                &args.config,
                table,
                unique_column_ids.clone(),
                &args.writer_param,
            )?,
            None => {
                IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
            }
        };

        *self = IcebergSinkWriter::Initialized(inner);
        Ok(())
    }

    /// Write a stream chunk to the sink.
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before write");
        };

        // Build the inner writer if it hasn't been created yet.
        match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
        };

        // Process the chunk.
        let (mut chunk, ops) = chunk.compact_vis().into_parts();
        match &mut inner.project_idx_vec {
            ProjectIdxVec::None => {}
            ProjectIdxVec::Prepare(idx) => {
                if *idx >= chunk.columns().len() {
                    return Err(SinkError::Iceberg(anyhow!(
                        "invalid extra partition column index {}",
                        idx
                    )));
                }
                let project_idx_vec = (0..*idx)
                    .chain(*idx + 1..chunk.columns().len())
                    .collect_vec();
                chunk = chunk.project(&project_idx_vec);
                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
            }
            ProjectIdxVec::Done(idx_vec) => {
                chunk = chunk.project(idx_vec);
            }
        }
        if ops.is_empty() {
            return Ok(());
        }
        let write_batch_size = chunk.estimated_heap_size();
        let batch = match &inner.writer {
            IcebergWriterDispatch::Append { .. } => {
                // Keep only the insert rows; append-only sinks ignore deletes.
                let filters =
                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
                chunk.set_visibility(filters);
                IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
            IcebergWriterDispatch::Upsert {
                arrow_schema_with_op_column,
                ..
            } => {
                let chunk = IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
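                // Append an `op` column marking each row as INSERT_OP or DELETE_OP so the delta
                // writer can tell inserts from deletes.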
                let ops = Arc::new(Int32Array::from(
                    ops.iter()
                        .map(|op| match op {
                            Op::UpdateInsert | Op::Insert => INSERT_OP,
                            Op::UpdateDelete | Op::Delete => DELETE_OP,
                        })
                        .collect_vec(),
                ));
                let mut columns = chunk.columns().to_vec();
                columns.push(ops);
                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
        };

        let writer = inner.writer.get_writer().unwrap();
        writer
            .write(batch)
            .instrument_await("iceberg_write")
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        inner.metrics.write_bytes.inc_by(write_batch_size as _);
        inner.uncommitted_write_bytes = inner
            .uncommitted_write_bytes
            .saturating_add(write_batch_size as u64);
        Ok(())
    }

    fn should_commit_on_checkpoint(&self) -> bool {
        match self {
            Self::Initialized(inner) => inner.should_commit_on_checkpoint(),
            Self::Created(_) => false,
        }
    }

    /// Receive a barrier and mark the end of the current epoch. When `is_checkpoint` is true, the
    /// sink writer should commit the current epoch.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        // Nothing to commit if this is not a checkpoint barrier.
        if !is_checkpoint {
            return Ok(None);
        }

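        // Close the current writer to flush all files written in this epoch, then eagerly build
        // a fresh writer for the next epoch.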
        let close_result = match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        // The old writer has been closed and a new one could not be built. Don't
                        // return an error here, because the close itself may have succeeded; just
                        // log the failure.
                        tracing::warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        // The old writer has been closed and a new one could not be built. Don't
                        // return an error here, because the close itself may have succeeded; just
                        // log the failure.
                        tracing::warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
        };

        match close_result {
            Some(Ok(result)) => {
                inner.uncommitted_write_bytes = 0;
                let format_version = inner.table.metadata().format_version();
                let partition_type = inner.table.metadata().default_partition_type();
                let data_files = result
                    .into_iter()
                    .map(|f| {
                        // Truncate large column statistics BEFORE serialization.
                        let truncated = truncate_datafile(f);
                        SerializedDataFile::try_from(truncated, partition_type, format_version)
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
                    data_files,
                    schema_id: inner.table.metadata().current_schema_id(),
                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
                })?))
            }
            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
        }
    }
}

/// Maximum size for column statistics (min/max values) in bytes.
/// Column statistics larger than this are dropped to avoid metadata bloat.
/// This is especially important for large fields like JSONB, TEXT, BINARY, etc.
///
/// Fix for large column statistics in `DataFile` metadata that can cause OOM errors.
/// We truncate at the `DataFile` level (before serialization) by directly modifying
/// the public `lower_bounds` and `upper_bounds` fields.
///
/// This prevents metadata from ballooning to gigabytes when dealing with large
/// JSONB, TEXT, or BINARY fields, while still preserving statistics for small fields
/// that benefit from query optimization.
const MAX_COLUMN_STAT_SIZE: usize = 10240; // 10 KiB

/// Truncate large column statistics from a `DataFile` BEFORE serialization.
///
/// This function directly modifies the `DataFile`'s `lower_bounds` and `upper_bounds`
/// to remove entries that exceed `MAX_COLUMN_STAT_SIZE`.
///
/// # Arguments
/// * `data_file` - A `DataFile` to process
///
/// # Returns
/// The modified `DataFile` with large statistics removed
fn truncate_datafile(mut data_file: DataFile) -> DataFile {
    // Process lower_bounds - remove entries with large values.
    data_file.lower_bounds.retain(|field_id, datum| {
        // Use to_bytes() to get the actual binary size without JSON serialization overhead.
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large lower_bound statistic"
            );
            return false;
        }
        true
    });

    // Process upper_bounds - remove entries with large values.
    data_file.upper_bounds.retain(|field_id, datum| {
        // Use to_bytes() to get the actual binary size without JSON serialization overhead.
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large upper_bound statistic"
            );
            return false;
        }
        true
    });

    data_file
}
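
#[cfg(test)]
mod column_stat_truncation_tests {
    // A minimal sketch of the size check used by `truncate_datafile`, assuming `Datum::string`
    // and `Datum::to_bytes` from the `iceberg` crate behave as they are used above. It does not
    // construct a full `DataFile`; it only checks that a small datum stays within
    // `MAX_COLUMN_STAT_SIZE` while an oversized one exceeds it and would therefore be dropped.
    use iceberg::spec::Datum;

    use super::MAX_COLUMN_STAT_SIZE;

    #[test]
    fn large_string_datum_exceeds_stat_limit() {
        let small = Datum::string("id-0001");
        let large = Datum::string("x".repeat(MAX_COLUMN_STAT_SIZE + 1));

        assert!(small.to_bytes().unwrap().len() <= MAX_COLUMN_STAT_SIZE);
        assert!(large.to_bytes().unwrap().len() > MAX_COLUMN_STAT_SIZE);
    }
}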