1use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use async_trait::async_trait;
20use iceberg::arrow::schema_to_arrow_schema;
21use iceberg::spec::{DataFile, Operation, SerializedDataFile, TableMetadata};
22use iceberg::table::Table;
23use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
24use iceberg::{Catalog, TableIdent};
25use itertools::Itertools;
26use risingwave_common::array::arrow::arrow_schema_iceberg::{
27 DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
28};
29use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
30use risingwave_common::bail;
31use risingwave_common::catalog::Field;
32use risingwave_common::error::IcebergError;
33use risingwave_pb::connector_service::SinkMetadata;
34use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
35use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
36use risingwave_pb::stream_plan::PbSinkSchemaChange;
37use serde_json::from_value;
38use thiserror_ext::AsReport;
39use tokio::sync::mpsc::UnboundedSender;
40use tokio_retry::RetryIf;
41use tokio_retry::strategy::{ExponentialBackoff, jitter};
42use tracing::warn;
43
44use super::{GLOBAL_SINK_METRICS, IcebergConfig, SinkError, commit_branch};
45use crate::connector_common::{IcebergCommittedSnapshot, IcebergSinkCompactionUpdate};
46use crate::sink::catalog::SinkId;
47use crate::sink::{Result, SinglePhaseCommitCoordinator, SinkParam, TwoPhaseCommitCoordinator};
48
/// JSON object key holding the schema id in serialized iceberg sink metadata.
const SCHEMA_ID: &str = "schema_id";
/// JSON object key holding the partition spec id in serialized iceberg sink metadata.
const PARTITION_SPEC_ID: &str = "partition_spec_id";
/// JSON object key holding the array of serialized data files in iceberg sink metadata.
const DATA_FILES: &str = "data_files";
52
/// The result of one writer's work that still has to be committed to the iceberg table.
///
/// It is exchanged as a JSON object keyed by `schema_id`, `partition_spec_id` and
/// `data_files` (see the conversions to and from [`SinkMetadata`] and `Vec<u8>`).
#[derive(Default, Clone)]
pub struct IcebergCommitResult {
    /// Id of the table schema the data files were written with.
    pub schema_id: i32,
    /// Id of the partition spec the data files were written with.
    pub partition_spec_id: i32,
    /// Data files produced by the writer, in their serializable form.
    pub data_files: Vec<SerializedDataFile>,
}
59
60impl IcebergCommitResult {
61 fn try_from(value: &SinkMetadata) -> Result<Self> {
62 if let Some(Serialized(v)) = &value.metadata {
63 let mut values = if let serde_json::Value::Object(v) =
64 serde_json::from_slice::<serde_json::Value>(&v.metadata)
65 .context("Can't parse iceberg sink metadata")?
66 {
67 v
68 } else {
69 bail!("iceberg sink metadata should be an object");
70 };
71
72 let schema_id;
73 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
74 schema_id = value
75 .as_u64()
76 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
77 } else {
78 bail!("iceberg sink metadata should have schema_id");
79 }
80
81 let partition_spec_id;
82 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
83 partition_spec_id = value
84 .as_u64()
85 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
86 } else {
87 bail!("iceberg sink metadata should have partition_spec_id");
88 }
89
90 let data_files: Vec<SerializedDataFile>;
91 if let serde_json::Value::Array(values) = values
92 .remove(DATA_FILES)
93 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
94 {
95 data_files = values
96 .into_iter()
97 .map(from_value::<SerializedDataFile>)
98 .collect::<std::result::Result<_, _>>()
99 .unwrap();
100 } else {
101 bail!("iceberg sink metadata should have data_files object");
102 }
103
104 Ok(Self {
105 schema_id: schema_id as i32,
106 partition_spec_id: partition_spec_id as i32,
107 data_files,
108 })
109 } else {
110 bail!("Can't create iceberg sink write result from empty data!")
111 }
112 }
113
114 fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
115 let mut values = if let serde_json::Value::Object(value) =
116 serde_json::from_slice::<serde_json::Value>(&value)
117 .context("Can't parse iceberg sink metadata")?
118 {
119 value
120 } else {
121 bail!("iceberg sink metadata should be an object");
122 };
123
124 let schema_id;
125 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
126 schema_id = value
127 .as_u64()
128 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
129 } else {
130 bail!("iceberg sink metadata should have schema_id");
131 }
132
133 let partition_spec_id;
134 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
135 partition_spec_id = value
136 .as_u64()
137 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
138 } else {
139 bail!("iceberg sink metadata should have partition_spec_id");
140 }
141
142 let data_files: Vec<SerializedDataFile>;
143 if let serde_json::Value::Array(values) = values
144 .remove(DATA_FILES)
145 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
146 {
147 data_files = values
148 .into_iter()
149 .map(from_value::<SerializedDataFile>)
150 .collect::<std::result::Result<_, _>>()
151 .unwrap();
152 } else {
153 bail!("iceberg sink metadata should have data_files object");
154 }
155
156 Ok(Self {
157 schema_id: schema_id as i32,
158 partition_spec_id: partition_spec_id as i32,
159 data_files,
160 })
161 }
162}
163
164impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
165 type Error = SinkError;
166
167 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
168 let json_data_files = serde_json::Value::Array(
169 value
170 .data_files
171 .iter()
172 .map(serde_json::to_value)
173 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
174 .context("Can't serialize data files to json")?,
175 );
176 let json_value = serde_json::Value::Object(
177 vec![
178 (
179 SCHEMA_ID.to_owned(),
180 serde_json::Value::Number(value.schema_id.into()),
181 ),
182 (
183 PARTITION_SPEC_ID.to_owned(),
184 serde_json::Value::Number(value.partition_spec_id.into()),
185 ),
186 (DATA_FILES.to_owned(), json_data_files),
187 ]
188 .into_iter()
189 .collect(),
190 );
191 Ok(SinkMetadata {
192 metadata: Some(Serialized(SerializedMetadata {
193 metadata: serde_json::to_vec(&json_value)
194 .context("Can't serialize iceberg sink metadata")?,
195 })),
196 })
197 }
198}
199
200impl TryFrom<IcebergCommitResult> for Vec<u8> {
201 type Error = SinkError;
202
203 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
204 let json_data_files = serde_json::Value::Array(
205 value
206 .data_files
207 .iter()
208 .map(serde_json::to_value)
209 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
210 .context("Can't serialize data files to json")?,
211 );
212 let json_value = serde_json::Value::Object(
213 vec![
214 (
215 SCHEMA_ID.to_owned(),
216 serde_json::Value::Number(value.schema_id.into()),
217 ),
218 (
219 PARTITION_SPEC_ID.to_owned(),
220 serde_json::Value::Number(value.partition_spec_id.into()),
221 ),
222 (DATA_FILES.to_owned(), json_data_files),
223 ]
224 .into_iter()
225 .collect(),
226 );
227 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
228 }
229}
230
231fn arrow_data_type_compatible(current: &ArrowDataType, expected: &ArrowDataType) -> bool {
232 use ArrowDataType::*;
233
234 match (current, expected) {
235 (Decimal128(_, _), Decimal128(_, _)) => true,
241 (Binary, LargeBinary) | (LargeBinary, Binary) => true,
242 (List(current_field), List(expected_field)) => {
243 arrow_data_type_compatible(current_field.data_type(), expected_field.data_type())
244 }
245 (Map(current_field, current_sorted), Map(expected_field, expected_sorted)) => {
246 current_sorted == expected_sorted
247 && arrow_data_type_compatible(current_field.data_type(), expected_field.data_type())
248 }
249 (Struct(current_fields), Struct(expected_fields)) => {
250 let expected_fields = expected_fields
251 .iter()
252 .map(|field| field.as_ref().clone())
253 .collect_vec();
254 schema_contains_same_fields(current_fields, &expected_fields)
255 }
256 _ => current == expected,
257 }
258}
259
260fn schema_contains_same_fields(current: &ArrowFields, expected: &[ArrowField]) -> bool {
261 if current.len() != expected.len() {
262 return false;
263 }
264
265 let mut unmatched_current = current.iter().collect_vec();
266 expected.iter().all(|expected_field| {
267 let Some(pos) = unmatched_current.iter().position(|current_field| {
268 current_field.name() == expected_field.name()
269 && arrow_data_type_compatible(current_field.data_type(), expected_field.data_type())
270 }) else {
271 return false;
272 };
273 unmatched_current.swap_remove(pos);
274 true
275 })
276}
277
/// Commit coordinator for the iceberg sink.
///
/// It implements both [`SinglePhaseCommitCoordinator`] and [`TwoPhaseCommitCoordinator`].
pub struct IcebergSinkCommitter {
    /// Catalog used to reload the table and to commit transactions.
    pub(super) catalog: Arc<dyn Catalog>,
    /// Most recently loaded view of the target table; replaced after reloads and commits.
    pub(super) table: Table,
    // NOTE(review): not read or written in this part of the file — presumably the epoch
    // of the last successful commit; confirm against the callers.
    pub last_commit_epoch: u64,
    /// Id of the sink, used in logs and in compaction updates.
    pub(crate) sink_id: SinkId,
    /// Sink configuration (branch selection, snapshot limit, table loading).
    pub(crate) config: IcebergConfig,
    /// Sink parameters; `sink_id` and `sink_name` are used for logging and metrics.
    pub(crate) param: SinkParam,
    /// Maximum number of retries taken from the backoff strategy when committing data.
    pub(super) commit_retry_num: u32,
    /// Optional channel on which compaction updates are sent; nothing is sent when `None`.
    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
}
288
289impl IcebergSinkCommitter {
290 fn latest_observed_snapshot(&self) -> Option<IcebergCommittedSnapshot> {
291 let branch = commit_branch(self.config.r#type.as_str(), self.config.write_mode);
292 self.table
293 .metadata()
294 .snapshot_for_ref(&branch)
295 .map(|snapshot| IcebergCommittedSnapshot {
296 branch,
297 snapshot_id: snapshot.snapshot_id(),
298 timestamp_ms: snapshot.timestamp_ms(),
299 })
300 }
301
302 fn notify_iceberg_compaction_scheduler(&self, force_compaction: bool) {
303 let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender else {
304 return;
305 };
306
307 let Some(observed_snapshot) = self.latest_observed_snapshot() else {
308 warn!(
309 sink_id = %self.sink_id,
310 "skip iceberg compaction update because no observed snapshot is available"
311 );
312 return;
313 };
314
315 let observed_snapshot_id = observed_snapshot.snapshot_id;
316 let observed_snapshot_timestamp_ms = observed_snapshot.timestamp_ms;
317 let observed_snapshot_branch = observed_snapshot.branch.clone();
318
319 if iceberg_compact_stat_sender
320 .send(IcebergSinkCompactionUpdate {
321 sink_id: self.sink_id,
322 force_compaction,
323 observed_snapshot,
324 })
325 .is_err()
326 {
327 warn!(
328 sink_id = %self.sink_id,
329 force_compaction,
330 observed_snapshot_id,
331 observed_snapshot_timestamp_ms,
332 observed_snapshot_branch = %observed_snapshot_branch,
333 "failed to send iceberg compaction update"
334 );
335 }
336 }
337
338 async fn reload_table(
341 catalog: &dyn Catalog,
342 table_ident: &TableIdent,
343 schema_id: i32,
344 partition_spec_id: i32,
345 ) -> Result<Table> {
346 let table = catalog
347 .load_table(table_ident)
348 .await
349 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
350 if table.metadata().current_schema_id() != schema_id {
351 return Err(SinkError::Iceberg(anyhow!(
352 "Schema evolution not supported, expect schema id {}, but got {}",
353 schema_id,
354 table.metadata().current_schema_id()
355 )));
356 }
357 if table.metadata().default_partition_spec_id() != partition_spec_id {
358 return Err(SinkError::Iceberg(anyhow!(
359 "Partition evolution not supported, expect partition spec id {}, but got {}",
360 partition_spec_id,
361 table.metadata().default_partition_spec_id()
362 )));
363 }
364 Ok(table)
365 }
366}
367
368#[async_trait]
369impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
370 async fn init(&mut self) -> Result<()> {
371 tracing::info!(
372 sink_id = %self.param.sink_id,
373 "Iceberg sink coordinator initialized",
374 );
375
376 Ok(())
377 }
378
379 async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
380 tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
381
382 if metadata.is_empty() {
383 tracing::debug!(?epoch, "No datafile to commit");
384 return Ok(());
385 }
386
387 if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
389 self.commit_data_impl(epoch, write_results, snapshot_id)
390 .await?;
391 }
392
393 Ok(())
394 }
395
396 async fn commit_schema_change(
397 &mut self,
398 epoch: u64,
399 schema_change: PbSinkSchemaChange,
400 ) -> Result<()> {
401 tracing::info!(
402 "Committing schema change {:?} in epoch {}",
403 schema_change,
404 epoch
405 );
406 self.commit_schema_change_impl(schema_change).await?;
407 tracing::info!("Successfully committed schema change in epoch {}", epoch);
408
409 Ok(())
410 }
411}
412
413#[async_trait]
414impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
415 async fn init(&mut self) -> Result<()> {
416 tracing::info!(
417 sink_id = %self.param.sink_id,
418 "Iceberg sink coordinator initialized",
419 );
420
421 Ok(())
422 }
423
424 async fn pre_commit(
425 &mut self,
426 epoch: u64,
427 metadata: Vec<SinkMetadata>,
428 _schema_change: Option<PbSinkSchemaChange>,
429 ) -> Result<Option<Vec<u8>>> {
430 tracing::debug!("Starting iceberg pre commit in epoch {epoch}");
431
432 let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
433 Some((write_results, snapshot_id)) => (write_results, snapshot_id),
434 None => {
435 tracing::debug!(?epoch, "no data to pre commit");
436 return Ok(None);
437 }
438 };
439
440 let mut write_results_bytes = Vec::new();
441 for each_parallelism_write_result in write_results {
442 let each_parallelism_write_result_bytes: Vec<u8> =
443 each_parallelism_write_result.try_into()?;
444 write_results_bytes.push(each_parallelism_write_result_bytes);
445 }
446
447 let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
448 write_results_bytes.push(snapshot_id_bytes);
449
450 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
451 Ok(Some(pre_commit_metadata_bytes))
452 }
453
454 async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
455 tracing::debug!("Starting iceberg commit in epoch {epoch}");
456
457 if commit_metadata.is_empty() {
458 tracing::debug!(?epoch, "No datafile to commit");
459 return Ok(());
460 }
461
462 let mut payload = deserialize_metadata(commit_metadata);
464 if payload.is_empty() {
465 return Err(SinkError::Iceberg(anyhow!(
466 "Invalid commit metadata: empty payload"
467 )));
468 }
469
470 let snapshot_id_bytes = payload.pop().ok_or_else(|| {
472 SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
473 })?;
474 let snapshot_id = i64::from_le_bytes(
475 snapshot_id_bytes
476 .try_into()
477 .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
478 );
479
480 let write_results = payload
482 .into_iter()
483 .map(IcebergCommitResult::try_from_serialized_bytes)
484 .collect::<Result<Vec<_>>>()?;
485
486 let snapshot_committed = self
487 .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
488 .await?;
489
490 if snapshot_committed {
491 tracing::info!(
492 "Snapshot id {} already committed in iceberg table, skip committing again.",
493 snapshot_id
494 );
495 return Ok(());
496 }
497
498 self.commit_data_impl(epoch, write_results, snapshot_id)
499 .await
500 }
501
502 async fn commit_schema_change(
503 &mut self,
504 epoch: u64,
505 schema_change: PbSinkSchemaChange,
506 ) -> Result<()> {
507 let schema_updated = self.check_schema_change_applied(&schema_change)?;
508 if schema_updated {
509 tracing::info!("Schema change already committed in epoch {}, skip", epoch);
510 return Ok(());
511 }
512
513 tracing::info!(
514 "Committing schema change {:?} in epoch {}",
515 schema_change,
516 epoch
517 );
518 self.commit_schema_change_impl(schema_change).await?;
519 tracing::info!("Successfully committed schema change in epoch {epoch}");
520
521 Ok(())
522 }
523
524 async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
525 tracing::debug!("Abort not implemented yet");
527 }
528}
529
530impl IcebergSinkCommitter {
532 fn pre_commit_inner(
533 &mut self,
534 _epoch: u64,
535 metadata: Vec<SinkMetadata>,
536 ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
537 let write_results: Vec<IcebergCommitResult> = metadata
538 .iter()
539 .map(IcebergCommitResult::try_from)
540 .collect::<Result<Vec<IcebergCommitResult>>>()?;
541
542 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
544 return Ok(None);
545 }
546
547 let expect_schema_id = write_results[0].schema_id;
548 let expect_partition_spec_id = write_results[0].partition_spec_id;
549
550 if write_results
552 .iter()
553 .any(|r| r.schema_id != expect_schema_id)
554 || write_results
555 .iter()
556 .any(|r| r.partition_spec_id != expect_partition_spec_id)
557 {
558 return Err(SinkError::Iceberg(anyhow!(
559 "schema_id and partition_spec_id should be the same in all write results"
560 )));
561 }
562
563 let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
564
565 Ok(Some((write_results, snapshot_id)))
566 }
567
    /// Commits the data files of `write_results` to the iceberg table as a fast append
    /// with the given `snapshot_id`, then updates metrics and notifies the compaction
    /// scheduler.
    ///
    /// # Panics
    ///
    /// Panics when `write_results` is empty or contains no data files at all.
    ///
    /// # Errors
    ///
    /// Fails when waiting for the snapshot limit fails, when the table cannot be reloaded
    /// with the expected schema / partition spec, when a data file cannot be converted, or
    /// when the commit still fails after the retries are exhausted.
    async fn commit_data_impl(
        &mut self,
        epoch: u64,
        write_results: Vec<IcebergCommitResult>,
        snapshot_id: i64,
    ) -> Result<()> {
        // Callers are expected to have filtered out the "nothing to commit" case.
        assert!(
            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
        );

        // May block until the number of snapshots since the last rewrite is under the limit.
        self.wait_for_snapshot_limit().await?;

        // All write results are expected to share the ids of the first one.
        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        // Refresh the cached table and verify schema / partition spec did not change.
        self.table = Self::reload_table(
            self.catalog.as_ref(),
            self.table.identifier(),
            expect_schema_id,
            expect_partition_spec_id,
        )
        .await?;

        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find schema by id {}",
                expect_schema_id
            )));
        };
        let Some(partition_spec) = self
            .table
            .metadata()
            .partition_spec_by_id(expect_partition_spec_id)
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find partition spec by id {}",
                expect_partition_spec_id
            )));
        };
        let partition_type = partition_spec
            .as_ref()
            .clone()
            .partition_type(schema)
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        // Turn every serialized data file of every write result into a `DataFile`.
        let data_files = write_results
            .into_iter()
            .flat_map(|r| {
                r.data_files.into_iter().map(|f| {
                    f.try_into(expect_partition_spec_id, &partition_type, schema)
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                })
            })
            .collect::<Result<Vec<DataFile>>>()?;
        // Exponential backoff with jitter, capped at 60s per delay and at
        // `commit_retry_num` retries.
        let retry_strategy = ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .take(self.commit_retry_num as usize);
        let catalog = self.catalog.clone();
        let table_ident = self.table.identifier().clone();

        // Distinguishes where an attempt failed so the retry condition can decide
        // whether another attempt is made.
        enum CommitError {
            ReloadTable(SinkError), Commit(SinkError), }

        let table = RetryIf::spawn(
            retry_strategy,
            || async {
                // Each attempt reloads the table before building the transaction.
                let table = Self::reload_table(
                    catalog.as_ref(),
                    &table_ident,
                    expect_schema_id,
                    expect_partition_spec_id,
                )
                .await
                .map_err(|e| {
                    tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
                    CommitError::ReloadTable(e)
                })?;

                let txn = Transaction::new(&table);
                let append_action = txn
                    .fast_append()
                    .set_snapshot_id(snapshot_id)
                    .set_target_branch(commit_branch(
                        self.config.r#type.as_str(),
                        self.config.write_mode,
                    ))
                    .add_data_files(data_files.clone());

                let tx = append_action.apply(txn).map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })?;

                tx.commit(catalog.as_ref()).await.map_err(|err| {
                    let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
                    CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
                })
            },
            |err: &CommitError| {
                // Only apply/commit failures are retried; a failed reload aborts immediately.
                match err {
                    CommitError::Commit(_) => {
                        tracing::warn!("Commit failed, will retry");
                        true
                    }
                    CommitError::ReloadTable(_) => {
                        tracing::error!(
                            "reload_table failed with non-retriable error, will not retry"
                        );
                        false
                    }
                }
            },
        )
        .await
        .map_err(|e| match e {
            CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
        })?;
        // Keep the table returned by the successful commit as the cached table.
        self.table = table;

        // Report the total number of snapshots in the table metadata.
        let snapshot_num = self.table.metadata().snapshots().count();
        let catalog_name = self.config.common.catalog_name();
        let table_name = self.table.identifier().to_string();
        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
        GLOBAL_SINK_METRICS
            .iceberg_snapshot_num
            .with_guarded_label_values(&metrics_labels)
            .set(snapshot_num as i64);

        tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");

        // Regular (non-forced) compaction update after a successful commit.
        self.notify_iceberg_compaction_scheduler(false);

        Ok(())
    }
