1use std::sync::Arc;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use await_tree::InstrumentAwait;
20use iceberg::arrow::{
21 RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
22};
23use iceberg::spec::{
24 DataFile, FormatVersion, PartitionSpecRef, SchemaRef as IcebergSchemaRef, SerializedDataFile,
25};
26use iceberg::table::Table;
27use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
28use iceberg::writer::base_writer::deletion_vector_writer::{
29 DeletionVectorWriter, DeletionVectorWriterBuilder,
30};
31use iceberg::writer::base_writer::equality_delete_writer::{
32 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
33};
34use iceberg::writer::base_writer::position_delete_file_writer::{
35 POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
36 PositionDeleteInput,
37};
38use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
39use iceberg::writer::file_writer::ParquetWriterBuilder;
40use iceberg::writer::file_writer::location_generator::{
41 DefaultFileNameGenerator, DefaultLocationGenerator,
42};
43use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
44use iceberg::writer::task_writer::TaskWriter;
45use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
46use itertools::Itertools;
47use parquet::file::properties::WriterProperties;
48use risingwave_common::array::arrow::IcebergArrowConvert;
49use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
50use risingwave_common::array::arrow::arrow_schema_iceberg::{
51 DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef,
52};
53use risingwave_common::array::{Op, StreamChunk};
54use risingwave_common::bitmap::Bitmap;
55use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
56use risingwave_common_estimate_size::EstimateSize;
57use risingwave_pb::connector_service::SinkMetadata;
58use uuid::Uuid;
59
60use super::prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
61use super::{
62 GLOBAL_SINK_METRICS, IcebergCommitResult, IcebergConfig, PARQUET_CREATED_BY, SinkError,
63 SinkWriterParam, create_and_validate_table_impl,
64};
65use crate::sink::writer::SinkWriter;
66use crate::sink::{Result, SinkParam};
67
68enum ProjectIdxVec {
76 None,
77 Prepare(usize),
78 Done(Vec<usize>),
79}
80
81type DataFileWriterBuilderType =
82 DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
83type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
84 ParquetWriterBuilder,
85 DefaultLocationGenerator,
86 DefaultFileNameGenerator,
87>;
88type PositionDeleteFileWriterType = PositionDeleteFileWriter<
89 ParquetWriterBuilder,
90 DefaultLocationGenerator,
91 DefaultFileNameGenerator,
92>;
93type DeletionVectorWriterBuilderType =
94 DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
95type DeletionVectorWriterType =
96 DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
97type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
98 ParquetWriterBuilder,
99 DefaultLocationGenerator,
100 DefaultFileNameGenerator,
101>;
102
103#[derive(Clone)]
104enum PositionDeleteWriterBuilderType {
105 PositionDelete(PositionDeleteFileWriterBuilderType),
106 DeletionVector(DeletionVectorWriterBuilderType),
107}
108
109enum PositionDeleteWriterType {
110 PositionDelete(PositionDeleteFileWriterType),
111 DeletionVector(DeletionVectorWriterType),
112}
113
114#[async_trait]
115impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
116 type R = PositionDeleteWriterType;
117
118 async fn build(
119 &self,
120 partition_key: Option<iceberg::spec::PartitionKey>,
121 ) -> iceberg::Result<Self::R> {
122 match self {
123 PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
124 PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
125 ),
126 PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
127 PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
128 ),
129 }
130 }
131}
132
133#[async_trait]
134impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
135 async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
136 match self {
137 PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
138 PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
139 }
140 }
141
142 async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
143 match self {
144 PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
145 PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
146 }
147 }
148}
149
150#[derive(Clone)]
151struct SharedIcebergWriterBuilder<B>(Arc<B>);
152
153#[async_trait]
154impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
155 type R = B::R;
156
157 async fn build(
158 &self,
159 partition_key: Option<iceberg::spec::PartitionKey>,
160 ) -> iceberg::Result<Self::R> {
161 self.0.build(partition_key).await
162 }
163}
164
165#[derive(Clone)]
166struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
167 inner: Arc<B>,
168 fanout_enabled: bool,
169 schema: IcebergSchemaRef,
170 partition_spec: PartitionSpecRef,
171 compute_partition: bool,
172}
173
174impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
175 fn new(
176 inner: B,
177 fanout_enabled: bool,
178 schema: IcebergSchemaRef,
179 partition_spec: PartitionSpecRef,
180 compute_partition: bool,
181 ) -> Self {
182 Self {
183 inner: Arc::new(inner),
184 fanout_enabled,
185 schema,
186 partition_spec,
187 compute_partition,
188 }
189 }
190
191 fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
192 let partition_splitter = match (
193 self.partition_spec.is_unpartitioned(),
194 self.compute_partition,
195 ) {
196 (true, _) => None,
197 (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
198 self.schema.clone(),
199 self.partition_spec.clone(),
200 )?),
201 (false, false) => Some(
202 RecordBatchPartitionSplitter::try_new_with_precomputed_values(
203 self.schema.clone(),
204 self.partition_spec.clone(),
205 )?,
206 ),
207 };
208
209 Ok(TaskWriter::new_with_partition_splitter(
210 SharedIcebergWriterBuilder(self.inner.clone()),
211 self.fanout_enabled,
212 self.schema.clone(),
213 self.partition_spec.clone(),
214 partition_splitter,
215 ))
216 }
217}
218
219pub enum IcebergSinkWriter {
220 Created(IcebergSinkWriterArgs),
221 Initialized(IcebergSinkWriterInner),
222}
223
224pub struct IcebergSinkWriterArgs {
225 config: IcebergConfig,
226 sink_param: SinkParam,
227 writer_param: SinkWriterParam,
228 upsert_primary_key_column_names: Option<Vec<String>>,
229}
230
231pub struct IcebergSinkWriterInner {
232 writer: IcebergWriterDispatch,
233 arrow_schema: SchemaRef,
234 metrics: IcebergWriterMetrics,
236 table: Table,
238 project_idx_vec: ProjectIdxVec,
241}
242
243enum IcebergWriterDispatch {
244 Append {
245 writer: Option<Box<dyn IcebergWriter>>,
246 writer_builder:
247 TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
248 },
249 Upsert {
250 writer: Option<Box<dyn IcebergWriter>>,
251 writer_builder: TaskWriterBuilderWrapper<
252 MonitoredGeneralWriterBuilder<
253 DeltaWriterBuilder<
254 DataFileWriterBuilderType,
255 PositionDeleteWriterBuilderType,
256 EqualityDeleteFileWriterBuilderType,
257 >,
258 >,
259 >,
260 arrow_schema_with_op_column: SchemaRef,
261 },
262}
263
264impl IcebergWriterDispatch {
265 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
266 match self {
267 IcebergWriterDispatch::Append { writer, .. }
268 | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
269 }
270 }
271}
272
273pub struct IcebergWriterMetrics {
274 _write_qps: LabelGuardedIntCounter,
279 _write_latency: LabelGuardedHistogram,
280 write_bytes: LabelGuardedIntCounter,
281}
282
283impl IcebergSinkWriter {
284 pub fn new(
285 config: IcebergConfig,
286 sink_param: SinkParam,
287 writer_param: SinkWriterParam,
288 upsert_primary_key_column_names: Option<Vec<String>>,
289 ) -> Self {
290 Self::Created(IcebergSinkWriterArgs {
291 config,
292 sink_param,
293 writer_param,
294 upsert_primary_key_column_names,
295 })
296 }
297}
298
299pub(super) fn resolve_equality_delete_field_ids(
300 primary_key_column_names: &[String],
301 iceberg_schema: &iceberg::spec::Schema,
302) -> Result<Vec<i32>> {
303 primary_key_column_names
304 .iter()
305 .map(|column_name| {
306 iceberg_schema.field_id_by_name(column_name).ok_or_else(|| {
307 SinkError::Config(anyhow!(
308 "Primary key column {} not found in Iceberg schema",
309 column_name
310 ))
311 })
312 })
313 .collect()
314}
315
316impl IcebergSinkWriterInner {
317 pub fn build_append_only(
318 config: &IcebergConfig,
319 table: Table,
320 writer_param: &SinkWriterParam,
321 ) -> Result<Self> {
322 let SinkWriterParam {
323 extra_partition_col_idx,
324 actor_id,
325 sink_id,
326 sink_name,
327 ..
328 } = writer_param;
329 let metrics_labels = [
330 &actor_id.to_string(),
331 &sink_id.to_string(),
332 sink_name.as_str(),
333 ];
334
335 let write_qps = GLOBAL_SINK_METRICS
337 .iceberg_write_qps
338 .with_guarded_label_values(&metrics_labels);
339 let write_latency = GLOBAL_SINK_METRICS
340 .iceberg_write_latency
341 .with_guarded_label_values(&metrics_labels);
342 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
345 .iceberg_rolling_unflushed_data_file
346 .with_guarded_label_values(&metrics_labels);
347 let write_bytes = GLOBAL_SINK_METRICS
348 .iceberg_write_bytes
349 .with_guarded_label_values(&metrics_labels);
350
351 let schema = table.metadata().current_schema();
352 let partition_spec = table.metadata().default_partition_spec();
353 let fanout_enabled = !partition_spec.fields().is_empty();
354 let unique_uuid_suffix = Uuid::now_v7();
356
357 let parquet_writer_properties = WriterProperties::builder()
358 .set_compression(config.get_parquet_compression())
359 .set_max_row_group_bytes(config.write_parquet_max_row_group_bytes())
360 .set_created_by(PARQUET_CREATED_BY.to_owned())
361 .build();
362
363 let parquet_writer_builder =
364 ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
365 let rolling_builder = RollingFileWriterBuilder::new(
366 parquet_writer_builder,
367 (config.target_file_size_mb() * 1024 * 1024) as usize,
368 table.file_io().clone(),
369 DefaultLocationGenerator::new(table.metadata().clone())
370 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
371 DefaultFileNameGenerator::new(
372 writer_param.actor_id.to_string(),
373 Some(unique_uuid_suffix.to_string()),
374 iceberg::spec::DataFileFormat::Parquet,
375 ),
376 );
377 let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
378 let monitored_builder = MonitoredGeneralWriterBuilder::new(
379 data_file_builder,
380 write_qps.clone(),
381 write_latency.clone(),
382 );
383 let writer_builder = TaskWriterBuilderWrapper::new(
384 monitored_builder,
385 fanout_enabled,
386 schema.clone(),
387 partition_spec.clone(),
388 true,
389 );
390 let inner_writer = Some(Box::new(
391 writer_builder
392 .build()
393 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
394 ) as Box<dyn IcebergWriter>);
395 Ok(Self {
396 arrow_schema: Arc::new(
397 schema_to_arrow_schema(table.metadata().current_schema())
398 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
399 ),
400 metrics: IcebergWriterMetrics {
401 _write_qps: write_qps,
402 _write_latency: write_latency,
403 write_bytes,
404 },
405 writer: IcebergWriterDispatch::Append {
406 writer: inner_writer,
407 writer_builder,
408 },
409 table,
410 project_idx_vec: {
411 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
412 ProjectIdxVec::Prepare(*extra_partition_col_idx)
413 } else {
414 ProjectIdxVec::None
415 }
416 },
417 })
418 }
419
420 pub fn build_upsert(
421 config: &IcebergConfig,
422 table: Table,
423 primary_key_column_names: Vec<String>,
424 writer_param: &SinkWriterParam,
425 ) -> Result<Self> {
426 let SinkWriterParam {
427 extra_partition_col_idx,
428 actor_id,
429 sink_id,
430 sink_name,
431 ..
432 } = writer_param;
433 let metrics_labels = [
434 &actor_id.to_string(),
435 &sink_id.to_string(),
436 sink_name.as_str(),
437 ];
438
439 let write_qps = GLOBAL_SINK_METRICS
441 .iceberg_write_qps
442 .with_guarded_label_values(&metrics_labels);
443 let write_latency = GLOBAL_SINK_METRICS
444 .iceberg_write_latency
445 .with_guarded_label_values(&metrics_labels);
446 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
449 .iceberg_rolling_unflushed_data_file
450 .with_guarded_label_values(&metrics_labels);
451 let write_bytes = GLOBAL_SINK_METRICS
452 .iceberg_write_bytes
453 .with_guarded_label_values(&metrics_labels);
454
455 let schema = table.metadata().current_schema();
457 let unique_column_ids =
458 resolve_equality_delete_field_ids(&primary_key_column_names, schema)?;
459 let partition_spec = table.metadata().default_partition_spec();
460 let fanout_enabled = !partition_spec.fields().is_empty();
461 let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;
462
463 let unique_uuid_suffix = Uuid::now_v7();
465
466 let parquet_writer_properties = WriterProperties::builder()
467 .set_compression(config.get_parquet_compression())
468 .set_max_row_group_bytes(config.write_parquet_max_row_group_bytes())
469 .set_created_by(PARQUET_CREATED_BY.to_owned())
470 .build();
471
472 let data_file_builder = {
473 let parquet_writer_builder =
474 ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
475 let rolling_writer_builder = RollingFileWriterBuilder::new(
476 parquet_writer_builder,
477 (config.target_file_size_mb() * 1024 * 1024) as usize,
478 table.file_io().clone(),
479 DefaultLocationGenerator::new(table.metadata().clone())
480 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
481 DefaultFileNameGenerator::new(
482 writer_param.actor_id.to_string(),
483 Some(unique_uuid_suffix.to_string()),
484 iceberg::spec::DataFileFormat::Parquet,
485 ),
486 );
487 DataFileWriterBuilder::new(rolling_writer_builder)
488 };
489 let position_delete_builder = if use_deletion_vectors {
490 let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
491 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
492 PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
493 table.file_io().clone(),
494 location_generator,
495 DefaultFileNameGenerator::new(
496 writer_param.actor_id.to_string(),
497 Some(format!("delvec-{}", unique_uuid_suffix)),
498 iceberg::spec::DataFileFormat::Puffin,
499 ),
500 ))
501 } else {
502 let parquet_writer_builder = ParquetWriterBuilder::new(
503 parquet_writer_properties.clone(),
504 POSITION_DELETE_SCHEMA.clone().into(),
505 );
506 let rolling_writer_builder = RollingFileWriterBuilder::new(
507 parquet_writer_builder,
508 (config.target_file_size_mb() * 1024 * 1024) as usize,
509 table.file_io().clone(),
510 DefaultLocationGenerator::new(table.metadata().clone())
511 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
512 DefaultFileNameGenerator::new(
513 writer_param.actor_id.to_string(),
514 Some(format!("pos-del-{}", unique_uuid_suffix)),
515 iceberg::spec::DataFileFormat::Parquet,
516 ),
517 );
518 PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
519 rolling_writer_builder,
520 ))
521 };
522 let equality_delete_builder = {
523 let eq_del_config = EqualityDeleteWriterConfig::new(
524 unique_column_ids.clone(),
525 table.metadata().current_schema().clone(),
526 )
527 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
528 let parquet_writer_builder = ParquetWriterBuilder::new(
529 parquet_writer_properties,
530 Arc::new(
531 arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
532 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
533 ),
534 );
535 let rolling_writer_builder = RollingFileWriterBuilder::new(
536 parquet_writer_builder,
537 (config.target_file_size_mb() * 1024 * 1024) as usize,
538 table.file_io().clone(),
539 DefaultLocationGenerator::new(table.metadata().clone())
540 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
541 DefaultFileNameGenerator::new(
542 writer_param.actor_id.to_string(),
543 Some(format!("eq-del-{}", unique_uuid_suffix)),
544 iceberg::spec::DataFileFormat::Parquet,
545 ),
546 );
547
548 EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
549 };
550 let delta_builder = DeltaWriterBuilder::new(
551 data_file_builder,
552 position_delete_builder,
553 equality_delete_builder,
554 unique_column_ids,
555 schema.clone(),
556 );
557 let original_arrow_schema = Arc::new(
558 schema_to_arrow_schema(table.metadata().current_schema())
559 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
560 );
561 let schema_with_extra_op_column = {
562 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
563 new_fields.push(Arc::new(ArrowField::new(
564 "op".to_owned(),
565 ArrowDataType::Int32,
566 false,
567 )));
568 Arc::new(ArrowSchema::new(new_fields))
569 };
570 let writer_builder = TaskWriterBuilderWrapper::new(
571 MonitoredGeneralWriterBuilder::new(
572 delta_builder,
573 write_qps.clone(),
574 write_latency.clone(),
575 ),
576 fanout_enabled,
577 schema.clone(),
578 partition_spec.clone(),
579 true,
580 );
581 let inner_writer = Some(Box::new(
582 writer_builder
583 .build()
584 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
585 ) as Box<dyn IcebergWriter>);
586 Ok(Self {
587 arrow_schema: original_arrow_schema,
588 metrics: IcebergWriterMetrics {
589 _write_qps: write_qps,
590 _write_latency: write_latency,
591 write_bytes,
592 },
593 table,
594 writer: IcebergWriterDispatch::Upsert {
595 writer: inner_writer,
596 writer_builder,
597 arrow_schema_with_op_column: schema_with_extra_op_column,
598 },
599 project_idx_vec: {
600 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
601 ProjectIdxVec::Prepare(*extra_partition_col_idx)
602 } else {
603 ProjectIdxVec::None
604 }
605 },
606 })
607 }
608
609 fn prepare_writer(&mut self) -> Result<()> {
610 match &mut self.writer {
611 IcebergWriterDispatch::Append {
612 writer,
613 writer_builder,
614 } => {
615 if writer.is_none() {
616 *writer = Some(Box::new(
617 writer_builder
618 .build()
619 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
620 ));
621 }
622 }
623 IcebergWriterDispatch::Upsert {
624 writer,
625 writer_builder,
626 ..
627 } => {
628 if writer.is_none() {
629 *writer = Some(Box::new(
630 writer_builder
631 .build()
632 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
633 ));
634 }
635 }
636 };
637 Ok(())
638 }
639
640 fn process_chunk(&mut self, chunk: StreamChunk) -> Result<Option<(RecordBatch, usize)>> {
641 let (mut chunk, ops) = chunk.compact_vis().into_parts();
642 match &mut self.project_idx_vec {
643 ProjectIdxVec::None => {}
644 ProjectIdxVec::Prepare(idx) => {
645 if *idx >= chunk.columns().len() {
646 return Err(SinkError::Iceberg(anyhow!(
647 "invalid extra partition column index {}",
648 idx
649 )));
650 }
651 let project_idx_vec = (0..*idx)
652 .chain(*idx + 1..chunk.columns().len())
653 .collect_vec();
654 chunk = chunk.project(&project_idx_vec);
655 self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
656 }
657 ProjectIdxVec::Done(idx_vec) => {
658 chunk = chunk.project(idx_vec);
659 }
660 }
661 if ops.is_empty() {
662 return Ok(None);
663 }
664 let write_batch_size = chunk.estimated_heap_size();
665 let batch = match &self.writer {
666 IcebergWriterDispatch::Append { .. } => {
667 let filters =
669 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
670 chunk.set_visibility(filters);
671 IcebergArrowConvert
672 .to_record_batch(self.arrow_schema.clone(), &chunk.compact_vis())
673 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
674 }
675 IcebergWriterDispatch::Upsert {
676 arrow_schema_with_op_column,
677 ..
678 } => {
679 let chunk = IcebergArrowConvert
680 .to_record_batch(self.arrow_schema.clone(), &chunk)
681 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
682 let ops = Arc::new(Int32Array::from(
683 ops.iter()
684 .map(|op| match op {
685 Op::UpdateInsert | Op::Insert => INSERT_OP,
686 Op::UpdateDelete | Op::Delete => DELETE_OP,
687 })
688 .collect_vec(),
689 ));
690 let mut columns = chunk.columns().to_vec();
691 columns.push(ops);
692 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
693 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
694 }
695 };
696 Ok(Some((batch, write_batch_size)))
697 }
698
699 pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
700 self.prepare_writer()?;
701 let Some((batch, write_batch_size)) = self.process_chunk(chunk)? else {
702 return Ok(());
703 };
704
705 let writer = self.writer.get_writer().unwrap();
706 writer
707 .write(batch)
708 .instrument_await("iceberg_write")
709 .await?;
710 self.metrics.write_bytes.inc_by(write_batch_size as _);
711 Ok(())
712 }
713
714 pub async fn write_batch_with_position(
715 &mut self,
716 chunk: StreamChunk,
717 ) -> Result<Vec<PositionDeleteInput>> {
718 debug_assert!(
719 matches!(self.writer, IcebergWriterDispatch::Append { .. }),
720 "write_batch_with_position should only be called for append-only writer"
721 );
722
723 self.prepare_writer()?;
724 let Some((batch, write_batch_size)) = self.process_chunk(chunk)? else {
725 return Ok(vec![]);
726 };
727
728 let writer = self.writer.get_writer().unwrap();
729 let positions = writer
730 .write_with_position(batch)
731 .instrument_await("iceberg_write")
732 .await?;
733 self.metrics.write_bytes.inc_by(write_batch_size as _);
734 Ok(positions)
735 }
736
737 pub async fn close(&mut self) -> Result<Option<Vec<DataFile>>> {
738 let res = match &mut self.writer {
739 IcebergWriterDispatch::Append {
740 writer,
741 writer_builder,
742 } => {
743 let close_result = match writer.take() {
744 Some(mut writer) => {
745 Some(writer.close().instrument_await("iceberg_close").await?)
746 }
747 _ => None,
748 };
749 match writer_builder.build() {
750 Ok(new_writer) => {
751 *writer = Some(Box::new(new_writer));
752 }
753 _ => {
754 tracing::warn!("Failed to build new writer after close");
757 }
758 }
759 close_result
760 }
761 IcebergWriterDispatch::Upsert {
762 writer,
763 writer_builder,
764 ..
765 } => {
766 let close_result = match writer.take() {
767 Some(mut writer) => {
768 Some(writer.close().instrument_await("iceberg_close").await?)
769 }
770 _ => None,
771 };
772 match writer_builder.build() {
773 Ok(new_writer) => {
774 *writer = Some(Box::new(new_writer));
775 }
776 _ => {
777 tracing::warn!("Failed to build new writer after close");
780 }
781 }
782 close_result
783 }
784 };
785 Ok(res)
786 }
787
788 pub fn generate_commit_metadata(&self, data_files: Vec<DataFile>) -> Result<SinkMetadata> {
789 let format_version = self.table.metadata().format_version();
790 let partition_type = self.table.metadata().default_partition_type();
791 let serialized_data_files = data_files
792 .into_iter()
793 .map(|f| {
794 let truncated = truncate_datafile(f);
796 let res = SerializedDataFile::try_from(truncated, partition_type, format_version)?;
797 Ok(res)
798 })
799 .collect::<Result<Vec<_>>>()?;
800 SinkMetadata::try_from(&IcebergCommitResult {
801 data_files: serialized_data_files,
802 schema_id: self.table.metadata().current_schema_id(),
803 partition_spec_id: self.table.metadata().default_partition_spec_id(),
804 })
805 }
806
807 pub fn partition_type(&self) -> &iceberg::spec::StructType {
808 self.table.metadata().default_partition_type()
809 }
810}
811
812#[async_trait]
813impl SinkWriter for IcebergSinkWriter {
814 type CommitMetadata = Option<SinkMetadata>;
815
816 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
818 let Self::Created(args) = self else {
819 return Ok(());
820 };
821
822 let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
823 let inner = match &args.upsert_primary_key_column_names {
824 Some(primary_key_column_names) => IcebergSinkWriterInner::build_upsert(
825 &args.config,
826 table,
827 primary_key_column_names.clone(),
828 &args.writer_param,
829 )?,
830 None => {
831 IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
832 }
833 };
834
835 *self = IcebergSinkWriter::Initialized(inner);
836 Ok(())
837 }
838
839 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
841 let Self::Initialized(inner) = self else {
842 unreachable!("IcebergSinkWriter should be initialized before barrier");
843 };
844 inner.write_batch(chunk).await
845 }
846
847 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
850 let Self::Initialized(inner) = self else {
851 unreachable!("IcebergSinkWriter should be initialized before barrier");
852 };
853
854 if !is_checkpoint {
856 return Ok(None);
857 }
858
859 let data_files = inner
860 .close()
861 .await?
862 .ok_or_else(|| anyhow!("No writer to close"))?;
863 let res = inner.generate_commit_metadata(data_files)?;
864 Ok(Some(res))
865 }
866}
867
868const MAX_COLUMN_STAT_SIZE: usize = 10240; pub fn truncate_datafile(mut data_file: DataFile) -> DataFile {
892 data_file.lower_bounds.retain(|field_id, datum| {
894 let size = match datum.to_bytes() {
896 Ok(bytes) => bytes.len(),
897 Err(_) => 0,
898 };
899
900 if size > MAX_COLUMN_STAT_SIZE {
901 tracing::debug!(
902 field_id = field_id,
903 size = size,
904 "Truncating large lower_bound statistic"
905 );
906 return false;
907 }
908 true
909 });
910
911 data_file.upper_bounds.retain(|field_id, datum| {
913 let size = match datum.to_bytes() {
915 Ok(bytes) => bytes.len(),
916 Err(_) => 0,
917 };
918
919 if size > MAX_COLUMN_STAT_SIZE {
920 tracing::debug!(
921 field_id = field_id,
922 size = size,
923 "Truncating large upper_bound statistic"
924 );
925 return false;
926 }
927 true
928 });
929
930 data_file
931}