use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::spec::{DataFile, Operation, SerializedDataFile};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
use iceberg::{Catalog, TableIdent};
use itertools::Itertools;
use risingwave_common::array::arrow::arrow_schema_iceberg::Field as ArrowField;
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::bail;
use risingwave_common::catalog::Field;
use risingwave_common::error::IcebergError;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde_json::from_value;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
use tokio_retry::RetryIf;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tracing::warn;

use super::{GLOBAL_SINK_METRICS, IcebergConfig, SinkError, commit_branch};
use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::sink::catalog::SinkId;
use crate::sink::{Result, SinglePhaseCommitCoordinator, SinkParam, TwoPhaseCommitCoordinator};

const SCHEMA_ID: &str = "schema_id";
const PARTITION_SPEC_ID: &str = "partition_spec_id";
const DATA_FILES: &str = "data_files";

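/// Commit metadata produced by each sink writer for a checkpoint, exchanged as a
/// JSON object with the keys `schema_id`, `partition_spec_id`, and `data_files`.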
#[derive(Default, Clone)]
pub struct IcebergCommitResult {
    pub schema_id: i32,
    pub partition_spec_id: i32,
    pub data_files: Vec<SerializedDataFile>,
}

impl IcebergCommitResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let mut values = if let serde_json::Value::Object(v) =
                serde_json::from_slice::<serde_json::Value>(&v.metadata)
                    .context("Can't parse iceberg sink metadata")?
            {
                v
            } else {
                bail!("iceberg sink metadata should be an object");
            };

            let schema_id;
            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
                schema_id = value
                    .as_u64()
                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
            } else {
                bail!("iceberg sink metadata should have schema_id");
            }

            let partition_spec_id;
            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
                partition_spec_id = value
                    .as_u64()
                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
            } else {
                bail!("iceberg sink metadata should have partition_spec_id");
            }

            let data_files: Vec<SerializedDataFile>;
            if let serde_json::Value::Array(values) = values
                .remove(DATA_FILES)
                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
            {
                data_files = values
                    .into_iter()
                    .map(from_value::<SerializedDataFile>)
                    .collect::<std::result::Result<_, _>>()
                    .context("Failed to deserialize data_files")?;
            } else {
                bail!("data_files should be an array");
            }

            Ok(Self {
                schema_id: schema_id as i32,
                partition_spec_id: partition_spec_id as i32,
                data_files,
            })
        } else {
            bail!("Can't create iceberg sink write result from empty data!")
        }
    }

    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
        let mut values = if let serde_json::Value::Object(value) =
            serde_json::from_slice::<serde_json::Value>(&value)
                .context("Can't parse iceberg sink metadata")?
        {
            value
        } else {
            bail!("iceberg sink metadata should be an object");
        };

        let schema_id;
        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
            schema_id = value
                .as_u64()
                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
        } else {
            bail!("iceberg sink metadata should have schema_id");
        }

        let partition_spec_id;
        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
            partition_spec_id = value
                .as_u64()
                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
        } else {
            bail!("iceberg sink metadata should have partition_spec_id");
        }

        let data_files: Vec<SerializedDataFile>;
        if let serde_json::Value::Array(values) = values
            .remove(DATA_FILES)
            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
        {
            data_files = values
                .into_iter()
                .map(from_value::<SerializedDataFile>)
                .collect::<std::result::Result<_, _>>()
                .context("Failed to deserialize data_files")?;
        } else {
            bail!("data_files should be an array");
        }

        Ok(Self {
            schema_id: schema_id as i32,
            partition_spec_id: partition_spec_id as i32,
            data_files,
        })
    }
}

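// Serializes an `IcebergCommitResult` into the protobuf `SinkMetadata` envelope
// that sink writers send to the commit coordinator.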
impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
        let json_data_files = serde_json::Value::Array(
            value
                .data_files
                .iter()
                .map(serde_json::to_value)
                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
                .context("Can't serialize data files to json")?,
        );
        let json_value = serde_json::Value::Object(
            vec![
                (
                    SCHEMA_ID.to_owned(),
                    serde_json::Value::Number(value.schema_id.into()),
                ),
                (
                    PARTITION_SPEC_ID.to_owned(),
                    serde_json::Value::Number(value.partition_spec_id.into()),
                ),
                (DATA_FILES.to_owned(), json_data_files),
            ]
            .into_iter()
            .collect(),
        );
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata {
                metadata: serde_json::to_vec(&json_value)
                    .context("Can't serialize iceberg sink metadata")?,
            })),
        })
    }
}

impl TryFrom<IcebergCommitResult> for Vec<u8> {
    type Error = SinkError;

    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
        let json_data_files = serde_json::Value::Array(
            value
                .data_files
                .iter()
                .map(serde_json::to_value)
                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
                .context("Can't serialize data files to json")?,
        );
        let json_value = serde_json::Value::Object(
            vec![
                (
                    SCHEMA_ID.to_owned(),
                    serde_json::Value::Number(value.schema_id.into()),
                ),
                (
                    PARTITION_SPEC_ID.to_owned(),
                    serde_json::Value::Number(value.partition_spec_id.into()),
                ),
                (DATA_FILES.to_owned(), json_data_files),
            ]
            .into_iter()
            .collect(),
        );
        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
    }
}

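/// Commit coordinator of the iceberg sink: collects per-writer commit results and
/// commits data files and schema changes to the iceberg table.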
pub struct IcebergSinkCommitter {
    pub(super) catalog: Arc<dyn Catalog>,
    pub(super) table: Table,
    pub last_commit_epoch: u64,
    pub(crate) sink_id: SinkId,
    pub(crate) config: IcebergConfig,
    pub(crate) param: SinkParam,
    pub(super) commit_retry_num: u32,
    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
}

impl IcebergSinkCommitter {
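    /// Reloads the table from the catalog and verifies that its current schema id
    /// and default partition spec id still match those the data files were written
    /// with; schema or partition evolution during a commit is not supported.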
    async fn reload_table(
        catalog: &dyn Catalog,
        table_ident: &TableIdent,
        schema_id: i32,
        partition_spec_id: i32,
    ) -> Result<Table> {
        let table = catalog
            .load_table(table_ident)
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        if table.metadata().current_schema_id() != schema_id {
            return Err(SinkError::Iceberg(anyhow!(
                "Schema evolution not supported, expected schema id {}, but got {}",
                schema_id,
                table.metadata().current_schema_id()
            )));
        }
        if table.metadata().default_partition_spec_id() != partition_spec_id {
            return Err(SinkError::Iceberg(anyhow!(
                "Partition evolution not supported, expected partition spec id {}, but got {}",
                partition_spec_id,
                table.metadata().default_partition_spec_id()
            )));
        }
        Ok(table)
    }
}

#[async_trait]
impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!(
            sink_id = %self.param.sink_id,
            "Iceberg sink coordinator initialized",
        );

        Ok(())
    }

    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::debug!("Starting iceberg direct commit in epoch {epoch}");

        if metadata.is_empty() {
            tracing::debug!(?epoch, "No data files to commit");
            return Ok(());
        }

        if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
            self.commit_data_impl(epoch, write_results, snapshot_id)
                .await?;
        }

        Ok(())
    }

    async fn commit_schema_change(
        &mut self,
        epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        tracing::info!(
            "Committing schema change {:?} in epoch {}",
            schema_change,
            epoch
        );
        self.commit_schema_change_impl(schema_change).await?;
        tracing::info!("Successfully committed schema change in epoch {}", epoch);

        Ok(())
    }
}

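// In the two-phase protocol, `pre_commit` persists the serialized write results
// together with a pre-generated snapshot id, so that `commit_data` can be retried
// idempotently after recovery: if the snapshot id is already present in the
// iceberg table, the commit is skipped.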
#[async_trait]
impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!(
            sink_id = %self.param.sink_id,
            "Iceberg sink coordinator initialized",
        );

        Ok(())
    }

    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        _schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<Option<Vec<u8>>> {
        tracing::debug!("Starting iceberg pre-commit in epoch {epoch}");

        let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
            Some((write_results, snapshot_id)) => (write_results, snapshot_id),
            None => {
                tracing::debug!(?epoch, "no data to pre-commit");
                return Ok(None);
            }
        };

        let mut write_results_bytes = Vec::new();
        for each_parallelism_write_result in write_results {
            let each_parallelism_write_result_bytes: Vec<u8> =
                each_parallelism_write_result.try_into()?;
            write_results_bytes.push(each_parallelism_write_result_bytes);
        }

        // The little-endian snapshot id is appended as the last entry of the payload.
        let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
        write_results_bytes.push(snapshot_id_bytes);

        let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
        Ok(Some(pre_commit_metadata_bytes))
    }

    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
        tracing::debug!("Starting iceberg commit in epoch {epoch}");

        if commit_metadata.is_empty() {
            tracing::debug!(?epoch, "No data files to commit");
            return Ok(());
        }

        let mut payload = deserialize_metadata(commit_metadata);
        if payload.is_empty() {
            return Err(SinkError::Iceberg(anyhow!(
                "Invalid commit metadata: empty payload"
            )));
        }

        let snapshot_id_bytes = payload.pop().ok_or_else(|| {
            SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
        })?;
        let snapshot_id = i64::from_le_bytes(
            snapshot_id_bytes
                .try_into()
                .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
        );

        let write_results = payload
            .into_iter()
            .map(IcebergCommitResult::try_from_serialized_bytes)
            .collect::<Result<Vec<_>>>()?;

        let snapshot_committed = self
            .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
            .await?;

        if snapshot_committed {
            tracing::info!(
                "Snapshot id {} already committed in iceberg table, skip committing again.",
                snapshot_id
            );
            return Ok(());
        }

        self.commit_data_impl(epoch, write_results, snapshot_id)
            .await
    }

    async fn commit_schema_change(
        &mut self,
        epoch: u64,
        schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        let schema_updated = self.check_schema_change_applied(&schema_change)?;
        if schema_updated {
            tracing::info!("Schema change already committed in epoch {}, skip", epoch);
            return Ok(());
        }

        tracing::info!(
            "Committing schema change {:?} in epoch {}",
            schema_change,
            epoch
        );
        self.commit_schema_change_impl(schema_change).await?;
        tracing::info!("Successfully committed schema change in epoch {epoch}");

        Ok(())
    }

    async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
        tracing::debug!("Abort not implemented yet");
    }
}

impl IcebergSinkCommitter {
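    /// Deserializes the per-writer metadata, checks that all write results share the
    /// same schema id and partition spec id, and pre-generates the snapshot id for
    /// the commit. Returns `None` when there are no data files to commit.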
    fn pre_commit_inner(
        &mut self,
        _epoch: u64,
        metadata: Vec<SinkMetadata>,
    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
        let write_results: Vec<IcebergCommitResult> = metadata
            .iter()
            .map(IcebergCommitResult::try_from)
            .collect::<Result<Vec<IcebergCommitResult>>>()?;

        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
            return Ok(None);
        }

        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        if write_results
            .iter()
            .any(|r| r.schema_id != expect_schema_id)
            || write_results
                .iter()
                .any(|r| r.partition_spec_id != expect_partition_spec_id)
        {
            return Err(SinkError::Iceberg(anyhow!(
                "schema_id and partition_spec_id should be the same in all write results"
            )));
        }

        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);

        Ok(Some((write_results, snapshot_id)))
    }

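    /// Commits the collected data files to the iceberg table as a fast append under
    /// the pre-generated snapshot id, retrying the commit with exponential backoff.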
    async fn commit_data_impl(
        &mut self,
        epoch: u64,
        write_results: Vec<IcebergCommitResult>,
        snapshot_id: i64,
    ) -> Result<()> {
        assert!(
            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
        );

        self.wait_for_snapshot_limit().await?;

        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        self.table = Self::reload_table(
            self.catalog.as_ref(),
            self.table.identifier(),
            expect_schema_id,
            expect_partition_spec_id,
        )
        .await?;

        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find schema by id {}",
                expect_schema_id
            )));
        };
        let Some(partition_spec) = self
            .table
            .metadata()
            .partition_spec_by_id(expect_partition_spec_id)
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find partition spec by id {}",
                expect_partition_spec_id
            )));
        };
        let partition_type = partition_spec
            .as_ref()
            .clone()
            .partition_type(schema)
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        let data_files = write_results
            .into_iter()
            .flat_map(|r| {
                r.data_files.into_iter().map(|f| {
                    f.try_into(expect_partition_spec_id, &partition_type, schema)
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                })
            })
            .collect::<Result<Vec<DataFile>>>()?;

        let retry_strategy = ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .take(self.commit_retry_num as usize);
        let catalog = self.catalog.clone();
        let table_ident = self.table.identifier().clone();

        enum CommitError {
            ReloadTable(SinkError),
            Commit(SinkError),
        }

        let table = RetryIf::spawn(
            retry_strategy,
            || async {
                let table = Self::reload_table(
                    catalog.as_ref(),
                    &table_ident,
                    expect_schema_id,
                    expect_partition_spec_id,
                )
                .await
                .map_err(|e| {
                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
                    CommitError::ReloadTable(e)
                })?;

                let txn = Transaction::new(&table);
                let append_action = txn
                    .fast_append()
                    .set_snapshot_id(snapshot_id)
                    .set_target_branch(commit_branch(
                        self.config.r#type.as_str(),
                        self.config.write_mode,
                    ))
                    .add_data_files(data_files.clone());

                let tx = append_action.apply(txn).map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to apply iceberg transaction");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })?;

                tx.commit(catalog.as_ref()).await.map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })
            },
            |err: &CommitError| match err {
                CommitError::Commit(_) => {
                    tracing::warn!("Commit failed, will retry");
                    true
                }
                CommitError::ReloadTable(_) => {
                    tracing::error!("reload_table failed with non-retriable error, will not retry");
                    false
                }
            },
        )
        .await
        .map_err(|e| match e {
            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
        })?;
        self.table = table;

        let snapshot_num = self.table.metadata().snapshots().count();
        let catalog_name = self.config.common.catalog_name();
        let table_name = self.table.identifier().to_string();
        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
        GLOBAL_SINK_METRICS
            .iceberg_snapshot_num
            .with_guarded_label_values(&metrics_labels)
            .set(snapshot_num as i64);

        tracing::debug!("Successfully committed to iceberg table in epoch {epoch}.");

        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
            && self.config.enable_compaction
            && iceberg_compact_stat_sender
                .send(IcebergSinkCompactionUpdate {
                    sink_id: self.sink_id,
                    compaction_interval: self.config.compaction_interval_sec(),
                    force_compaction: false,
                })
                .is_err()
        {
            warn!("failed to send iceberg compaction stats");
        }

        Ok(())
    }

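    /// Returns whether the given snapshot id already exists in the iceberg table,
    /// i.e. whether a previous commit attempt for this payload has already landed.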
    async fn is_snapshot_id_in_iceberg(
        &self,
        iceberg_config: &IcebergConfig,
        snapshot_id: i64,
    ) -> Result<bool> {
        let table = iceberg_config.load_table().await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
    }

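    /// Determines whether `schema_change` has already been applied to the iceberg
    /// table: returns `Ok(false)` if the current schema still matches the original
    /// schema, `Ok(true)` if it matches the original schema plus the added columns,
    /// and an error if it matches neither.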
    fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
        let current_schema = self.table.metadata().current_schema();
        let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
            .context("Failed to convert schema")
            .map_err(SinkError::Iceberg)?;

        let iceberg_arrow_convert = IcebergArrowConvert;

        // A field set "matches" when the lengths agree and every expected field
        // appears in the current schema with the same name and data type.
        let schema_matches = |expected: &[ArrowField]| {
            if current_arrow_schema.fields().len() != expected.len() {
                return false;
            }

            expected.iter().all(|expected_field| {
                current_arrow_schema.fields().iter().any(|current_field| {
                    current_field.name() == expected_field.name()
                        && current_field.data_type() == expected_field.data_type()
                })
            })
        };

        let original_arrow_fields: Vec<ArrowField> = schema_change
            .original_schema
            .iter()
            .map(|pb_field| {
                let field = Field::from(pb_field);
                iceberg_arrow_convert
                    .to_arrow_field(&field.name, &field.data_type)
                    .context("Failed to convert field to arrow")
                    .map_err(SinkError::Iceberg)
            })
            .collect::<Result<_>>()?;

        if schema_matches(&original_arrow_fields) {
            tracing::debug!(
                "Current iceberg schema matches original_schema ({} columns); schema change not applied",
                original_arrow_fields.len()
            );
            return Ok(false);
        }

        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
            schema_change.op.as_ref()
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Unsupported sink schema change op in iceberg sink: {:?}",
                schema_change.op
            )));
        };

        let add_arrow_fields: Vec<ArrowField> = add_columns_op
            .fields
            .iter()
            .map(|pb_field| {
                let field = Field::from(pb_field);
                iceberg_arrow_convert
                    .to_arrow_field(&field.name, &field.data_type)
                    .context("Failed to convert field to arrow")
                    .map_err(SinkError::Iceberg)
            })
            .collect::<Result<_>>()?;

        let mut expected_after_change = original_arrow_fields;
        expected_after_change.extend(add_arrow_fields);

        if schema_matches(&expected_after_change) {
            tracing::debug!(
                "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
                expected_after_change.len()
            );
            return Ok(true);
        }

        Err(SinkError::Iceberg(anyhow!(
            "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
            schema_change.original_schema.len()
        )))
    }

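    /// Applies an `AddColumns` schema change to the iceberg table by appending the
    /// new columns as optional nested fields and committing an update-schema
    /// transaction to the catalog.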
    async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
        use iceberg::spec::NestedField;

        let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
            schema_change.op.as_ref()
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Unsupported sink schema change op in iceberg sink: {:?}",
                schema_change.op
            )));
        };

        let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();

        let metadata = self.table.metadata();
        let mut next_field_id = metadata.last_column_id() + 1;
        tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);

        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        let mut new_fields = Vec::new();

        for field in &add_columns {
            let arrow_field = iceberg_create_table_arrow_convert
                .to_arrow_field(&field.name, &field.data_type)
                .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
                .map_err(SinkError::Iceberg)?;

            let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
                .map_err(|err| {
                    SinkError::Iceberg(
                        anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
                    )
                })?;

            let nested_field = Arc::new(NestedField::optional(
                next_field_id,
                &field.name,
                iceberg_type,
            ));

            new_fields.push(nested_field);
            tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
            next_field_id += 1;
        }

        tracing::info!(
            "Committing schema change to catalog for table {}",
            self.table.identifier()
        );

        let txn = Transaction::new(&self.table);
        let action = txn.update_schema().add_fields(new_fields);

        let updated_table = action
            .apply(txn)
            .context("Failed to apply schema update action")
            .map_err(SinkError::Iceberg)?
            .commit(self.catalog.as_ref())
            .await
            .context("Failed to commit table schema change")
            .map_err(SinkError::Iceberg)?;

        self.table = updated_table;

        tracing::info!(
            "Successfully committed schema change, added {} columns to iceberg table",
            add_columns.len()
        );

        Ok(())
    }

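    /// Counts snapshots written since the most recent `Replace` (rewrite/compaction)
    /// snapshot, walking the snapshot log from newest to oldest.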
    fn count_snapshots_since_rewrite(&self) -> usize {
        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));

        let mut count = 0;
        for snapshot in snapshots {
            let summary = snapshot.summary();
            match &summary.operation {
                Operation::Replace => {
                    break;
                }
                _ => {
                    count += 1;
                }
            }
        }

        count
    }

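    /// When compaction is enabled and `max_snapshots_num_before_compaction` is set,
    /// blocks further commits until the number of snapshots since the last rewrite
    /// falls below the limit, nudging the compactor with `force_compaction` while
    /// waiting.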
    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
        if !self.config.enable_compaction {
            return Ok(());
        }

        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
            loop {
                let current_count = self.count_snapshots_since_rewrite();

                if current_count < max_snapshots {
                    tracing::info!(
                        "Snapshot count check passed: {} < {}",
                        current_count,
                        max_snapshots
                    );
                    break;
                }

                tracing::info!(
                    "Snapshot count {} has reached the limit {}, waiting...",
                    current_count,
                    max_snapshots
                );

                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
                    && iceberg_compact_stat_sender
                        .send(IcebergSinkCompactionUpdate {
                            sink_id: self.sink_id,
                            compaction_interval: self.config.compaction_interval_sec(),
                            force_compaction: true,
                        })
                        .is_err()
                {
                    tracing::warn!("failed to send iceberg compaction stats");
                }

                tokio::time::sleep(Duration::from_secs(30)).await;

                // Reload the table to observe snapshots expired by the compactor.
                self.table = self.config.load_table().await?;
            }
        }
        Ok(())
    }
}

fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    // Serializing a `Vec<Vec<u8>>` as JSON cannot fail.
    serde_json::to_vec(&metadata).unwrap()
}

fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    // The input is always produced by `serialize_metadata` above; a parse failure
    // means the commit metadata is corrupted, so panicking is acceptable.
    serde_json::from_slice(&bytes).unwrap()
}
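
// A minimal test-only round-trip sketch of the pre-commit payload layout used by
// `TwoPhaseCommitCoordinator::pre_commit` and `commit_data`: per-parallelism
// write-result bytes followed by the little-endian snapshot id as the last entry.
// The write-result bytes here are placeholders, not real serialized
// `IcebergCommitResult`s.
#[cfg(test)]
mod tests {
    use super::{deserialize_metadata, serialize_metadata};

    #[test]
    fn pre_commit_payload_round_trip() {
        let snapshot_id: i64 = 42;
        let payload = vec![
            b"placeholder write result".to_vec(),
            snapshot_id.to_le_bytes().to_vec(),
        ];

        let bytes = serialize_metadata(payload.clone());
        let mut decoded = deserialize_metadata(bytes);
        assert_eq!(decoded, payload);

        // Recover the snapshot id exactly as `commit_data` does.
        let snapshot_id_bytes = decoded.pop().unwrap();
        let recovered = i64::from_le_bytes(snapshot_id_bytes.try_into().unwrap());
        assert_eq!(recovered, snapshot_id);
    }
}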