720
721 async fn is_snapshot_id_in_iceberg(
725 &self,
726 iceberg_config: &IcebergConfig,
727 snapshot_id: i64,
728 ) -> Result<bool> {
729 let table = iceberg_config.load_table().await?;
730 if table.metadata().snapshot_by_id(snapshot_id).is_some() {
731 Ok(true)
732 } else {
733 Ok(false)
734 }
735 }
736
737 fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
740 let current_schema = self.table.metadata().current_schema();
741 let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
742 .context("Failed to convert schema")
743 .map_err(SinkError::Iceberg)?;
744
745 let iceberg_arrow_convert = IcebergArrowConvert;
746
747 let schema_matches = |expected: &[ArrowField]| {
748 schema_contains_same_fields(current_arrow_schema.fields(), expected)
749 };
750
751 let original_arrow_fields: Vec<ArrowField> = schema_change
752 .original_schema
753 .iter()
754 .map(|pb_field| {
755 let field = Field::from(pb_field);
756 iceberg_arrow_convert
757 .to_arrow_field(&field.name, &field.data_type)
758 .context("Failed to convert field to arrow")
759 .map_err(SinkError::Iceberg)
760 })
761 .collect::<Result<_>>()?;
762
763 if schema_matches(&original_arrow_fields) {
765 tracing::debug!(
766 "Current iceberg schema matches original_schema ({} columns); schema change not applied",
767 original_arrow_fields.len()
768 );
769 return Ok(false);
770 }
771
772 let expected_after_change = match schema_change.op.as_ref() {
773 Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(
774 add_columns_op,
775 )) => {
776 let add_arrow_fields: Vec<ArrowField> = add_columns_op
777 .fields
778 .iter()
779 .map(|pb_field| {
780 let field = Field::from(pb_field);
781 iceberg_arrow_convert
782 .to_arrow_field(&field.name, &field.data_type)
783 .context("Failed to convert field to arrow")
784 .map_err(SinkError::Iceberg)
785 })
786 .collect::<Result<_>>()?;
787
788 let mut expected_after_change = original_arrow_fields;
789 expected_after_change.extend(add_arrow_fields);
790 expected_after_change
791 }
792 Some(risingwave_pb::stream_plan::sink_schema_change::Op::DropColumns(
793 drop_columns_op,
794 )) => original_arrow_fields
795 .into_iter()
796 .filter(|field| {
797 !drop_columns_op
798 .column_names
799 .iter()
800 .any(|name| name == field.name())
801 })
802 .collect_vec(),
803 _ => {
804 return Err(SinkError::Iceberg(anyhow!(
805 "Unsupported sink schema change op in iceberg sink: {:?}",
806 schema_change.op
807 )));
808 }
809 };
810
811 if schema_matches(&expected_after_change) {
813 tracing::debug!(
814 "Current iceberg schema matches changed schema ({} columns); schema change already applied",
815 expected_after_change.len()
816 );
817 return Ok(true);
818 }
819
820 Err(SinkError::Iceberg(anyhow!(
821 "Current iceberg schema does not match either original_schema ({} cols) or changed schema; cannot determine whether schema change is applied",
822 schema_change.original_schema.len()
823 )))
824 }
825
826 async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
830 use iceberg::spec::NestedField;
831
832 let metadata = self.table.metadata();
834 let mut next_field_id = metadata.last_column_id() + 1;
835 tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
836
837 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
839 let mut new_fields = Vec::new();
840
841 let mut drop_column_names = Vec::new();
842 match schema_change.op.as_ref() {
843 Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(
844 add_columns_op,
845 )) => {
846 let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
847 for field in &add_columns {
848 let arrow_field = iceberg_create_table_arrow_convert
850 .to_arrow_field(&field.name, &field.data_type)
851 .with_context(|| {
852 format!("Failed to convert field '{}' to arrow", field.name)
853 })
854 .map_err(SinkError::Iceberg)?;
855
856 let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
858 .map_err(|err| {
859 SinkError::Iceberg(
860 anyhow!(err)
861 .context("Failed to convert Arrow type to Iceberg type"),
862 )
863 })?;
864
865 let nested_field = Arc::new(NestedField::optional(
867 next_field_id,
868 &field.name,
869 iceberg_type,
870 ));
871
872 new_fields.push(nested_field);
873 tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
874 next_field_id += 1;
875 }
876 }
877 Some(risingwave_pb::stream_plan::sink_schema_change::Op::DropColumns(
878 drop_columns_op,
879 )) => {
880 drop_column_names = drop_columns_op.column_names.clone();
881 }
882 _ => {
883 return Err(SinkError::Iceberg(anyhow!(
884 "Unsupported sink schema change op in iceberg sink: {:?}",
885 schema_change.op
886 )));
887 }
888 }
889
890 tracing::info!(
892 "Committing schema change to catalog for table {}",
893 self.table.identifier()
894 );
895
896 let txn = Transaction::new(&self.table);
897 let action_fields_added = new_fields.len();
898 let action = txn
899 .update_schema()
900 .add_fields(new_fields)
901 .drop_fields(drop_column_names.clone());
902
903 let updated_table = action
904 .apply(txn)
905 .context("Failed to apply schema update action")
906 .map_err(SinkError::Iceberg)?
907 .commit(self.catalog.as_ref())
908 .await
909 .context("Failed to commit table schema change")
910 .map_err(SinkError::Iceberg)?;
911
912 self.table = updated_table;
913
914 tracing::info!(
915 "Successfully committed schema change, added {} columns and dropped {} columns from iceberg table",
916 action_fields_added,
917 drop_column_names.len()
918 );
919
920 Ok(())
921 }
922
923 fn count_snapshots_since_rewrite_in_metadata(metadata: &TableMetadata, branch: &str) -> usize {
926 let mut snapshot_id = metadata
928 .snapshot_for_ref(branch)
929 .map(|snapshot| snapshot.snapshot_id());
930 let mut count = 0;
931
932 while let Some(current_snapshot_id) = snapshot_id {
934 let Some(snapshot) = metadata.snapshot_by_id(current_snapshot_id) else {
935 break;
936 };
937
938 if snapshot.summary().operation == Operation::Replace {
940 break;
942 }
943
944 count += 1;
946 snapshot_id = snapshot.parent_snapshot_id();
947 }
948
949 count
950 }
951
952 fn count_snapshots_since_rewrite(&self) -> usize {
954 let branch = commit_branch(self.config.r#type.as_str(), self.config.write_mode);
955 Self::count_snapshots_since_rewrite_in_metadata(self.table.metadata(), branch.as_str())
956 }
957
958 async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
960 if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
961 loop {
962 let current_count = self.count_snapshots_since_rewrite();
963
964 if current_count < max_snapshots {
965 tracing::info!(
966 "Snapshot count check passed: {} < {}",
967 current_count,
968 max_snapshots
969 );
970 break;
971 }
972
973 tracing::info!(
974 "Snapshot count {} exceeds limit {}, waiting...",
975 current_count,
976 max_snapshots
977 );
978
979 self.notify_iceberg_compaction_scheduler(true);
980
981 tokio::time::sleep(Duration::from_secs(30)).await;
983
984 self.table = self.config.load_table().await?;
986 }
987 }
988 Ok(())
989 }
990}
991
992fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
993 serde_json::to_vec(&metadata).unwrap()
994}
995
996fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
997 serde_json::from_slice(&bytes).unwrap()
998}
999
1000#[cfg(test)]
1001mod tests {
1002 use std::collections::HashMap;
1003
1004 use iceberg::spec::{
1005 FormatVersion, MAIN_BRANCH, NestedField, PrimitiveType, Schema, Snapshot,
1006 SnapshotReference, SnapshotRetention, SortOrder, Summary, TableMetadataBuilder, Type,
1007 UnboundPartitionSpec,
1008 };
1009 use risingwave_common::array::arrow::arrow_schema_iceberg::{
1010 DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef,
1011 Fields as ArrowFields, Schema as ArrowSchema,
1012 };
1013
1014 use super::*;
1015
    /// A top-level `LargeBinary` column is accepted where `Binary` is expected.
    #[test]
    fn test_schema_contains_same_fields_allows_binary_large_binary() {
        let current_schema = ArrowSchema::new(vec![
            ArrowField::new("k", ArrowDataType::Int32, true),
            ArrowField::new("v", ArrowDataType::LargeBinary, true),
        ]);
        let expected_fields = vec![
            ArrowField::new("k", ArrowDataType::Int32, true),
            ArrowField::new("v", ArrowDataType::Binary, true),
        ];

        assert!(schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }
1032
    /// `LargeBinary` vs `Binary` is also accepted when nested inside a struct, a list and
    /// a map value.
    #[test]
    fn test_schema_contains_same_fields_allows_nested_binary_large_binary() {
        // Current schema: every nested payload is `LargeBinary`.
        let current_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                ArrowDataType::Struct(ArrowFields::from(vec![ArrowField::new(
                    "payload",
                    ArrowDataType::LargeBinary,
                    true,
                )])),
                true,
            ),
            ArrowField::new(
                "l",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::LargeBinary,
                    true,
                ))),
                true,
            ),
            ArrowField::new_map(
                "m",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new("value", ArrowDataType::LargeBinary, true)),
                false,
                true,
            ),
        ]);
        // Expected schema: same shape, but every nested payload is `Binary`.
        let expected_fields = vec![
            ArrowField::new(
                "s",
                ArrowDataType::Struct(ArrowFields::from(vec![ArrowField::new(
                    "payload",
                    ArrowDataType::Binary,
                    true,
                )])),
                true,
            ),
            ArrowField::new(
                "l",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::Binary,
                    true,
                ))),
                true,
            ),
            ArrowField::new_map(
                "m",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new("value", ArrowDataType::Binary, true)),
                false,
                true,
            ),
        ];

        assert!(schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }
