1use std::sync::Arc;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use await_tree::InstrumentAwait;
20use iceberg::arrow::{
21 RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
22};
23use iceberg::spec::{
24 DataFile, FormatVersion, PartitionSpecRef, SchemaRef as IcebergSchemaRef, SerializedDataFile,
25};
26use iceberg::table::Table;
27use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
28use iceberg::writer::base_writer::deletion_vector_writer::{
29 DeletionVectorWriter, DeletionVectorWriterBuilder,
30};
31use iceberg::writer::base_writer::equality_delete_writer::{
32 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
33};
34use iceberg::writer::base_writer::position_delete_file_writer::{
35 POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
36 PositionDeleteInput,
37};
38use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
39use iceberg::writer::file_writer::ParquetWriterBuilder;
40use iceberg::writer::file_writer::location_generator::{
41 DefaultFileNameGenerator, DefaultLocationGenerator,
42};
43use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
44use iceberg::writer::task_writer::TaskWriter;
45use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
46use itertools::Itertools;
47use parquet::file::properties::WriterProperties;
48use risingwave_common::array::arrow::IcebergArrowConvert;
49use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
50use risingwave_common::array::arrow::arrow_schema_iceberg::{
51 DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef,
52};
53use risingwave_common::array::{Op, StreamChunk};
54use risingwave_common::bitmap::Bitmap;
55use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
56use risingwave_common_estimate_size::EstimateSize;
57use risingwave_pb::connector_service::SinkMetadata;
58use uuid::Uuid;
59
60use super::prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
61use super::{
62 GLOBAL_SINK_METRICS, IcebergCommitResult, IcebergConfig, PARQUET_CREATED_BY, SinkError,
63 SinkWriterParam, create_and_validate_table_impl,
64};
65use crate::sink::writer::SinkWriter;
66use crate::sink::{Result, SinkParam};
67
/// Tracks how incoming chunks are projected to drop the extra partition column.
enum ProjectIdxVec {
    /// No extra partition column is configured; chunks are used as-is.
    None,
    /// Index of the extra partition column. The projection is computed from the first
    /// chunk that is processed, after which the state moves to [`ProjectIdxVec::Done`].
    Prepare(usize),
    /// Cached projection: the indices of every column except the extra partition column.
    Done(Vec<usize>),
}
80
/// Data-file writer builder backed by Parquet with the default location and file-name
/// generators.
type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
/// Position-delete file writer builder backed by Parquet with the default generators.
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
/// Writer type paired with [`PositionDeleteFileWriterBuilderType`].
type PositionDeleteFileWriterType = PositionDeleteFileWriter<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
/// Deletion-vector writer builder with the default location and file-name generators.
type DeletionVectorWriterBuilderType =
    DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
/// Writer type paired with [`DeletionVectorWriterBuilderType`].
type DeletionVectorWriterType =
    DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
