use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use iceberg::arrow::{
    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
};
use iceberg::spec::{
    DataFile, FormatVersion, PartitionSpecRef, SchemaRef as IcebergSchemaRef, SerializedDataFile,
};
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::deletion_vector_writer::{
    DeletionVectorWriter, DeletionVectorWriterBuilder,
};
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_file_writer::{
    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
    PositionDeleteInput,
};
use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::task_writer::TaskWriter;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
use uuid::Uuid;

use super::prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use super::{
    GLOBAL_SINK_METRICS, IcebergCommitResult, IcebergConfig, PARQUET_CREATED_BY, SinkError,
    SinkWriterParam, create_and_validate_table_impl,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkParam};

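/// Tracks how to project the extra partition column out of incoming chunks:
/// `None` if there is no such column, `Prepare(idx)` while only the column
/// index is known, and `Done(indices)` once the projection vector (all
/// columns except `idx`) has been computed and cached.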
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}

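// Aliases for the concrete writer-builder stacks (parquet files, default
// location and file-name generators); they exist purely to keep the generic
// signatures below readable.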
type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;

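/// Position deletes are written either as parquet position-delete files or,
/// on format v3 tables, as Puffin deletion vectors; these enums dispatch
/// between the two implementations at runtime.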
#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    PositionDelete(PositionDeleteFileWriterBuilderType),
    DeletionVector(DeletionVectorWriterBuilderType),
}

enum PositionDeleteWriterType {
    PositionDelete(PositionDeleteFileWriterType),
    DeletionVector(DeletionVectorWriterType),
}

#[async_trait]
impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
    type R = PositionDeleteWriterType;

    async fn build(
        &self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        match self {
            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
            ),
            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
            ),
        }
    }
}

#[async_trait]
impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
        }
    }

    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
        }
    }
}

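/// Cheaply cloneable handle to a writer builder: the `Arc` lets one builder
/// instance back all writers that a `TaskWriter` creates.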
#[derive(Clone)]
struct SharedIcebergWriterBuilder<B>(Arc<B>);

#[async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
    type R = B::R;

    async fn build(
        &self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        self.0.build(partition_key).await
    }
}

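/// Pairs a shared writer builder with the schema and partition spec needed to
/// construct a `TaskWriter`, which splits record batches by partition and,
/// with `fanout_enabled`, keeps one writer per partition value.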
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    inner: Arc<B>,
    fanout_enabled: bool,
    schema: IcebergSchemaRef,
    partition_spec: PartitionSpecRef,
    compute_partition: bool,
}

impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
    fn new(
        inner: B,
        fanout_enabled: bool,
        schema: IcebergSchemaRef,
        partition_spec: PartitionSpecRef,
        compute_partition: bool,
    ) -> Self {
        Self {
            inner: Arc::new(inner),
            fanout_enabled,
            schema,
            partition_spec,
            compute_partition,
        }
    }

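    /// Builds a fresh `TaskWriter`. An unpartitioned table needs no partition
    /// splitter; for a partitioned table the splitter either computes partition
    /// values from the record batch (`compute_partition == true`) or expects
    /// them to be precomputed in the input.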
    fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
        let partition_splitter = match (
            self.partition_spec.is_unpartitioned(),
            self.compute_partition,
        ) {
            (true, _) => None,
            (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
            (false, false) => Some(
                RecordBatchPartitionSplitter::try_new_with_precomputed_values(
                    self.schema.clone(),
                    self.partition_spec.clone(),
                )?,
            ),
        };

        Ok(TaskWriter::new_with_partition_splitter(
            SharedIcebergWriterBuilder(self.inner.clone()),
            self.fanout_enabled,
            self.schema.clone(),
            self.partition_spec.clone(),
            partition_splitter,
        ))
    }
}

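/// The sink writer starts in `Created` with only its construction arguments;
/// the table is loaded and the actual writer built lazily on the first
/// `begin_epoch`, moving it to `Initialized`.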
pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}

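/// Arguments captured at construction time, used when the writer is actually
/// built on the first epoch.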
pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    unique_column_ids: Option<Vec<usize>>,
}

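/// Fully initialized writer state: the concrete writer dispatch, the loaded
/// table, the cached projection for the extra partition column, and the
/// bookkeeping used to decide when a checkpoint should trigger a commit.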
pub struct IcebergSinkWriterInner {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    metrics: IcebergWriterMetrics,
    table: Table,
    project_idx_vec: ProjectIdxVec,
    commit_checkpoint_size_threshold_bytes: Option<u64>,
    uncommitted_write_bytes: u64,
}

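/// `Append` writes data files only; `Upsert` wraps a delta writer that also
/// produces position and equality deletes, and carries the Arrow schema with
/// the extra op column. The inner writer is taken out on checkpoint close and
/// rebuilt afterwards, hence the `Option`.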
enum IcebergWriterDispatch {
    Append {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        arrow_schema_with_op_column: SchemaRef,
    },
}

impl IcebergWriterDispatch {
    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
        match self {
            IcebergWriterDispatch::Append { writer, .. }
            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
        }
    }
}

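/// Label-guarded write metrics. QPS and latency are observed inside the
/// monitored writer wrapper; the underscored fields are kept only to hold the
/// metric guards alive. `write_bytes` is incremented directly in `write_batch`.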
pub struct IcebergWriterMetrics {
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}

impl IcebergSinkWriter {
    pub fn new(
        config: IcebergConfig,
        sink_param: SinkParam,
        writer_param: SinkWriterParam,
        unique_column_ids: Option<Vec<usize>>,
    ) -> Self {
        Self::Created(IcebergSinkWriterArgs {
            config,
            sink_param,
            writer_param,
            unique_column_ids,
        })
    }
}

impl IcebergSinkWriterInner {
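    /// True once at least `commit_checkpoint_size_threshold_bytes` have been
    /// written since the last commit; always false when no threshold is set.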
    fn should_commit_on_checkpoint(&self) -> bool {
        self.commit_checkpoint_size_threshold_bytes
            .is_some_and(|threshold| {
                self.uncommitted_write_bytes > 0 && self.uncommitted_write_bytes >= threshold
            })
    }

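    /// Builds the writer stack for an append-only sink: rolling parquet data
    /// files, wrapped with QPS/latency monitoring and a `TaskWriter` for
    /// partitioned tables.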
    fn build_append_only(
        config: &IcebergConfig,
        table: Table,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_bytes(config.write_parquet_max_row_group_bytes())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
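        // Roll over to a new data file once the configured target size (MB) is reached.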
        let rolling_builder = RollingFileWriterBuilder::new(
            parquet_writer_builder,
            (config.target_file_size_mb() * 1024 * 1024) as usize,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
            uncommitted_write_bytes: 0,
        })
    }

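    /// Builds the writer stack for an upsert sink: a delta writer combining a
    /// data-file writer, a position-delete writer (or deletion vectors on v3
    /// tables), and an equality-delete writer keyed by `unique_column_ids`.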
    fn build_upsert(
        config: &IcebergConfig,
        table: Table,
        unique_column_ids: Vec<usize>,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];
        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
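        // Iceberg format v3 stores position deletes as deletion vectors in
        // Puffin files; older formats use parquet position-delete files.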
        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;

        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_bytes(config.write_parquet_max_row_group_bytes())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let data_file_builder = {
            let parquet_writer_builder =
                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(unique_uuid_suffix.to_string()),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            DataFileWriterBuilder::new(rolling_writer_builder)
        };
        let position_delete_builder = if use_deletion_vectors {
            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
                table.file_io().clone(),
                location_generator,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("delvec-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Puffin,
                ),
            ))
        } else {
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties.clone(),
                POSITION_DELETE_SCHEMA.clone().into(),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("pos-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
                rolling_writer_builder,
            ))
        };
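        // Equality deletes identify deleted rows by the unique key columns,
        // so the delete files use a schema projected onto those columns.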
        let equality_delete_builder = {
            let eq_del_config = EqualityDeleteWriterConfig::new(
                unique_column_ids.clone(),
                table.metadata().current_schema().clone(),
            )
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties,
                Arc::new(
                    arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("eq-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );

            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
        };
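        // The delta writer dispatches each row to the data writer or the
        // delete writers according to the op column appended in `write_batch`.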
        let delta_builder = DeltaWriterBuilder::new(
            data_file_builder,
            position_delete_builder,
            equality_delete_builder,
            unique_column_ids,
            schema.clone(),
        );
        let original_arrow_schema = Arc::new(
            schema_to_arrow_schema(table.metadata().current_schema())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        );
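        // Upsert batches carry an extra non-null Int32 "op" column (appended
        // in `write_batch`), so the writer schema must include it.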
        let schema_with_extra_op_column = {
            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
            new_fields.push(Arc::new(ArrowField::new(
                "op".to_owned(),
                ArrowDataType::Int32,
                false,
            )));
            Arc::new(ArrowSchema::new(new_fields))
        };
        let writer_builder = TaskWriterBuilderWrapper::new(
            MonitoredGeneralWriterBuilder::new(
                delta_builder,
                write_qps.clone(),
                write_latency.clone(),
            ),
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: original_arrow_schema,
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            table,
            writer: IcebergWriterDispatch::Upsert {
                writer: inner_writer,
                writer_builder,
                arrow_schema_with_op_column: schema_with_extra_op_column,
            },
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
            uncommitted_write_bytes: 0,
        })
    }
}

#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

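    /// On the first epoch, loads and validates the Iceberg table and builds
    /// the real writer; subsequent epochs are no-ops.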
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        let Self::Created(args) = self else {
            return Ok(());
        };

        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
        let inner = match &args.unique_column_ids {
            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
                &args.config,
                table,
                unique_column_ids.clone(),
                &args.writer_param,
            )?,
            None => {
                IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
            }
        };

        *self = IcebergSinkWriter::Initialized(inner);
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before write_batch");
        };

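        // Rebuild the inner writer if it was consumed by the previous checkpoint.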
        match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
        };

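        // Compact the chunk, then drop the extra partition column if one is
        // configured (the projection indices are computed once and cached).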
        let (mut chunk, ops) = chunk.compact_vis().into_parts();
        match &mut inner.project_idx_vec {
            ProjectIdxVec::None => {}
            ProjectIdxVec::Prepare(idx) => {
                if *idx >= chunk.columns().len() {
                    return Err(SinkError::Iceberg(anyhow!(
                        "invalid extra partition column index {}",
                        idx
                    )));
                }
                let project_idx_vec = (0..*idx)
                    .chain(*idx + 1..chunk.columns().len())
                    .collect_vec();
                chunk = chunk.project(&project_idx_vec);
                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
            }
            ProjectIdxVec::Done(idx_vec) => {
                chunk = chunk.project(idx_vec);
            }
        }
        if ops.is_empty() {
            return Ok(());
        }
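        // Convert the chunk to an Arrow RecordBatch: append-only keeps only
        // INSERT rows (masked via visibility), while upsert appends an op
        // column so the delta writer can distinguish inserts from deletes.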
        let write_batch_size = chunk.estimated_heap_size();
        let batch = match &inner.writer {
            IcebergWriterDispatch::Append { .. } => {
                let filters =
                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
                chunk.set_visibility(filters);
                IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
            IcebergWriterDispatch::Upsert {
                arrow_schema_with_op_column,
                ..
            } => {
                let chunk = IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
                let ops = Arc::new(Int32Array::from(
                    ops.iter()
                        .map(|op| match op {
                            Op::UpdateInsert | Op::Insert => INSERT_OP,
                            Op::UpdateDelete | Op::Delete => DELETE_OP,
                        })
                        .collect_vec(),
                ));
                let mut columns = chunk.columns().to_vec();
                columns.push(ops);
                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
        };

        let writer = inner.writer.get_writer().unwrap();
        writer
            .write(batch)
            .instrument_await("iceberg_write")
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        inner.metrics.write_bytes.inc_by(write_batch_size as _);
        inner.uncommitted_write_bytes = inner
            .uncommitted_write_bytes
            .saturating_add(write_batch_size as u64);
        Ok(())
    }

    fn should_commit_on_checkpoint(&self) -> bool {
        match self {
            Self::Initialized(inner) => inner.should_commit_on_checkpoint(),
            Self::Created(_) => false,
        }
    }

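    /// On a checkpoint barrier, closes the current writer to flush all pending
    /// files and returns them as commit metadata; non-checkpoint barriers
    /// return `None`.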
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        if !is_checkpoint {
            return Ok(None);
        }

        let close_result = match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        tracing::warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        tracing::warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
        };

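        // Serialize the flushed data files (with oversized column statistics
        // dropped) into SinkMetadata for the commit coordinator.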
        match close_result {
            Some(Ok(result)) => {
                inner.uncommitted_write_bytes = 0;
                let format_version = inner.table.metadata().format_version();
                let partition_type = inner.table.metadata().default_partition_type();
                let data_files = result
                    .into_iter()
                    .map(|f| {
                        let truncated = truncate_datafile(f);
                        SerializedDataFile::try_from(truncated, partition_type, format_version)
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
                    data_files,
                    schema_id: inner.table.metadata().current_schema_id(),
                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
                })?))
            }
            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
        }
    }
}

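/// Upper bound (in bytes) on a single serialized column statistic kept in
/// commit metadata. `truncate_datafile` drops any lower/upper bound larger
/// than this so the metadata passed along for commit stays small.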
const MAX_COLUMN_STAT_SIZE: usize = 10240;

fn truncate_datafile(mut data_file: DataFile) -> DataFile {
    data_file.lower_bounds.retain(|field_id, datum| {
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Dropping oversized lower_bound statistic"
            );
            return false;
        }
        true
    });

    data_file.upper_bounds.retain(|field_id, datum| {
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Dropping oversized upper_bound statistic"
            );
            return false;
        }
        true
    });

    data_file
}