use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use iceberg::arrow::{
    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
};
use iceberg::spec::{
    DataFile, FormatVersion, PartitionSpecRef, SchemaRef as IcebergSchemaRef, SerializedDataFile,
};
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::deletion_vector_writer::{
    DeletionVectorWriter, DeletionVectorWriterBuilder,
};
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_file_writer::{
    POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
    PositionDeleteInput,
};
use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::task_writer::TaskWriter;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
use uuid::Uuid;

use super::prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use super::{
    GLOBAL_SINK_METRICS, IcebergCommitResult, IcebergConfig, PARQUET_CREATED_BY, SinkError,
    SinkWriterParam, create_and_validate_table_impl,
};
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkParam};

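/// Controls how the extra partition column (configured via
/// `extra_partition_col_idx`) is projected away before writing:
/// - `None`: no extra partition column; chunks are written unchanged.
/// - `Prepare(idx)`: the column at `idx` must be dropped; the projection
///   indices are computed lazily on the first chunk.
/// - `Done(indices)`: cached projection indices applied to every chunk.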
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}

type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;

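/// Position-delete writer builder that dispatches between classic Parquet
/// position-delete files and Puffin deletion vectors (used for format v3
/// and newer tables).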
#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    PositionDelete(PositionDeleteFileWriterBuilderType),
    DeletionVector(DeletionVectorWriterBuilderType),
}

enum PositionDeleteWriterType {
    PositionDelete(PositionDeleteFileWriterType),
    DeletionVector(DeletionVectorWriterType),
}

#[async_trait]
impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
    type R = PositionDeleteWriterType;

    async fn build(
        &self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        match self {
            PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
                PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
            ),
            PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
                PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
            ),
        }
    }
}

#[async_trait]
impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
        }
    }

    async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
        match self {
            PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
            PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
        }
    }
}

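/// Cheaply cloneable wrapper that shares one inner writer builder (via `Arc`)
/// across the writers created by a [`TaskWriter`].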
#[derive(Clone)]
struct SharedIcebergWriterBuilder<B>(Arc<B>);

#[async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
    type R = B::R;

    async fn build(
        &self,
        partition_key: Option<iceberg::spec::PartitionKey>,
    ) -> iceberg::Result<Self::R> {
        self.0.build(partition_key).await
    }
}

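/// Wraps an inner writer builder and produces partition-aware [`TaskWriter`]s.
/// For partitioned tables it attaches a [`RecordBatchPartitionSplitter`] that
/// either computes partition values from the data (`compute_partition = true`)
/// or expects precomputed values.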
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    inner: Arc<B>,
    fanout_enabled: bool,
    schema: IcebergSchemaRef,
    partition_spec: PartitionSpecRef,
    compute_partition: bool,
}

impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
    fn new(
        inner: B,
        fanout_enabled: bool,
        schema: IcebergSchemaRef,
        partition_spec: PartitionSpecRef,
        compute_partition: bool,
    ) -> Self {
        Self {
            inner: Arc::new(inner),
            fanout_enabled,
            schema,
            partition_spec,
            compute_partition,
        }
    }

    fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
        let partition_splitter = match (
            self.partition_spec.is_unpartitioned(),
            self.compute_partition,
        ) {
            (true, _) => None,
            (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
            (false, false) => Some(
                RecordBatchPartitionSplitter::try_new_with_precomputed_values(
                    self.schema.clone(),
                    self.partition_spec.clone(),
                )?,
            ),
        };

        Ok(TaskWriter::new_with_partition_splitter(
            SharedIcebergWriterBuilder(self.inner.clone()),
            self.fanout_enabled,
            self.schema.clone(),
            self.partition_spec.clone(),
            partition_splitter,
        ))
    }
}

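/// Iceberg sink writer with two-phase construction: `Created` only holds the
/// construction arguments, and the table plus inner writers are built lazily
/// in `begin_epoch`, after which the writer becomes `Initialized`.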
pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}

pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    unique_column_ids: Option<Vec<usize>>,
}

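/// State of an initialized sink writer: the active writer dispatch, the Arrow
/// schema derived from the Iceberg table schema, metrics, the partition-column
/// projection, and bookkeeping for size-threshold based commits.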
pub struct IcebergSinkWriterInner {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    metrics: IcebergWriterMetrics,
    table: Table,
    project_idx_vec: ProjectIdxVec,
    commit_checkpoint_size_threshold_bytes: Option<u64>,
    uncommitted_write_bytes: u64,
}

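/// Dispatch between the append-only and upsert write paths. The inner writer
/// is an `Option` because it is taken and closed at checkpoint barriers and
/// then rebuilt from the stored builder.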
#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    Append {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        arrow_schema_with_op_column: SchemaRef,
    },
}

impl IcebergWriterDispatch {
    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
        match self {
            IcebergWriterDispatch::Append { writer, .. }
            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
        }
    }
}

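/// Per-writer metrics. Clones of the QPS and latency handles are driven by the
/// monitored writer builder; the underscore-prefixed copies are retained here,
/// while `write_bytes` is incremented for every written batch.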
pub struct IcebergWriterMetrics {
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}

impl IcebergSinkWriter {
    pub fn new(
        config: IcebergConfig,
        sink_param: SinkParam,
        writer_param: SinkWriterParam,
        unique_column_ids: Option<Vec<usize>>,
    ) -> Self {
        Self::Created(IcebergSinkWriterArgs {
            config,
            sink_param,
            writer_param,
            unique_column_ids,
        })
    }
}

impl IcebergSinkWriterInner {
    fn should_commit_on_checkpoint(&self) -> bool {
        self.commit_checkpoint_size_threshold_bytes
            .is_some_and(|threshold| {
                self.uncommitted_write_bytes > 0 && self.uncommitted_write_bytes >= threshold
            })
    }

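    /// Builds the append-only write path: a Parquet writer wrapped in a
    /// rolling file writer and a data-file writer, monitored for QPS and
    /// latency, and finally wrapped in a partition-aware task writer.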
    fn build_append_only(
        config: &IcebergConfig,
        table: Table,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        let rolling_builder = RollingFileWriterBuilder::new(
            parquet_writer_builder,
            (config.target_file_size_mb() * 1024 * 1024) as usize,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
            uncommitted_write_bytes: 0,
        })
    }

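    /// Builds the upsert write path: a delta writer combining data files,
    /// position deletes (Puffin deletion vectors on format v3+ tables, Parquet
    /// position-delete files otherwise), and equality deletes keyed by
    /// `unique_column_ids`. Upsert record batches carry an extra `op` column,
    /// appended in `write_batch`.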
    fn build_upsert(
        config: &IcebergConfig,
        table: Table,
        unique_column_ids: Vec<usize>,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];
        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();
        let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;

        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_compression(config.get_parquet_compression())
            .set_max_row_group_size(config.write_parquet_max_row_group_rows())
            .set_created_by(PARQUET_CREATED_BY.to_owned())
            .build();

        let data_file_builder = {
            let parquet_writer_builder =
                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(unique_uuid_suffix.to_string()),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            DataFileWriterBuilder::new(rolling_writer_builder)
        };
        let position_delete_builder = if use_deletion_vectors {
            let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
                table.file_io().clone(),
                location_generator,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("delvec-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Puffin,
                ),
            ))
        } else {
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties.clone(),
                POSITION_DELETE_SCHEMA.clone().into(),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("pos-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
                rolling_writer_builder,
            ))
        };
        let equality_delete_builder = {
            let eq_del_config = EqualityDeleteWriterConfig::new(
                unique_column_ids.clone(),
                table.metadata().current_schema().clone(),
            )
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties,
                Arc::new(
                    arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new(
                parquet_writer_builder,
                (config.target_file_size_mb() * 1024 * 1024) as usize,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("eq-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );

            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
        };
        let delta_builder = DeltaWriterBuilder::new(
            data_file_builder,
            position_delete_builder,
            equality_delete_builder,
            unique_column_ids,
            schema.clone(),
        );
        let original_arrow_schema = Arc::new(
            schema_to_arrow_schema(table.metadata().current_schema())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        );
        let schema_with_extra_op_column = {
            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
            new_fields.push(Arc::new(ArrowField::new(
                "op".to_owned(),
                ArrowDataType::Int32,
                false,
            )));
            Arc::new(ArrowSchema::new(new_fields))
        };
        let writer_builder = TaskWriterBuilderWrapper::new(
            MonitoredGeneralWriterBuilder::new(
                delta_builder,
                write_qps.clone(),
                write_latency.clone(),
            ),
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: original_arrow_schema,
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            table,
            writer: IcebergWriterDispatch::Upsert {
                writer: inner_writer,
                writer_builder,
                arrow_schema_with_op_column: schema_with_extra_op_column,
            },
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
            commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
            uncommitted_write_bytes: 0,
        })
    }
}

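// SinkWriter lifecycle: `begin_epoch` lazily creates the table and the inner
// writer on the first epoch; `write_batch` converts chunks to Arrow record
// batches (appending an `op` column for upserts) and writes them; and
// `barrier(true)` closes the current writer, collects the produced data files,
// and returns them as `SinkMetadata` built from an `IcebergCommitResult`.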
#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        let Self::Created(args) = self else {
            return Ok(());
        };

        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
        let inner = match &args.unique_column_ids {
            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
                &args.config,
                table,
                unique_column_ids.clone(),
                &args.writer_param,
            )?,
            None => {
                IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
            }
        };

        *self = IcebergSinkWriter::Initialized(inner);
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before write_batch");
        };

        match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
        };

        let (mut chunk, ops) = chunk.compact_vis().into_parts();
        match &mut inner.project_idx_vec {
            ProjectIdxVec::None => {}
            ProjectIdxVec::Prepare(idx) => {
                if *idx >= chunk.columns().len() {
                    return Err(SinkError::Iceberg(anyhow!(
                        "invalid extra partition column index {}",
                        idx
                    )));
                }
                let project_idx_vec = (0..*idx)
                    .chain(*idx + 1..chunk.columns().len())
                    .collect_vec();
                chunk = chunk.project(&project_idx_vec);
                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
            }
            ProjectIdxVec::Done(idx_vec) => {
                chunk = chunk.project(idx_vec);
            }
        }
        if ops.is_empty() {
            return Ok(());
        }
        let write_batch_size = chunk.estimated_heap_size();
        let batch = match &inner.writer {
            IcebergWriterDispatch::Append { .. } => {
                let filters =
                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
                chunk.set_visibility(filters);
                IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
            IcebergWriterDispatch::Upsert {
                arrow_schema_with_op_column,
                ..
            } => {
                let chunk = IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
                let ops = Arc::new(Int32Array::from(
                    ops.iter()
                        .map(|op| match op {
                            Op::UpdateInsert | Op::Insert => INSERT_OP,
                            Op::UpdateDelete | Op::Delete => DELETE_OP,
                        })
                        .collect_vec(),
                ));
                let mut columns = chunk.columns().to_vec();
                columns.push(ops);
                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
        };

        let writer = inner.writer.get_writer().unwrap();
        writer
            .write(batch)
            .instrument_await("iceberg_write")
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        inner.metrics.write_bytes.inc_by(write_batch_size as _);
        inner.uncommitted_write_bytes = inner
            .uncommitted_write_bytes
            .saturating_add(write_batch_size as u64);
        Ok(())
    }

    fn should_commit_on_checkpoint(&self) -> bool {
        match self {
            Self::Initialized(inner) => inner.should_commit_on_checkpoint(),
            Self::Created(_) => false,
        }
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        if !is_checkpoint {
            return Ok(None);
        }

        let close_result = match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        tracing::warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        tracing::warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
        };

        match close_result {
            Some(Ok(result)) => {
                inner.uncommitted_write_bytes = 0;
                let format_version = inner.table.metadata().format_version();
                let partition_type = inner.table.metadata().default_partition_type();
                let data_files = result
                    .into_iter()
                    .map(|f| {
                        let truncated = truncate_datafile(f);
                        SerializedDataFile::try_from(truncated, partition_type, format_version)
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
                    data_files,
                    schema_id: inner.table.metadata().current_schema_id(),
                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
                })?))
            }
            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
        }
    }
}

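/// Per-column statistics (lower/upper bounds) whose serialized size exceeds
/// this many bytes are dropped from the data file metadata by
/// `truncate_datafile` below, keeping the commit metadata compact.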
const MAX_COLUMN_STAT_SIZE: usize = 10240;

fn truncate_datafile(mut data_file: DataFile) -> DataFile {
    data_file.lower_bounds.retain(|field_id, datum| {
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large lower_bound statistic"
            );
            return false;
        }
        true
    });

    data_file.upper_bounds.retain(|field_id, datum| {
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large upper_bound statistic"
            );
            return false;
        }
        true
    });

    data_file
}