1095
    /// Two `Decimal128` columns match even when their precision differs.
    #[test]
    fn test_schema_contains_same_fields_allows_decimal_precision_delta() {
        let current_schema = ArrowSchema::new(vec![ArrowField::new(
            "d",
            ArrowDataType::Decimal128(28, 10),
            true,
        )]);
        let expected_fields = vec![ArrowField::new(
            "d",
            ArrowDataType::Decimal128(38, 10),
            true,
        )];

        assert!(schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }
1114
    /// Two `Decimal128` columns match even when their scale differs.
    #[test]
    fn test_schema_contains_same_fields_allows_decimal_precision_and_scale_delta() {
        let current_schema = ArrowSchema::new(vec![ArrowField::new(
            "d",
            ArrowDataType::Decimal128(38, 2),
            true,
        )]);
        let expected_fields = vec![ArrowField::new(
            "d",
            ArrowDataType::Decimal128(38, 10),
            true,
        )];

        assert!(schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }
1133
    /// Schemas with a different number of fields never match.
    #[test]
    fn test_schema_contains_same_fields_rejects_length_mismatch() {
        let current_schema =
            ArrowSchema::new(vec![ArrowField::new("k", ArrowDataType::Int32, true)]);
        let expected_fields = vec![
            ArrowField::new("k", ArrowDataType::Int32, true),
            ArrowField::new("v", ArrowDataType::Utf8, true),
        ];

        assert!(!schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }
1148
    /// A field with the same name but an incompatible type does not match.
    #[test]
    fn test_schema_contains_same_fields_rejects_type_mismatch() {
        let current_schema =
            ArrowSchema::new(vec![ArrowField::new("v", ArrowDataType::Utf8, true)]);
        let expected_fields = vec![ArrowField::new("v", ArrowDataType::Int32, true)];

        assert!(!schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }
1160
    /// One current field cannot satisfy two expected fields: the second expected
    /// `v: Int32` has no unmatched counterpart left.
    #[test]
    fn test_schema_contains_same_fields_rejects_reused_duplicate_match() {
        let current_schema = ArrowSchema::new(vec![
            ArrowField::new("v", ArrowDataType::Int32, true),
            ArrowField::new("v", ArrowDataType::Utf8, true),
        ]);
        let expected_fields = vec![
            ArrowField::new("v", ArrowDataType::Int32, true),
            ArrowField::new("v", ArrowDataType::Int32, true),
        ];

        assert!(!schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }
1177
    /// An incompatible type inside a struct makes the enclosing field mismatch.
    #[test]
    fn test_schema_contains_same_fields_rejects_nested_type_mismatch() {
        let current_schema = ArrowSchema::new(vec![ArrowField::new(
            "s",
            ArrowDataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "payload",
                ArrowDataType::Utf8,
                true,
            )])),
            true,
        )]);
        let expected_fields = vec![ArrowField::new(
            "s",
            ArrowDataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "payload",
                ArrowDataType::Int32,
                true,
            )])),
            true,
        )];

        assert!(!schema_contains_same_fields(
            current_schema.fields(),
            &expected_fields
        ));
    }