/// Equality-delete file writer builder backed by Parquet with the default generators.
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
102
/// Builder for the position-level delete writer used by the upsert path.
///
/// `build_upsert` picks `DeletionVector` when the table format version is at least V3 and
/// `PositionDelete` otherwise.
#[derive(Clone)]
enum PositionDeleteWriterBuilderType {
    /// Builds a writer that emits position-delete files.
    PositionDelete(PositionDeleteFileWriterBuilderType),
    /// Builds a writer that emits deletion vectors.
    DeletionVector(DeletionVectorWriterBuilderType),
}
108
/// Writer produced by [`PositionDeleteWriterBuilderType`]; one variant per builder variant.
enum PositionDeleteWriterType {
    /// Writer built from a position-delete file writer builder.
    PositionDelete(PositionDeleteFileWriterType),
    /// Writer built from a deletion-vector writer builder.
    DeletionVector(DeletionVectorWriterType),
}
113
114#[async_trait]
115impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
116 type R = PositionDeleteWriterType;
117
118 async fn build(
119 &self,
120 partition_key: Option<iceberg::spec::PartitionKey>,
121 ) -> iceberg::Result<Self::R> {
122 match self {
123 PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
124 PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
125 ),
126 PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
127 PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
128 ),
129 }
130 }
131}
132
133#[async_trait]
134impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
135 async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
136 match self {
137 PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
138 PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
139 }
140 }
141
142 async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
143 match self {
144 PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
145 PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
146 }
147 }
148}
149
/// Writer builder that delegates to a builder held behind an [`Arc`].
#[derive(Clone)]
struct SharedIcebergWriterBuilder<B>(Arc<B>);
152
153#[async_trait]
154impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
155 type R = B::R;
156
157 async fn build(
158 &self,
159 partition_key: Option<iceberg::spec::PartitionKey>,
160 ) -> iceberg::Result<Self::R> {
161 self.0.build(partition_key).await
162 }
163}
164
/// Bundles a writer builder with the settings needed to create a [`TaskWriter`] from it.
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    /// Underlying writer builder, shared with every task writer that gets built.
    inner: Arc<B>,
    /// Forwarded to the task writer as its fan-out flag.
    fanout_enabled: bool,
    /// Iceberg schema handed to the task writer and to the partition splitter.
    schema: IcebergSchemaRef,
    /// Partition spec handed to the task writer and to the partition splitter.
    partition_spec: PartitionSpecRef,
    /// For partitioned specs: `true` selects the splitter created with
    /// `try_new_with_computed_values`, `false` the one created with
    /// `try_new_with_precomputed_values`.
    compute_partition: bool,
}
173
174impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
175 fn new(
176 inner: B,
177 fanout_enabled: bool,
178 schema: IcebergSchemaRef,
179 partition_spec: PartitionSpecRef,
180 compute_partition: bool,
181 ) -> Self {
182 Self {
183 inner: Arc::new(inner),
184 fanout_enabled,
185 schema,
186 partition_spec,
187 compute_partition,
188 }
189 }
190
191 fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
192 let partition_splitter = match (
193 self.partition_spec.is_unpartitioned(),
194 self.compute_partition,
195 ) {
196 (true, _) => None,
197 (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
198 self.schema.clone(),
199 self.partition_spec.clone(),
200 )?),
201 (false, false) => Some(
202 RecordBatchPartitionSplitter::try_new_with_precomputed_values(
203 self.schema.clone(),
204 self.partition_spec.clone(),
205 )?,
206 ),
207 };
208
209 Ok(TaskWriter::new_with_partition_splitter(
210 SharedIcebergWriterBuilder(self.inner.clone()),
211 self.fanout_enabled,
212 self.schema.clone(),
213 self.partition_spec.clone(),
214 partition_splitter,
215 ))
216 }
217}
218
/// Iceberg sink writer with lazy initialization.
///
/// A writer starts as `Created` and is switched to `Initialized` by the first
/// `begin_epoch` call.
pub enum IcebergSinkWriter {
    /// Holds the construction arguments; no table or writer has been set up yet.
    Created(IcebergSinkWriterArgs),
    /// Holds the fully initialized writer state.
    Initialized(IcebergSinkWriterInner),
}
223
/// Arguments captured by [`IcebergSinkWriter::new`] and used to initialize the writer.
pub struct IcebergSinkWriterArgs {
    /// Iceberg sink configuration.
    config: IcebergConfig,
    /// Sink parameters passed to table creation/validation.
    sink_param: SinkParam,
    /// Per-writer parameters (actor id, sink id, sink name, extra partition column, ...).
    writer_param: SinkWriterParam,
    /// `Some` selects the upsert writer keyed on these column ids; `None` selects the
    /// append-only writer.
    unique_column_ids: Option<Vec<usize>>,
}
230
/// State of an initialized Iceberg sink writer.
pub struct IcebergSinkWriterInner {
    /// Append or upsert writer together with the builder used to recreate it.
    writer: IcebergWriterDispatch,
    /// Arrow schema converted from the table's current schema; used to convert chunks into
    /// record batches.
    arrow_schema: SchemaRef,
    /// Metric handles for this writer.
    metrics: IcebergWriterMetrics,
    /// Target Iceberg table; its metadata is read when generating commit metadata.
    table: Table,
    /// Projection state used to drop the extra partition column from incoming chunks.
    project_idx_vec: ProjectIdxVec,
}
242
/// The active writer plus the builder used to recreate it, per write mode.
enum IcebergWriterDispatch {
    /// Append-only mode: rows go to a monitored data-file writer.
    Append {
        /// Current writer; `None` after a close whose rebuild failed.
        writer: Option<Box<dyn IcebergWriter>>,
        /// Builder used to create a fresh writer.
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    /// Upsert mode: rows go to a monitored delta writer.
    Upsert {
        /// Current writer; `None` after a close whose rebuild failed.
        writer: Option<Box<dyn IcebergWriter>>,
        /// Builder used to create a fresh writer.
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        /// Table Arrow schema with a trailing non-nullable `Int32` column named `op`.
        arrow_schema_with_op_column: SchemaRef,
    },
}
263
264impl IcebergWriterDispatch {
265 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
266 match self {
267 IcebergWriterDispatch::Append { writer, .. }
268 | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
269 }
270 }
271}
272
/// Metric handles owned by an initialized writer.
pub struct IcebergWriterMetrics {
    // NOTE(review): the two underscore-prefixed handles are never read in this file; clones
    // of them are handed to the monitored writer builder. Presumably they are retained here
    // to keep the labelled series alive for the writer's lifetime — confirm.
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    /// Incremented by the estimated heap size of each chunk that is written.
    write_bytes: LabelGuardedIntCounter,
}
282
283impl IcebergSinkWriter {
284 pub fn new(
285 config: IcebergConfig,
286 sink_param: SinkParam,
287 writer_param: SinkWriterParam,
288 unique_column_ids: Option<Vec<usize>>,
289 ) -> Self {
290 Self::Created(IcebergSinkWriterArgs {
291 config,
292 sink_param,
293 writer_param,
294 unique_column_ids,
295 })
296 }
297}
298
299impl IcebergSinkWriterInner {
300 pub fn build_append_only(
301 config: &IcebergConfig,
302 table: Table,
303 writer_param: &SinkWriterParam,
304 ) -> Result<Self> {
305 let SinkWriterParam {
306 extra_partition_col_idx,
307 actor_id,
308 sink_id,
309 sink_name,
310 ..
311 } = writer_param;
312 let metrics_labels = [
313 &actor_id.to_string(),
314 &sink_id.to_string(),
315 sink_name.as_str(),
316 ];
317
318 let write_qps = GLOBAL_SINK_METRICS
320 .iceberg_write_qps
321 .with_guarded_label_values(&metrics_labels);
322 let write_latency = GLOBAL_SINK_METRICS
323 .iceberg_write_latency
324 .with_guarded_label_values(&metrics_labels);
325 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
328 .iceberg_rolling_unflushed_data_file
329 .with_guarded_label_values(&metrics_labels);
330 let write_bytes = GLOBAL_SINK_METRICS
331 .iceberg_write_bytes
332 .with_guarded_label_values(&metrics_labels);
333
334 let schema = table.metadata().current_schema();
335 let partition_spec = table.metadata().default_partition_spec();
336 let fanout_enabled = !partition_spec.fields().is_empty();
337 let unique_uuid_suffix = Uuid::now_v7();
339
340 let parquet_writer_properties = WriterProperties::builder()
341 .set_compression(config.get_parquet_compression())
342 .set_max_row_group_bytes(config.write_parquet_max_row_group_bytes())
343 .set_created_by(PARQUET_CREATED_BY.to_owned())
344 .build();
345
346 let parquet_writer_builder =
347 ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
348 let rolling_builder = RollingFileWriterBuilder::new(
349 parquet_writer_builder,
350 (config.target_file_size_mb() * 1024 * 1024) as usize,
351 table.file_io().clone(),
352 DefaultLocationGenerator::new(table.metadata().clone())
353 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
354 DefaultFileNameGenerator::new(
355 writer_param.actor_id.to_string(),
356 Some(unique_uuid_suffix.to_string()),
357 iceberg::spec::DataFileFormat::Parquet,
358 ),
359 );
360 let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
361 let monitored_builder = MonitoredGeneralWriterBuilder::new(
362 data_file_builder,
363 write_qps.clone(),
364 write_latency.clone(),
365 );
366 let writer_builder = TaskWriterBuilderWrapper::new(
367 monitored_builder,
368 fanout_enabled,
369 schema.clone(),
370 partition_spec.clone(),
371 true,
372 );
373 let inner_writer = Some(Box::new(
374 writer_builder
375 .build()
376 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
377 ) as Box<dyn IcebergWriter>);
378 Ok(Self {
379 arrow_schema: Arc::new(
380 schema_to_arrow_schema(table.metadata().current_schema())
381 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
382 ),
383 metrics: IcebergWriterMetrics {
384 _write_qps: write_qps,
385 _write_latency: write_latency,
386 write_bytes,
387 },
388 writer: IcebergWriterDispatch::Append {
389 writer: inner_writer,
390 writer_builder,
391 },
392 table,
393 project_idx_vec: {
394 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
395 ProjectIdxVec::Prepare(*extra_partition_col_idx)
396 } else {
397 ProjectIdxVec::None
398 }
399 },
400 })
401 }
402
403 pub fn build_upsert(
404 config: &IcebergConfig,
405 table: Table,
406 unique_column_ids: Vec<usize>,
407 writer_param: &SinkWriterParam,
408 ) -> Result<Self> {
409 let SinkWriterParam {
410 extra_partition_col_idx,
411 actor_id,
412 sink_id,
413 sink_name,
414 ..
415 } = writer_param;
416 let metrics_labels = [
417 &actor_id.to_string(),
418 &sink_id.to_string(),
419 sink_name.as_str(),
420 ];
421 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
422
423 let write_qps = GLOBAL_SINK_METRICS
425 .iceberg_write_qps
426 .with_guarded_label_values(&metrics_labels);
427 let write_latency = GLOBAL_SINK_METRICS
428 .iceberg_write_latency
429 .with_guarded_label_values(&metrics_labels);
430 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
433 .iceberg_rolling_unflushed_data_file
434 .with_guarded_label_values(&metrics_labels);
435 let write_bytes = GLOBAL_SINK_METRICS
436 .iceberg_write_bytes
437 .with_guarded_label_values(&metrics_labels);
438
439 let schema = table.metadata().current_schema();
441 let partition_spec = table.metadata().default_partition_spec();
442 let fanout_enabled = !partition_spec.fields().is_empty();
443 let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;
444
445 let unique_uuid_suffix = Uuid::now_v7();
447
448 let parquet_writer_properties = WriterProperties::builder()
449 .set_compression(config.get_parquet_compression())
450 .set_max_row_group_bytes(config.write_parquet_max_row_group_bytes())
451 .set_created_by(PARQUET_CREATED_BY.to_owned())
452 .build();
453
454 let data_file_builder = {
455 let parquet_writer_builder =
456 ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
457 let rolling_writer_builder = RollingFileWriterBuilder::new(
458 parquet_writer_builder,
459 (config.target_file_size_mb() * 1024 * 1024) as usize,
460 table.file_io().clone(),
461 DefaultLocationGenerator::new(table.metadata().clone())
462 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
463 DefaultFileNameGenerator::new(
464 writer_param.actor_id.to_string(),
465 Some(unique_uuid_suffix.to_string()),
466 iceberg::spec::DataFileFormat::Parquet,
467 ),
468 );
469 DataFileWriterBuilder::new(rolling_writer_builder)
470 };
471 let position_delete_builder = if use_deletion_vectors {
472 let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
473 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
474 PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
475 table.file_io().clone(),
476 location_generator,
477 DefaultFileNameGenerator::new(
478 writer_param.actor_id.to_string(),
479 Some(format!("delvec-{}", unique_uuid_suffix)),
480 iceberg::spec::DataFileFormat::Puffin,
481 ),
482 ))
483 } else {
484 let parquet_writer_builder = ParquetWriterBuilder::new(
485 parquet_writer_properties.clone(),
486 POSITION_DELETE_SCHEMA.clone().into(),
487 );
488 let rolling_writer_builder = RollingFileWriterBuilder::new(
489 parquet_writer_builder,
490 (config.target_file_size_mb() * 1024 * 1024) as usize,
491 table.file_io().clone(),
492 DefaultLocationGenerator::new(table.metadata().clone())
493 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
494 DefaultFileNameGenerator::new(
495 writer_param.actor_id.to_string(),
496 Some(format!("pos-del-{}", unique_uuid_suffix)),
497 iceberg::spec::DataFileFormat::Parquet,
498 ),
499 );
500 PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
501 rolling_writer_builder,
502 ))
503 };
504 let equality_delete_builder = {
505 let eq_del_config = EqualityDeleteWriterConfig::new(
506 unique_column_ids.clone(),
507 table.metadata().current_schema().clone(),
508 )
509 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
510 let parquet_writer_builder = ParquetWriterBuilder::new(
511 parquet_writer_properties,
512 Arc::new(
513 arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
514 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
515 ),
516 );
517 let rolling_writer_builder = RollingFileWriterBuilder::new(
518 parquet_writer_builder,
519 (config.target_file_size_mb() * 1024 * 1024) as usize,
520 table.file_io().clone(),
521 DefaultLocationGenerator::new(table.metadata().clone())
522 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
523 DefaultFileNameGenerator::new(
524 writer_param.actor_id.to_string(),
525 Some(format!("eq-del-{}", unique_uuid_suffix)),
526 iceberg::spec::DataFileFormat::Parquet,
527 ),
528 );
529
530 EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
531 };
532 let delta_builder = DeltaWriterBuilder::new(
533 data_file_builder,
534 position_delete_builder,
535 equality_delete_builder,
536 unique_column_ids,
537 schema.clone(),
538 );
539 let original_arrow_schema = Arc::new(
540 schema_to_arrow_schema(table.metadata().current_schema())
541 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
542 );
543 let schema_with_extra_op_column = {
544 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
545 new_fields.push(Arc::new(ArrowField::new(
546 "op".to_owned(),
547 ArrowDataType::Int32,
548 false,
549 )));
550 Arc::new(ArrowSchema::new(new_fields))
551 };
552 let writer_builder = TaskWriterBuilderWrapper::new(
553 MonitoredGeneralWriterBuilder::new(
554 delta_builder,
555 write_qps.clone(),
556 write_latency.clone(),
557 ),
558 fanout_enabled,
559 schema.clone(),
560 partition_spec.clone(),
561 true,
562 );
563 let inner_writer = Some(Box::new(
564 writer_builder
565 .build()
566 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
567 ) as Box<dyn IcebergWriter>);
568 Ok(Self {
569 arrow_schema: original_arrow_schema,
570 metrics: IcebergWriterMetrics {
571 _write_qps: write_qps,
572 _write_latency: write_latency,
573 write_bytes,
574 },
575 table,
576 writer: IcebergWriterDispatch::Upsert {
577 writer: inner_writer,
578 writer_builder,
579 arrow_schema_with_op_column: schema_with_extra_op_column,
580 },
581 project_idx_vec: {
582 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
583 ProjectIdxVec::Prepare(*extra_partition_col_idx)
584 } else {
585 ProjectIdxVec::None
586 }
587 },
588 })
589 }
590
591 fn prepare_writer(&mut self) -> Result<()> {
592 match &mut self.writer {
593 IcebergWriterDispatch::Append {
594 writer,
595 writer_builder,
596 } => {
597 if writer.is_none() {
598 *writer = Some(Box::new(
599 writer_builder
600 .build()
601 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
602 ));
603 }
604 }
605 IcebergWriterDispatch::Upsert {
606 writer,
607 writer_builder,
608 ..
609 } => {
610 if writer.is_none() {
611 *writer = Some(Box::new(
612 writer_builder
613 .build()
614 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
615 ));
616 }
617 }
618 };
619 Ok(())
620 }
621
622 fn process_chunk(&mut self, chunk: StreamChunk) -> Result<Option<(RecordBatch, usize)>> {
623 let (mut chunk, ops) = chunk.compact_vis().into_parts();
624 match &mut self.project_idx_vec {
625 ProjectIdxVec::None => {}
626 ProjectIdxVec::Prepare(idx) => {
627 if *idx >= chunk.columns().len() {
628 return Err(SinkError::Iceberg(anyhow!(
629 "invalid extra partition column index {}",
630 idx
631 )));
632 }
633 let project_idx_vec = (0..*idx)
634 .chain(*idx + 1..chunk.columns().len())
635 .collect_vec();
636 chunk = chunk.project(&project_idx_vec);
637 self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
638 }
639 ProjectIdxVec::Done(idx_vec) => {
640 chunk = chunk.project(idx_vec);
641 }
642 }
643 if ops.is_empty() {
644 return Ok(None);
645 }
646 let write_batch_size = chunk.estimated_heap_size();
647 let batch = match &self.writer {
648 IcebergWriterDispatch::Append { .. } => {
649 let filters =
651 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
652 chunk.set_visibility(filters);
653 IcebergArrowConvert
654 .to_record_batch(self.arrow_schema.clone(), &chunk.compact_vis())
655 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
656 }
657 IcebergWriterDispatch::Upsert {
658 arrow_schema_with_op_column,
659 ..
660 } => {
661 let chunk = IcebergArrowConvert
662 .to_record_batch(self.arrow_schema.clone(), &chunk)
663 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
664 let ops = Arc::new(Int32Array::from(
665 ops.iter()
666 .map(|op| match op {
667 Op::UpdateInsert | Op::Insert => INSERT_OP,
668 Op::UpdateDelete | Op::Delete => DELETE_OP,
669 })
670 .collect_vec(),
671 ));
672 let mut columns = chunk.columns().to_vec();
673 columns.push(ops);
674 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
675 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
676 }
677 };
678 Ok(Some((batch, write_batch_size)))
679 }
680
681 pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
682 self.prepare_writer()?;
683 let Some((batch, write_batch_size)) = self.process_chunk(chunk)? else {
684 return Ok(());
685 };
686
687 let writer = self.writer.get_writer().unwrap();
688 writer
689 .write(batch)
690 .instrument_await("iceberg_write")
691 .await?;
692 self.metrics.write_bytes.inc_by(write_batch_size as _);
693 Ok(())
694 }
695
696 pub async fn write_batch_with_position(
697 &mut self,
698 chunk: StreamChunk,
699 ) -> Result<Vec<PositionDeleteInput>> {
700 debug_assert!(
701 matches!(self.writer, IcebergWriterDispatch::Append { .. }),
702 "write_batch_with_position should only be called for append-only writer"
703 );
704
705 self.prepare_writer()?;
706 let Some((batch, write_batch_size)) = self.process_chunk(chunk)? else {
707 return Ok(vec![]);
708 };
709
710 let writer = self.writer.get_writer().unwrap();
711 let positions = writer
712 .write_with_position(batch)
713 .instrument_await("iceberg_write")
714 .await?;
715 self.metrics.write_bytes.inc_by(write_batch_size as _);
716 Ok(positions)
717 }
718
719 pub async fn close(&mut self) -> Result<Option<Vec<DataFile>>> {
720 let res = match &mut self.writer {
721 IcebergWriterDispatch::Append {
722 writer,
723 writer_builder,
724 } => {
725 let close_result = match writer.take() {
726 Some(mut writer) => {
727 Some(writer.close().instrument_await("iceberg_close").await?)
728 }
729 _ => None,
730 };
731 match writer_builder.build() {
732 Ok(new_writer) => {
733 *writer = Some(Box::new(new_writer));
734 }
735 _ => {
736 tracing::warn!("Failed to build new writer after close");
739 }
740 }
741 close_result
742 }
743 IcebergWriterDispatch::Upsert {
744 writer,
745 writer_builder,
746 ..
747 } => {
748 let close_result = match writer.take() {
749 Some(mut writer) => {
750 Some(writer.close().instrument_await("iceberg_close").await?)
751 }
752 _ => None,
753 };
754 match writer_builder.build() {
755 Ok(new_writer) => {
756 *writer = Some(Box::new(new_writer));
757 }
758 _ => {
759 tracing::warn!("Failed to build new writer after close");
762 }
763 }
764 close_result
765 }
766 };
767 Ok(res)
768 }
769
770 pub fn generate_commit_metadata(&self, data_files: Vec<DataFile>) -> Result<SinkMetadata> {
771 let format_version = self.table.metadata().format_version();
772 let partition_type = self.table.metadata().default_partition_type();
773 let serialized_data_files = data_files
774 .into_iter()
775 .map(|f| {
776 let truncated = truncate_datafile(f);
778 let res = SerializedDataFile::try_from(truncated, partition_type, format_version)?;
779 Ok(res)
780 })
781 .collect::<Result<Vec<_>>>()?;
782 SinkMetadata::try_from(&IcebergCommitResult {
783 data_files: serialized_data_files,
784 schema_id: self.table.metadata().current_schema_id(),
785 partition_spec_id: self.table.metadata().default_partition_spec_id(),
786 })
787 }
788
789 pub fn partition_type(&self) -> &iceberg::spec::StructType {
790 self.table.metadata().default_partition_type()
791 }
792}
793
794#[async_trait]
795impl SinkWriter for IcebergSinkWriter {
796 type CommitMetadata = Option<SinkMetadata>;
797
798 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
800 let Self::Created(args) = self else {
801 return Ok(());
802 };
803
804 let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
805 let inner = match &args.unique_column_ids {
806 Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
807 &args.config,
808 table,
809 unique_column_ids.clone(),
810 &args.writer_param,
811 )?,
812 None => {
813 IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
814 }
815 };
816
817 *self = IcebergSinkWriter::Initialized(inner);
818 Ok(())
819 }
820
821 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
823 let Self::Initialized(inner) = self else {
824 unreachable!("IcebergSinkWriter should be initialized before barrier");
825 };
826 inner.write_batch(chunk).await
827 }
828
829 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
832 let Self::Initialized(inner) = self else {
833 unreachable!("IcebergSinkWriter should be initialized before barrier");
834 };
835
836 if !is_checkpoint {
838 return Ok(None);
839 }
840
841 let data_files = inner
842 .close()
843 .await?
844 .ok_or_else(|| anyhow!("No writer to close"))?;
845 let res = inner.generate_commit_metadata(data_files)?;
846 Ok(Some(res))
847 }
848}
849
850const MAX_COLUMN_STAT_SIZE: usize = 10240; pub fn truncate_datafile(mut data_file: DataFile) -> DataFile {
874 data_file.lower_bounds.retain(|field_id, datum| {
876 let size = match datum.to_bytes() {
878 Ok(bytes) => bytes.len(),
879 Err(_) => 0,
880 };
881
882 if size > MAX_COLUMN_STAT_SIZE {
883 tracing::debug!(
884 field_id = field_id,
885 size = size,
886 "Truncating large lower_bound statistic"
887 );
888 return false;
889 }
890 true
891 });
892
893 data_file.upper_bounds.retain(|field_id, datum| {
895 let size = match datum.to_bytes() {
897 Ok(bytes) => bytes.len(),
898 Err(_) => 0,
899 };
900
901 if size > MAX_COLUMN_STAT_SIZE {
902 tracing::debug!(
903 field_id = field_id,
904 size = size,
905 "Truncating large upper_bound statistic"
906 );
907 return false;
908 }
909 true
910 });
911
912 data_file
913}