1204
    /// Only the history of the requested branch is counted: the COW branch points at
    /// snapshot 4, whose parent 3 is a `Replace`, so the count is 1. Snapshot 5 on the
    /// main branch is not part of that chain.
    #[test]
    fn test_count_snapshots_since_rewrite_in_metadata_ignores_other_branches() {
        let mut builder = TableMetadataBuilder::new(
            Schema::builder()
                .with_fields(vec![
                    NestedField::new(1, "id", Type::Primitive(PrimitiveType::Long), false).into(),
                ])
                .build()
                .unwrap(),
            UnboundPartitionSpec::builder().build(),
            SortOrder::unsorted_order(),
            "s3://warehouse/db/table".to_owned(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap();

        // (snapshot id, parent snapshot id, operation): 1 <- 2 <- 3 <- 4 form one chain,
        // 5 has no parent.
        for (snapshot_id, parent_snapshot_id, operation) in [
            (1, None, Operation::Append),
            (2, Some(1), Operation::Append),
            (3, Some(2), Operation::Replace),
            (4, Some(3), Operation::Append),
            (5, None, Operation::Overwrite),
        ] {
            builder = builder
                .add_snapshot(snapshot(snapshot_id, parent_snapshot_id, operation))
                .unwrap();
        }

        let metadata = builder
            .set_ref(super::super::ICEBERG_COW_BRANCH, snapshot_ref(4))
            .unwrap()
            .set_ref(MAIN_BRANCH, snapshot_ref(5))
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        let count = IcebergSinkCommitter::count_snapshots_since_rewrite_in_metadata(
            &metadata,
            super::super::ICEBERG_COW_BRANCH,
        );

        assert_eq!(count, 1);
    }
1252
    /// Test helper: builds a snapshot with the given id, parent and operation.
    ///
    /// The snapshot id is reused as sequence number and timestamp, the manifest list path
    /// is derived from the id, and the schema id is always 0.
    fn snapshot(
        snapshot_id: i64,
        parent_snapshot_id: Option<i64>,
        operation: Operation,
    ) -> Snapshot {
        Snapshot::builder()
            .with_snapshot_id(snapshot_id)
            .with_parent_snapshot_id(parent_snapshot_id)
            .with_sequence_number(snapshot_id)
            .with_timestamp_ms(snapshot_id)
            .with_manifest_list(format!("/snap-{snapshot_id}.avro"))
            .with_summary(Summary {
                operation,
                additional_properties: HashMap::new(),
            })
            .with_schema_id(0)
            .build()
    }
1271
    /// Test helper: builds a branch reference to `snapshot_id` with no retention settings.
    fn snapshot_ref(snapshot_id: i64) -> SnapshotReference {
        SnapshotReference::new(snapshot_id, SnapshotRetention::branch(None, None, None))
    }
1275}