1pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29 DataFile, SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
30};
31use iceberg::table::Table;
32use iceberg::transaction::Transaction;
33use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
34use iceberg::writer::base_writer::equality_delete_writer::{
35 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
36};
37use iceberg::writer::base_writer::sort_position_delete_writer::{
38 POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
39};
40use iceberg::writer::file_writer::ParquetWriterBuilder;
41use iceberg::writer::file_writer::location_generator::{
42 DefaultFileNameGenerator, DefaultLocationGenerator,
43};
44use iceberg::writer::function_writer::equality_delta_writer::{
45 DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
46};
47use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
48use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
49use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
50use itertools::Itertools;
51use parquet::file::properties::WriterProperties;
52use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
53use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
54use regex::Regex;
55use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
56use risingwave_common::array::arrow::arrow_schema_iceberg::{
57 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
58 Schema as ArrowSchema, SchemaRef,
59};
60use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
61use risingwave_common::array::{Op, StreamChunk};
62use risingwave_common::bail;
63use risingwave_common::bitmap::Bitmap;
64use risingwave_common::catalog::Schema;
65use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
66use risingwave_common_estimate_size::EstimateSize;
67use risingwave_pb::connector_service::SinkMetadata;
68use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
69use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
70use sea_orm::DatabaseConnection;
71use serde_derive::Deserialize;
72use serde_json::from_value;
73use serde_with::{DisplayFromStr, serde_as};
74use thiserror_ext::AsReport;
75use tokio::sync::mpsc::UnboundedSender;
76use tokio_retry::Retry;
77use tokio_retry::strategy::{ExponentialBackoff, jitter};
78use tracing::warn;
79use url::Url;
80use uuid::Uuid;
81use with_options::WithOptions;
82
83use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
84use super::{
85 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
86 SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
87};
88use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate};
89use crate::enforce_secret::EnforceSecret;
90use crate::sink::catalog::SinkId;
91use crate::sink::coordinate::CoordinatedLogSinker;
92use crate::sink::iceberg::exactly_once_util::*;
93use crate::sink::writer::SinkWriter;
94use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
95use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
96
97pub const ICEBERG_SINK: &str = "iceberg";
98
99fn default_commit_retry_num() -> u32 {
100 8
101}
102
103#[serde_as]
104#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
105pub struct IcebergConfig {
106 pub r#type: String, #[serde(default, deserialize_with = "deserialize_bool_from_string")]
109 pub force_append_only: bool,
110
111 #[serde(flatten)]
112 common: IcebergCommon,
113
114 #[serde(
115 rename = "primary_key",
116 default,
117 deserialize_with = "deserialize_optional_string_seq_from_string"
118 )]
119 pub primary_key: Option<Vec<String>>,
120
121 #[serde(skip)]
123 pub java_catalog_props: HashMap<String, String>,
124
125 #[serde(default)]
126 pub partition_by: Option<String>,
127
128 #[serde(default = "default_commit_checkpoint_interval")]
130 #[serde_as(as = "DisplayFromStr")]
131 pub commit_checkpoint_interval: u64,
132
133 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
134 pub create_table_if_not_exists: bool,
135
136 #[serde(default)]
138 #[serde_as(as = "Option<DisplayFromStr>")]
139 pub is_exactly_once: Option<bool>,
140 #[serde(default = "default_commit_retry_num")]
145 pub commit_retry_num: u32,
146
147 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
149 pub enable_compaction: bool,
150
151 #[serde(default)]
153 #[serde_as(as = "Option<DisplayFromStr>")]
154 pub compaction_interval_sec: Option<u64>,
155
156 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
158 pub enable_snapshot_expiration: bool,
159}
160
161impl EnforceSecret for IcebergConfig {
162 fn enforce_secret<'a>(
163 prop_iter: impl Iterator<Item = &'a str>,
164 ) -> crate::error::ConnectorResult<()> {
165 for prop in prop_iter {
166 IcebergCommon::enforce_one(prop)?;
167 }
168 Ok(())
169 }
170
171 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
172 IcebergCommon::enforce_one(prop)
173 }
174}
175
176impl IcebergConfig {
177 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
178 let mut config =
179 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
180 .map_err(|e| SinkError::Config(anyhow!(e)))?;
181
182 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
183 return Err(SinkError::Config(anyhow!(
184 "`{}` must be {}, or {}",
185 SINK_TYPE_OPTION,
186 SINK_TYPE_APPEND_ONLY,
187 SINK_TYPE_UPSERT
188 )));
189 }
190
191 if config.r#type == SINK_TYPE_UPSERT {
192 if let Some(primary_key) = &config.primary_key {
193 if primary_key.is_empty() {
194 return Err(SinkError::Config(anyhow!(
195 "`primary_key` must not be empty in {}",
196 SINK_TYPE_UPSERT
197 )));
198 }
199 } else {
200 return Err(SinkError::Config(anyhow!(
201 "Must set `primary_key` in {}",
202 SINK_TYPE_UPSERT
203 )));
204 }
205 }
206
207 config.java_catalog_props = values
209 .iter()
210 .filter(|(k, _v)| {
211 k.starts_with("catalog.")
212 && k != &"catalog.uri"
213 && k != &"catalog.type"
214 && k != &"catalog.name"
215 })
216 .map(|(k, v)| (k[8..].to_string(), v.to_string()))
217 .collect();
218
219 if config.commit_checkpoint_interval == 0 {
220 return Err(SinkError::Config(anyhow!(
221 "`commit_checkpoint_interval` must be greater than 0"
222 )));
223 }
224
225 Ok(config)
226 }
227
228 pub fn catalog_type(&self) -> &str {
229 self.common.catalog_type()
230 }
231
232 pub async fn load_table(&self) -> Result<Table> {
233 self.common
234 .load_table(&self.java_catalog_props)
235 .await
236 .map_err(Into::into)
237 }
238
239 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
240 self.common
241 .create_catalog(&self.java_catalog_props)
242 .await
243 .map_err(Into::into)
244 }
245
246 pub fn full_table_name(&self) -> Result<TableIdent> {
247 self.common.full_table_name().map_err(Into::into)
248 }
249
250 pub fn catalog_name(&self) -> String {
251 self.common.catalog_name()
252 }
253
254 pub fn compaction_interval_sec(&self) -> u64 {
255 self.compaction_interval_sec.unwrap_or(3600)
257 }
258}
259
260pub struct IcebergSink {
261 config: IcebergConfig,
262 param: SinkParam,
263 unique_column_ids: Option<Vec<usize>>,
265}
266
267impl EnforceSecret for IcebergSink {
268 fn enforce_secret<'a>(
269 prop_iter: impl Iterator<Item = &'a str>,
270 ) -> crate::error::ConnectorResult<()> {
271 for prop in prop_iter {
272 IcebergConfig::enforce_one(prop)?;
273 }
274 Ok(())
275 }
276}
277
278impl TryFrom<SinkParam> for IcebergSink {
279 type Error = SinkError;
280
281 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
282 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
283 IcebergSink::new(config, param)
284 }
285}
286
287impl Debug for IcebergSink {
288 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289 f.debug_struct("IcebergSink")
290 .field("config", &self.config)
291 .finish()
292 }
293}
294
295impl IcebergSink {
296 async fn create_and_validate_table(&self) -> Result<Table> {
297 if self.config.create_table_if_not_exists {
298 self.create_table_if_not_exists().await?;
299 }
300
301 let table = self
302 .config
303 .load_table()
304 .await
305 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
306
307 let sink_schema = self.param.schema();
308 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
309 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
310
311 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
312 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
313
314 Ok(table)
315 }
316
317 async fn create_table_if_not_exists(&self) -> Result<()> {
318 let catalog = self.config.create_catalog().await?;
319 let namespace = if let Some(database_name) = &self.config.common.database_name {
320 let namespace = NamespaceIdent::new(database_name.clone());
321 if !catalog
322 .namespace_exists(&namespace)
323 .await
324 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
325 {
326 catalog
327 .create_namespace(&namespace, HashMap::default())
328 .await
329 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
330 .context("failed to create iceberg namespace")?;
331 }
332 namespace
333 } else {
334 bail!("database name must be set if you want to create table")
335 };
336
337 let table_id = self
338 .config
339 .full_table_name()
340 .context("Unable to parse table name")?;
341 if !catalog
342 .table_exists(&table_id)
343 .await
344 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
345 {
346 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
347 let arrow_fields = self
349 .param
350 .columns
351 .iter()
352 .map(|column| {
353 Ok(iceberg_create_table_arrow_convert
354 .to_arrow_field(&column.name, &column.data_type)
355 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
356 .context(format!(
357 "failed to convert {}: {} to arrow type",
358 &column.name, &column.data_type
359 ))?)
360 })
361 .collect::<Result<Vec<ArrowField>>>()?;
362 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
363 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
364 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
365 .context("failed to convert arrow schema to iceberg schema")?;
366
367 let location = {
368 let mut names = namespace.clone().inner();
369 names.push(self.config.common.table_name.clone());
370 match &self.config.common.warehouse_path {
371 Some(warehouse_path) => {
372 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
373 let url = Url::parse(warehouse_path);
374 if url.is_err() || is_s3_tables {
375 if self.config.common.catalog_type() == "rest"
378 || self.config.common.catalog_type() == "rest_rust"
379 {
380 None
381 } else {
382 bail!(format!("Invalid warehouse path: {}", warehouse_path))
383 }
384 } else if warehouse_path.ends_with('/') {
385 Some(format!("{}{}", warehouse_path, names.join("/")))
386 } else {
387 Some(format!("{}/{}", warehouse_path, names.join("/")))
388 }
389 }
390 None => None,
391 }
392 };
393
394 let partition_spec = match &self.config.partition_by {
395 Some(partition_by) => {
396 let mut partition_fields = Vec::<UnboundPartitionField>::new();
397 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
398 .into_iter()
399 .enumerate()
400 {
401 match iceberg_schema.field_id_by_name(&column) {
402 Some(id) => partition_fields.push(
403 UnboundPartitionField::builder()
404 .source_id(id)
405 .transform(transform)
406 .name(format!("_p_{}", column))
407 .field_id(i as i32)
408 .build(),
409 ),
410 None => bail!(format!(
411 "Partition source column does not exist in schema: {}",
412 column
413 )),
414 };
415 }
416 Some(
417 UnboundPartitionSpec::builder()
418 .with_spec_id(0)
419 .add_partition_fields(partition_fields)
420 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
421 .context("failed to add partition columns")?
422 .build(),
423 )
424 }
425 None => None,
426 };
427
428 let table_creation_builder = TableCreation::builder()
429 .name(self.config.common.table_name.clone())
430 .schema(iceberg_schema);
431
432 let table_creation = match (location, partition_spec) {
433 (Some(location), Some(partition_spec)) => table_creation_builder
434 .location(location)
435 .partition_spec(partition_spec)
436 .build(),
437 (Some(location), None) => table_creation_builder.location(location).build(),
438 (None, Some(partition_spec)) => table_creation_builder
439 .partition_spec(partition_spec)
440 .build(),
441 (None, None) => table_creation_builder.build(),
442 };
443
444 catalog
445 .create_table(&namespace, table_creation)
446 .await
447 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
448 .context("failed to create iceberg table")?;
449 }
450 Ok(())
451 }
452
453 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
454 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
455 if let Some(pk) = &config.primary_key {
456 let mut unique_column_ids = Vec::with_capacity(pk.len());
457 for col_name in pk {
458 let id = param
459 .columns
460 .iter()
461 .find(|col| col.name.as_str() == col_name)
462 .ok_or_else(|| {
463 SinkError::Config(anyhow!(
464 "Primary key column {} not found in sink schema",
465 col_name
466 ))
467 })?
468 .column_id
469 .get_id() as usize;
470 unique_column_ids.push(id);
471 }
472 Some(unique_column_ids)
473 } else {
474 unreachable!()
475 }
476 } else {
477 None
478 };
479 Ok(Self {
480 config,
481 param,
482 unique_column_ids,
483 })
484 }
485}
486
487impl Sink for IcebergSink {
488 type Coordinator = IcebergSinkCommitter;
489 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
490
491 const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &[
492 "commit_checkpoint_interval",
493 "enable_compaction",
494 "compaction_interval_sec",
495 "enable_snapshot_expiration",
496 ];
497 const SINK_NAME: &'static str = ICEBERG_SINK;
498
499 async fn validate(&self) -> Result<()> {
500 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
501 bail!("Snowflake catalog only supports iceberg sources");
502 }
503 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
504 risingwave_common::license::Feature::IcebergSinkWithGlue
505 .check_available()
506 .map_err(|e| anyhow::anyhow!(e))?;
507 }
508 if self.config.enable_compaction {
509 risingwave_common::license::Feature::IcebergCompaction
510 .check_available()
511 .map_err(|e| anyhow::anyhow!(e))?;
512 }
513
514 let _ = self.create_and_validate_table().await?;
515 Ok(())
516 }
517
518 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
519 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
520
521 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
522 if iceberg_config.enable_compaction {
523 if compaction_interval == 0 {
524 bail!(
525 "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
526 );
527 }
528 } else {
529 bail!("`compaction_interval_sec` can only be set when `enable_compaction` is true");
530 }
531 }
532
533 Ok(())
534 }
535
536 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
537 let table = self.create_and_validate_table().await?;
538 let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
539 IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
540 } else {
541 IcebergSinkWriter::new_append_only(table, &writer_param).await?
542 };
543
544 let commit_checkpoint_interval =
545 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
546 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
547 );
548 let writer = CoordinatedLogSinker::new(
549 &writer_param,
550 self.param.clone(),
551 inner,
552 commit_checkpoint_interval,
553 )
554 .await?;
555
556 Ok(writer)
557 }
558
559 fn is_coordinated_sink(&self) -> bool {
560 true
561 }
562
563 async fn new_coordinator(
564 &self,
565 db: DatabaseConnection,
566 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
567 ) -> Result<Self::Coordinator> {
568 let catalog = self.config.create_catalog().await?;
569 let table = self.create_and_validate_table().await?;
570 Ok(IcebergSinkCommitter {
571 catalog,
572 table,
573 is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
574 last_commit_epoch: 0,
575 sink_id: self.param.sink_id.sink_id(),
576 config: self.config.clone(),
577 param: self.param.clone(),
578 db,
579 commit_retry_num: self.config.commit_retry_num,
580 committed_epoch_subscriber: None,
581 iceberg_compact_stat_sender,
582 })
583 }
584}
585
586enum ProjectIdxVec {
593 None,
594 Prepare(usize),
595 Done(Vec<usize>),
596}
597
598pub struct IcebergSinkWriter {
599 writer: IcebergWriterDispatch,
600 arrow_schema: SchemaRef,
601 metrics: IcebergWriterMetrics,
603 table: Table,
605 project_idx_vec: ProjectIdxVec,
608}
609
610#[allow(clippy::type_complexity)]
611enum IcebergWriterDispatch {
612 PartitionAppendOnly {
613 writer: Option<Box<dyn IcebergWriter>>,
614 writer_builder: MonitoredGeneralWriterBuilder<
615 FanoutPartitionWriterBuilder<
616 DataFileWriterBuilder<
617 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
618 >,
619 >,
620 >,
621 },
622 NonpartitionAppendOnly {
623 writer: Option<Box<dyn IcebergWriter>>,
624 writer_builder: MonitoredGeneralWriterBuilder<
625 DataFileWriterBuilder<
626 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
627 >,
628 >,
629 },
630 PartitionUpsert {
631 writer: Option<Box<dyn IcebergWriter>>,
632 writer_builder: MonitoredGeneralWriterBuilder<
633 FanoutPartitionWriterBuilder<
634 EqualityDeltaWriterBuilder<
635 DataFileWriterBuilder<
636 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
637 >,
638 MonitoredPositionDeleteWriterBuilder<
639 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
640 >,
641 EqualityDeleteFileWriterBuilder<
642 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
643 >,
644 >,
645 >,
646 >,
647 arrow_schema_with_op_column: SchemaRef,
648 },
649 NonpartitionUpsert {
650 writer: Option<Box<dyn IcebergWriter>>,
651 writer_builder: MonitoredGeneralWriterBuilder<
652 EqualityDeltaWriterBuilder<
653 DataFileWriterBuilder<
654 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
655 >,
656 MonitoredPositionDeleteWriterBuilder<
657 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
658 >,
659 EqualityDeleteFileWriterBuilder<
660 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
661 >,
662 >,
663 >,
664 arrow_schema_with_op_column: SchemaRef,
665 },
666}
667
668impl IcebergWriterDispatch {
669 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
670 match self {
671 IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
672 | IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
673 | IcebergWriterDispatch::PartitionUpsert { writer, .. }
674 | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
675 }
676 }
677}
678
679pub struct IcebergWriterMetrics {
680 _write_qps: LabelGuardedIntCounter,
685 _write_latency: LabelGuardedHistogram,
686 write_bytes: LabelGuardedIntCounter,
687}
688
689impl IcebergSinkWriter {
690 pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
691 let SinkWriterParam {
692 extra_partition_col_idx,
693 actor_id,
694 sink_id,
695 sink_name,
696 ..
697 } = writer_param;
698 let metrics_labels = [
699 &actor_id.to_string(),
700 &sink_id.to_string(),
701 sink_name.as_str(),
702 ];
703
704 let write_qps = GLOBAL_SINK_METRICS
706 .iceberg_write_qps
707 .with_guarded_label_values(&metrics_labels);
708 let write_latency = GLOBAL_SINK_METRICS
709 .iceberg_write_latency
710 .with_guarded_label_values(&metrics_labels);
711 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
714 .iceberg_rolling_unflushed_data_file
715 .with_guarded_label_values(&metrics_labels);
716 let write_bytes = GLOBAL_SINK_METRICS
717 .iceberg_write_bytes
718 .with_guarded_label_values(&metrics_labels);
719
720 let schema = table.metadata().current_schema();
721 let partition_spec = table.metadata().default_partition_spec();
722
723 let unique_uuid_suffix = Uuid::now_v7();
725
726 let parquet_writer_builder = ParquetWriterBuilder::new(
727 WriterProperties::new(),
728 schema.clone(),
729 table.file_io().clone(),
730 DefaultLocationGenerator::new(table.metadata().clone())
731 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
732 DefaultFileNameGenerator::new(
733 writer_param.actor_id.to_string(),
734 Some(unique_uuid_suffix.to_string()),
735 iceberg::spec::DataFileFormat::Parquet,
736 ),
737 );
738 let data_file_builder =
739 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
740 if partition_spec.fields().is_empty() {
741 let writer_builder = MonitoredGeneralWriterBuilder::new(
742 data_file_builder,
743 write_qps.clone(),
744 write_latency.clone(),
745 );
746 let inner_writer = Some(Box::new(
747 writer_builder
748 .clone()
749 .build()
750 .await
751 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
752 ) as Box<dyn IcebergWriter>);
753 Ok(Self {
754 arrow_schema: Arc::new(
755 schema_to_arrow_schema(table.metadata().current_schema())
756 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
757 ),
758 metrics: IcebergWriterMetrics {
759 _write_qps: write_qps,
760 _write_latency: write_latency,
761 write_bytes,
762 },
763 writer: IcebergWriterDispatch::NonpartitionAppendOnly {
764 writer: inner_writer,
765 writer_builder,
766 },
767 table,
768 project_idx_vec: {
769 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
770 ProjectIdxVec::Prepare(*extra_partition_col_idx)
771 } else {
772 ProjectIdxVec::None
773 }
774 },
775 })
776 } else {
777 let partition_builder = MonitoredGeneralWriterBuilder::new(
778 FanoutPartitionWriterBuilder::new(
779 data_file_builder,
780 partition_spec.clone(),
781 schema.clone(),
782 )
783 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
784 write_qps.clone(),
785 write_latency.clone(),
786 );
787 let inner_writer = Some(Box::new(
788 partition_builder
789 .clone()
790 .build()
791 .await
792 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
793 ) as Box<dyn IcebergWriter>);
794 Ok(Self {
795 arrow_schema: Arc::new(
796 schema_to_arrow_schema(table.metadata().current_schema())
797 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
798 ),
799 metrics: IcebergWriterMetrics {
800 _write_qps: write_qps,
801 _write_latency: write_latency,
802 write_bytes,
803 },
804 writer: IcebergWriterDispatch::PartitionAppendOnly {
805 writer: inner_writer,
806 writer_builder: partition_builder,
807 },
808 table,
809 project_idx_vec: {
810 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
811 ProjectIdxVec::Prepare(*extra_partition_col_idx)
812 } else {
813 ProjectIdxVec::None
814 }
815 },
816 })
817 }
818 }
819
820 pub async fn new_upsert(
821 table: Table,
822 unique_column_ids: Vec<usize>,
823 writer_param: &SinkWriterParam,
824 ) -> Result<Self> {
825 let SinkWriterParam {
826 extra_partition_col_idx,
827 actor_id,
828 sink_id,
829 sink_name,
830 ..
831 } = writer_param;
832 let metrics_labels = [
833 &actor_id.to_string(),
834 &sink_id.to_string(),
835 sink_name.as_str(),
836 ];
837 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
838
839 let write_qps = GLOBAL_SINK_METRICS
841 .iceberg_write_qps
842 .with_guarded_label_values(&metrics_labels);
843 let write_latency = GLOBAL_SINK_METRICS
844 .iceberg_write_latency
845 .with_guarded_label_values(&metrics_labels);
846 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
849 .iceberg_rolling_unflushed_data_file
850 .with_guarded_label_values(&metrics_labels);
851 let position_delete_cache_num = GLOBAL_SINK_METRICS
852 .iceberg_position_delete_cache_num
853 .with_guarded_label_values(&metrics_labels);
854 let write_bytes = GLOBAL_SINK_METRICS
855 .iceberg_write_bytes
856 .with_guarded_label_values(&metrics_labels);
857
858 let schema = table.metadata().current_schema();
860 let partition_spec = table.metadata().default_partition_spec();
861
862 let unique_uuid_suffix = Uuid::now_v7();
864
865 let data_file_builder = {
866 let parquet_writer_builder = ParquetWriterBuilder::new(
867 WriterProperties::new(),
868 schema.clone(),
869 table.file_io().clone(),
870 DefaultLocationGenerator::new(table.metadata().clone())
871 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
872 DefaultFileNameGenerator::new(
873 writer_param.actor_id.to_string(),
874 Some(unique_uuid_suffix.to_string()),
875 iceberg::spec::DataFileFormat::Parquet,
876 ),
877 );
878 DataFileWriterBuilder::new(
879 parquet_writer_builder.clone(),
880 None,
881 partition_spec.spec_id(),
882 )
883 };
884 let position_delete_builder = {
885 let parquet_writer_builder = ParquetWriterBuilder::new(
886 WriterProperties::new(),
887 POSITION_DELETE_SCHEMA.clone(),
888 table.file_io().clone(),
889 DefaultLocationGenerator::new(table.metadata().clone())
890 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
891 DefaultFileNameGenerator::new(
892 writer_param.actor_id.to_string(),
893 Some(format!("pos-del-{}", unique_uuid_suffix)),
894 iceberg::spec::DataFileFormat::Parquet,
895 ),
896 );
897 MonitoredPositionDeleteWriterBuilder::new(
898 SortPositionDeleteWriterBuilder::new(
899 parquet_writer_builder.clone(),
900 writer_param
901 .streaming_config
902 .developer
903 .iceberg_sink_positional_delete_cache_size,
904 None,
905 None,
906 ),
907 position_delete_cache_num,
908 )
909 };
910 let equality_delete_builder = {
911 let config = EqualityDeleteWriterConfig::new(
912 unique_column_ids.clone(),
913 table.metadata().current_schema().clone(),
914 None,
915 partition_spec.spec_id(),
916 )
917 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
918 let parquet_writer_builder = ParquetWriterBuilder::new(
919 WriterProperties::new(),
920 Arc::new(
921 arrow_schema_to_schema(config.projected_arrow_schema_ref())
922 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
923 ),
924 table.file_io().clone(),
925 DefaultLocationGenerator::new(table.metadata().clone())
926 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
927 DefaultFileNameGenerator::new(
928 writer_param.actor_id.to_string(),
929 Some(format!("eq-del-{}", unique_uuid_suffix)),
930 iceberg::spec::DataFileFormat::Parquet,
931 ),
932 );
933
934 EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
935 };
936 let delta_builder = EqualityDeltaWriterBuilder::new(
937 data_file_builder,
938 position_delete_builder,
939 equality_delete_builder,
940 unique_column_ids,
941 );
942 if partition_spec.fields().is_empty() {
943 let writer_builder = MonitoredGeneralWriterBuilder::new(
944 delta_builder,
945 write_qps.clone(),
946 write_latency.clone(),
947 );
948 let inner_writer = Some(Box::new(
949 writer_builder
950 .clone()
951 .build()
952 .await
953 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
954 ) as Box<dyn IcebergWriter>);
955 let original_arrow_schema = Arc::new(
956 schema_to_arrow_schema(table.metadata().current_schema())
957 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
958 );
959 let schema_with_extra_op_column = {
960 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
961 new_fields.push(Arc::new(ArrowField::new(
962 "op".to_owned(),
963 ArrowDataType::Int32,
964 false,
965 )));
966 Arc::new(ArrowSchema::new(new_fields))
967 };
968 Ok(Self {
969 arrow_schema: original_arrow_schema,
970 metrics: IcebergWriterMetrics {
971 _write_qps: write_qps,
972 _write_latency: write_latency,
973 write_bytes,
974 },
975 table,
976 writer: IcebergWriterDispatch::NonpartitionUpsert {
977 writer: inner_writer,
978 writer_builder,
979 arrow_schema_with_op_column: schema_with_extra_op_column,
980 },
981 project_idx_vec: {
982 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
983 ProjectIdxVec::Prepare(*extra_partition_col_idx)
984 } else {
985 ProjectIdxVec::None
986 }
987 },
988 })
989 } else {
990 let original_arrow_schema = Arc::new(
991 schema_to_arrow_schema(table.metadata().current_schema())
992 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
993 );
994 let schema_with_extra_op_column = {
995 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
996 new_fields.push(Arc::new(ArrowField::new(
997 "op".to_owned(),
998 ArrowDataType::Int32,
999 false,
1000 )));
1001 Arc::new(ArrowSchema::new(new_fields))
1002 };
1003 let partition_builder = MonitoredGeneralWriterBuilder::new(
1004 FanoutPartitionWriterBuilder::new_with_custom_schema(
1005 delta_builder,
1006 schema_with_extra_op_column.clone(),
1007 partition_spec.clone(),
1008 table.metadata().current_schema().clone(),
1009 ),
1010 write_qps.clone(),
1011 write_latency.clone(),
1012 );
1013 let inner_writer = Some(Box::new(
1014 partition_builder
1015 .clone()
1016 .build()
1017 .await
1018 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1019 ) as Box<dyn IcebergWriter>);
1020 Ok(Self {
1021 arrow_schema: original_arrow_schema,
1022 metrics: IcebergWriterMetrics {
1023 _write_qps: write_qps,
1024 _write_latency: write_latency,
1025 write_bytes,
1026 },
1027 table,
1028 writer: IcebergWriterDispatch::PartitionUpsert {
1029 writer: inner_writer,
1030 writer_builder: partition_builder,
1031 arrow_schema_with_op_column: schema_with_extra_op_column,
1032 },
1033 project_idx_vec: {
1034 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1035 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1036 } else {
1037 ProjectIdxVec::None
1038 }
1039 },
1040 })
1041 }
1042 }
1043}
1044
1045#[async_trait]
1046impl SinkWriter for IcebergSinkWriter {
1047 type CommitMetadata = Option<SinkMetadata>;
1048
1049 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1051 Ok(())
1053 }
1054
1055 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1057 match &mut self.writer {
1059 IcebergWriterDispatch::PartitionAppendOnly {
1060 writer,
1061 writer_builder,
1062 } => {
1063 if writer.is_none() {
1064 *writer = Some(Box::new(
1065 writer_builder
1066 .clone()
1067 .build()
1068 .await
1069 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1070 ));
1071 }
1072 }
1073 IcebergWriterDispatch::NonpartitionAppendOnly {
1074 writer,
1075 writer_builder,
1076 } => {
1077 if writer.is_none() {
1078 *writer = Some(Box::new(
1079 writer_builder
1080 .clone()
1081 .build()
1082 .await
1083 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1084 ));
1085 }
1086 }
1087 IcebergWriterDispatch::PartitionUpsert {
1088 writer,
1089 writer_builder,
1090 ..
1091 } => {
1092 if writer.is_none() {
1093 *writer = Some(Box::new(
1094 writer_builder
1095 .clone()
1096 .build()
1097 .await
1098 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1099 ));
1100 }
1101 }
1102 IcebergWriterDispatch::NonpartitionUpsert {
1103 writer,
1104 writer_builder,
1105 ..
1106 } => {
1107 if writer.is_none() {
1108 *writer = Some(Box::new(
1109 writer_builder
1110 .clone()
1111 .build()
1112 .await
1113 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1114 ));
1115 }
1116 }
1117 };
1118
1119 let (mut chunk, ops) = chunk.compact().into_parts();
1121 match &self.project_idx_vec {
1122 ProjectIdxVec::None => {}
1123 ProjectIdxVec::Prepare(idx) => {
1124 if *idx >= chunk.columns().len() {
1125 return Err(SinkError::Iceberg(anyhow!(
1126 "invalid extra partition column index {}",
1127 idx
1128 )));
1129 }
1130 let project_idx_vec = (0..*idx)
1131 .chain(*idx + 1..chunk.columns().len())
1132 .collect_vec();
1133 chunk = chunk.project(&project_idx_vec);
1134 self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1135 }
1136 ProjectIdxVec::Done(idx_vec) => {
1137 chunk = chunk.project(idx_vec);
1138 }
1139 }
1140 if ops.is_empty() {
1141 return Ok(());
1142 }
1143 let write_batch_size = chunk.estimated_heap_size();
1144 let batch = match &self.writer {
1145 IcebergWriterDispatch::PartitionAppendOnly { .. }
1146 | IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
1147 let filters =
1149 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1150 chunk.set_visibility(filters);
1151 IcebergArrowConvert
1152 .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1153 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1154 }
1155 IcebergWriterDispatch::PartitionUpsert {
1156 arrow_schema_with_op_column,
1157 ..
1158 }
1159 | IcebergWriterDispatch::NonpartitionUpsert {
1160 arrow_schema_with_op_column,
1161 ..
1162 } => {
1163 let chunk = IcebergArrowConvert
1164 .to_record_batch(self.arrow_schema.clone(), &chunk)
1165 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1166 let ops = Arc::new(Int32Array::from(
1167 ops.iter()
1168 .map(|op| match op {
1169 Op::UpdateInsert | Op::Insert => INSERT_OP,
1170 Op::UpdateDelete | Op::Delete => DELETE_OP,
1171 })
1172 .collect_vec(),
1173 ));
1174 let mut columns = chunk.columns().to_vec();
1175 columns.push(ops);
1176 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1177 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1178 }
1179 };
1180
1181 let writer = self.writer.get_writer().unwrap();
1182 writer
1183 .write(batch)
1184 .instrument_await("iceberg_write")
1185 .await
1186 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1187 self.metrics.write_bytes.inc_by(write_batch_size as _);
1188 Ok(())
1189 }
1190
1191 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1194 if !is_checkpoint {
1196 return Ok(None);
1197 }
1198
1199 let close_result = match &mut self.writer {
1200 IcebergWriterDispatch::PartitionAppendOnly {
1201 writer,
1202 writer_builder,
1203 } => {
1204 let close_result = match writer.take() {
1205 Some(mut writer) => {
1206 Some(writer.close().instrument_await("iceberg_close").await)
1207 }
1208 _ => None,
1209 };
1210 match writer_builder.clone().build().await {
1211 Ok(new_writer) => {
1212 *writer = Some(Box::new(new_writer));
1213 }
1214 _ => {
1215 warn!("Failed to build new writer after close");
1218 }
1219 }
1220 close_result
1221 }
1222 IcebergWriterDispatch::NonpartitionAppendOnly {
1223 writer,
1224 writer_builder,
1225 } => {
1226 let close_result = match writer.take() {
1227 Some(mut writer) => {
1228 Some(writer.close().instrument_await("iceberg_close").await)
1229 }
1230 _ => None,
1231 };
1232 match writer_builder.clone().build().await {
1233 Ok(new_writer) => {
1234 *writer = Some(Box::new(new_writer));
1235 }
1236 _ => {
1237 warn!("Failed to build new writer after close");
1240 }
1241 }
1242 close_result
1243 }
1244 IcebergWriterDispatch::PartitionUpsert {
1245 writer,
1246 writer_builder,
1247 ..
1248 } => {
1249 let close_result = match writer.take() {
1250 Some(mut writer) => {
1251 Some(writer.close().instrument_await("iceberg_close").await)
1252 }
1253 _ => None,
1254 };
1255 match writer_builder.clone().build().await {
1256 Ok(new_writer) => {
1257 *writer = Some(Box::new(new_writer));
1258 }
1259 _ => {
1260 warn!("Failed to build new writer after close");
1263 }
1264 }
1265 close_result
1266 }
1267 IcebergWriterDispatch::NonpartitionUpsert {
1268 writer,
1269 writer_builder,
1270 ..
1271 } => {
1272 let close_result = match writer.take() {
1273 Some(mut writer) => {
1274 Some(writer.close().instrument_await("iceberg_close").await)
1275 }
1276 _ => None,
1277 };
1278 match writer_builder.clone().build().await {
1279 Ok(new_writer) => {
1280 *writer = Some(Box::new(new_writer));
1281 }
1282 _ => {
1283 warn!("Failed to build new writer after close");
1286 }
1287 }
1288 close_result
1289 }
1290 };
1291
1292 match close_result {
1293 Some(Ok(result)) => {
1294 let version = self.table.metadata().format_version() as u8;
1295 let partition_type = self.table.metadata().default_partition_type();
1296 let data_files = result
1297 .into_iter()
1298 .map(|f| {
1299 SerializedDataFile::try_from(f, partition_type, version == 1)
1300 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1301 })
1302 .collect::<Result<Vec<_>>>()?;
1303 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1304 data_files,
1305 schema_id: self.table.metadata().current_schema_id(),
1306 partition_spec_id: self.table.metadata().default_partition_spec_id(),
1307 })?))
1308 }
1309 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1310 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1311 }
1312 }
1313
1314 async fn abort(&mut self) -> Result<()> {
1316 Ok(())
1318 }
1319}
1320
1321const SCHEMA_ID: &str = "schema_id";
1322const PARTITION_SPEC_ID: &str = "partition_spec_id";
1323const DATA_FILES: &str = "data_files";
1324
1325#[derive(Default, Clone)]
1326struct IcebergCommitResult {
1327 schema_id: i32,
1328 partition_spec_id: i32,
1329 data_files: Vec<SerializedDataFile>,
1330}
1331
1332impl IcebergCommitResult {
1333 fn try_from(value: &SinkMetadata) -> Result<Self> {
1334 if let Some(Serialized(v)) = &value.metadata {
1335 let mut values = if let serde_json::Value::Object(v) =
1336 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1337 .context("Can't parse iceberg sink metadata")?
1338 {
1339 v
1340 } else {
1341 bail!("iceberg sink metadata should be an object");
1342 };
1343
1344 let schema_id;
1345 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1346 schema_id = value
1347 .as_u64()
1348 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1349 } else {
1350 bail!("iceberg sink metadata should have schema_id");
1351 }
1352
1353 let partition_spec_id;
1354 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1355 partition_spec_id = value
1356 .as_u64()
1357 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1358 } else {
1359 bail!("iceberg sink metadata should have partition_spec_id");
1360 }
1361
1362 let data_files: Vec<SerializedDataFile>;
1363 if let serde_json::Value::Array(values) = values
1364 .remove(DATA_FILES)
1365 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1366 {
1367 data_files = values
1368 .into_iter()
1369 .map(from_value::<SerializedDataFile>)
1370 .collect::<std::result::Result<_, _>>()
1371 .unwrap();
1372 } else {
1373 bail!("iceberg sink metadata should have data_files object");
1374 }
1375
1376 Ok(Self {
1377 schema_id: schema_id as i32,
1378 partition_spec_id: partition_spec_id as i32,
1379 data_files,
1380 })
1381 } else {
1382 bail!("Can't create iceberg sink write result from empty data!")
1383 }
1384 }
1385
1386 fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1387 let mut values = if let serde_json::Value::Object(value) =
1388 serde_json::from_slice::<serde_json::Value>(&value)
1389 .context("Can't parse iceberg sink metadata")?
1390 {
1391 value
1392 } else {
1393 bail!("iceberg sink metadata should be an object");
1394 };
1395
1396 let schema_id;
1397 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1398 schema_id = value
1399 .as_u64()
1400 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1401 } else {
1402 bail!("iceberg sink metadata should have schema_id");
1403 }
1404
1405 let partition_spec_id;
1406 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1407 partition_spec_id = value
1408 .as_u64()
1409 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1410 } else {
1411 bail!("iceberg sink metadata should have partition_spec_id");
1412 }
1413
1414 let data_files: Vec<SerializedDataFile>;
1415 if let serde_json::Value::Array(values) = values
1416 .remove(DATA_FILES)
1417 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1418 {
1419 data_files = values
1420 .into_iter()
1421 .map(from_value::<SerializedDataFile>)
1422 .collect::<std::result::Result<_, _>>()
1423 .unwrap();
1424 } else {
1425 bail!("iceberg sink metadata should have data_files object");
1426 }
1427
1428 Ok(Self {
1429 schema_id: schema_id as i32,
1430 partition_spec_id: partition_spec_id as i32,
1431 data_files,
1432 })
1433 }
1434}
1435
1436impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1437 type Error = SinkError;
1438
1439 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1440 let json_data_files = serde_json::Value::Array(
1441 value
1442 .data_files
1443 .iter()
1444 .map(serde_json::to_value)
1445 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1446 .context("Can't serialize data files to json")?,
1447 );
1448 let json_value = serde_json::Value::Object(
1449 vec![
1450 (
1451 SCHEMA_ID.to_owned(),
1452 serde_json::Value::Number(value.schema_id.into()),
1453 ),
1454 (
1455 PARTITION_SPEC_ID.to_owned(),
1456 serde_json::Value::Number(value.partition_spec_id.into()),
1457 ),
1458 (DATA_FILES.to_owned(), json_data_files),
1459 ]
1460 .into_iter()
1461 .collect(),
1462 );
1463 Ok(SinkMetadata {
1464 metadata: Some(Serialized(SerializedMetadata {
1465 metadata: serde_json::to_vec(&json_value)
1466 .context("Can't serialize iceberg sink metadata")?,
1467 })),
1468 })
1469 }
1470}
1471
1472impl TryFrom<IcebergCommitResult> for Vec<u8> {
1473 type Error = SinkError;
1474
1475 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1476 let json_data_files = serde_json::Value::Array(
1477 value
1478 .data_files
1479 .iter()
1480 .map(serde_json::to_value)
1481 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1482 .context("Can't serialize data files to json")?,
1483 );
1484 let json_value = serde_json::Value::Object(
1485 vec![
1486 (
1487 SCHEMA_ID.to_owned(),
1488 serde_json::Value::Number(value.schema_id.into()),
1489 ),
1490 (
1491 PARTITION_SPEC_ID.to_owned(),
1492 serde_json::Value::Number(value.partition_spec_id.into()),
1493 ),
1494 (DATA_FILES.to_owned(), json_data_files),
1495 ]
1496 .into_iter()
1497 .collect(),
1498 );
1499 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1500 }
1501}
1502pub struct IcebergSinkCommitter {
1503 catalog: Arc<dyn Catalog>,
1504 table: Table,
1505 pub last_commit_epoch: u64,
1506 pub(crate) is_exactly_once: bool,
1507 pub(crate) sink_id: u32,
1508 pub(crate) config: IcebergConfig,
1509 pub(crate) param: SinkParam,
1510 pub(crate) db: DatabaseConnection,
1511 pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1512 commit_retry_num: u32,
1513 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1514}
1515
1516impl IcebergSinkCommitter {
1517 async fn reload_table(
1520 catalog: &dyn Catalog,
1521 table_ident: &TableIdent,
1522 schema_id: i32,
1523 partition_spec_id: i32,
1524 ) -> Result<Table> {
1525 let table = catalog
1526 .load_table(table_ident)
1527 .await
1528 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1529 if table.metadata().current_schema_id() != schema_id {
1530 return Err(SinkError::Iceberg(anyhow!(
1531 "Schema evolution not supported, expect schema id {}, but got {}",
1532 schema_id,
1533 table.metadata().current_schema_id()
1534 )));
1535 }
1536 if table.metadata().default_partition_spec_id() != partition_spec_id {
1537 return Err(SinkError::Iceberg(anyhow!(
1538 "Partition evolution not supported, expect partition spec id {}, but got {}",
1539 partition_spec_id,
1540 table.metadata().default_partition_spec_id()
1541 )));
1542 }
1543 Ok(table)
1544 }
1545}
1546
1547#[async_trait::async_trait]
1548impl SinkCommitCoordinator for IcebergSinkCommitter {
1549 async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1550 if self.is_exactly_once {
1551 self.committed_epoch_subscriber = Some(subscriber);
1552 tracing::info!(
1553 "Sink id = {}: iceberg sink coordinator initing.",
1554 self.param.sink_id.sink_id()
1555 );
1556 if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()).await? {
1557 let ordered_metadata_list_by_end_epoch =
1558 get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()).await?;
1559
1560 let mut last_recommit_epoch = 0;
1561 for (end_epoch, sealized_bytes, snapshot_id, committed) in
1562 ordered_metadata_list_by_end_epoch
1563 {
1564 let write_results_bytes = deserialize_metadata(sealized_bytes);
1565 let mut write_results = vec![];
1566
1567 for each in write_results_bytes {
1568 let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1569 write_results.push(write_result);
1570 }
1571
1572 match (
1573 committed,
1574 self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1575 .await?,
1576 ) {
1577 (true, _) => {
1578 tracing::info!(
1579 "Sink id = {}: all data in log store has been written into external sink, do nothing when recovery.",
1580 self.param.sink_id.sink_id()
1581 );
1582 }
1583 (false, true) => {
1584 tracing::info!(
1586 "Sink id = {}: all pre-commit files have been successfully committed into iceberg and do not need to be committed again, mark it as committed.",
1587 self.param.sink_id.sink_id()
1588 );
1589 mark_row_is_committed_by_sink_id_and_end_epoch(
1590 &self.db,
1591 self.sink_id,
1592 end_epoch,
1593 )
1594 .await?;
1595 }
1596 (false, false) => {
1597 tracing::info!(
1598 "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1599 self.param.sink_id.sink_id()
1600 );
1601 self.re_commit(end_epoch, write_results, snapshot_id)
1602 .await?;
1603 }
1604 }
1605
1606 last_recommit_epoch = end_epoch;
1607 }
1608 tracing::info!(
1609 "Sink id = {}: iceberg commit coordinator inited.",
1610 self.param.sink_id.sink_id()
1611 );
1612 return Ok(Some(last_recommit_epoch));
1613 } else {
1614 tracing::info!(
1615 "Sink id = {}: init iceberg coodinator, and system table is empty.",
1616 self.param.sink_id.sink_id()
1617 );
1618 return Ok(None);
1619 }
1620 }
1621
1622 tracing::info!("Iceberg commit coordinator inited.");
1623 return Ok(None);
1624 }
1625
1626 async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
1627 tracing::info!("Starting iceberg commit in epoch {epoch}.");
1628 let write_results: Vec<IcebergCommitResult> = metadata
1629 .iter()
1630 .map(IcebergCommitResult::try_from)
1631 .collect::<Result<Vec<IcebergCommitResult>>>()?;
1632
1633 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1635 tracing::debug!(?epoch, "no data to commit");
1636 return Ok(());
1637 }
1638
1639 if write_results
1641 .iter()
1642 .any(|r| r.schema_id != write_results[0].schema_id)
1643 || write_results
1644 .iter()
1645 .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1646 {
1647 return Err(SinkError::Iceberg(anyhow!(
1648 "schema_id and partition_spec_id should be the same in all write results"
1649 )));
1650 }
1651
1652 if self.is_exactly_once {
1653 assert!(self.committed_epoch_subscriber.is_some());
1654 match self.committed_epoch_subscriber.clone() {
1655 Some(committed_epoch_subscriber) => {
1656 let (committed_epoch, mut rw_futures_utilrx) =
1658 committed_epoch_subscriber(self.param.sink_id).await?;
1659 if committed_epoch >= epoch {
1661 self.commit_iceberg_inner(epoch, write_results, None)
1662 .await?;
1663 } else {
1664 tracing::info!(
1665 "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1666 committed_epoch,
1667 epoch
1668 );
1669 while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1670 tracing::info!(
1671 "Received next committed epoch: {}",
1672 next_committed_epoch
1673 );
1674 if next_committed_epoch >= epoch {
1676 self.commit_iceberg_inner(epoch, write_results, None)
1677 .await?;
1678 break;
1679 }
1680 }
1681 }
1682 }
1683 None => unreachable!(
1684 "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
1685 ),
1686 }
1687 } else {
1688 self.commit_iceberg_inner(epoch, write_results, None)
1689 .await?;
1690 }
1691
1692 Ok(())
1693 }
1694}
1695
1696impl IcebergSinkCommitter {
1698 async fn re_commit(
1699 &mut self,
1700 epoch: u64,
1701 write_results: Vec<IcebergCommitResult>,
1702 snapshot_id: i64,
1703 ) -> Result<()> {
1704 tracing::info!("Starting iceberg re commit in epoch {epoch}.");
1705
1706 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1708 tracing::debug!(?epoch, "no data to commit");
1709 return Ok(());
1710 }
1711 self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1712 .await?;
1713 Ok(())
1714 }
1715
1716 async fn commit_iceberg_inner(
1717 &mut self,
1718 epoch: u64,
1719 write_results: Vec<IcebergCommitResult>,
1720 snapshot_id: Option<i64>,
1721 ) -> Result<()> {
1722 let is_first_commit = snapshot_id.is_none();
1726 self.last_commit_epoch = epoch;
1727 let expect_schema_id = write_results[0].schema_id;
1728 let expect_partition_spec_id = write_results[0].partition_spec_id;
1729
1730 self.table = Self::reload_table(
1732 self.catalog.as_ref(),
1733 self.table.identifier(),
1734 expect_schema_id,
1735 expect_partition_spec_id,
1736 )
1737 .await?;
1738 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1739 return Err(SinkError::Iceberg(anyhow!(
1740 "Can't find schema by id {}",
1741 expect_schema_id
1742 )));
1743 };
1744 let Some(partition_spec) = self
1745 .table
1746 .metadata()
1747 .partition_spec_by_id(expect_partition_spec_id)
1748 else {
1749 return Err(SinkError::Iceberg(anyhow!(
1750 "Can't find partition spec by id {}",
1751 expect_partition_spec_id
1752 )));
1753 };
1754 let partition_type = partition_spec
1755 .as_ref()
1756 .clone()
1757 .partition_type(schema)
1758 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1759
1760 let txn = Transaction::new(&self.table);
1761 let snapshot_id = match snapshot_id {
1763 Some(previous_snapshot_id) => previous_snapshot_id,
1764 None => txn.generate_unique_snapshot_id(),
1765 };
1766 if self.is_exactly_once && is_first_commit {
1767 let mut pre_commit_metadata_bytes = Vec::new();
1769 for each_parallelism_write_result in write_results.clone() {
1770 let each_parallelism_write_result_bytes: Vec<u8> =
1771 each_parallelism_write_result.try_into()?;
1772 pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1773 }
1774
1775 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1776
1777 persist_pre_commit_metadata(
1778 self.sink_id,
1779 self.db.clone(),
1780 self.last_commit_epoch,
1781 epoch,
1782 pre_commit_metadata_bytes,
1783 snapshot_id,
1784 )
1785 .await?;
1786 }
1787
1788 let data_files = write_results
1789 .into_iter()
1790 .flat_map(|r| {
1791 r.data_files.into_iter().map(|f| {
1792 f.try_into(expect_partition_spec_id, &partition_type, schema)
1793 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1794 })
1795 })
1796 .collect::<Result<Vec<DataFile>>>()?;
1797 let retry_strategy = ExponentialBackoff::from_millis(10)
1802 .max_delay(Duration::from_secs(60))
1803 .map(jitter)
1804 .take(self.commit_retry_num as usize);
1805 let catalog = self.catalog.clone();
1806 let table_ident = self.table.identifier().clone();
1807 let table = Retry::spawn(retry_strategy, || async {
1808 let table = Self::reload_table(
1809 catalog.as_ref(),
1810 &table_ident,
1811 expect_schema_id,
1812 expect_partition_spec_id,
1813 )
1814 .await?;
1815 let txn = Transaction::new(&table);
1816 let mut append_action = txn
1817 .fast_append(Some(snapshot_id), None, vec![])
1818 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1819 append_action
1820 .add_data_files(data_files.clone())
1821 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1822 let tx = append_action.apply().await.map_err(|err| {
1823 tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
1824 SinkError::Iceberg(anyhow!(err))
1825 })?;
1826 tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1827 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1828 SinkError::Iceberg(anyhow!(err))
1829 })
1830 })
1831 .await?;
1832 self.table = table;
1833
1834 tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
1835
1836 if self.is_exactly_once {
1837 mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1838 tracing::info!(
1839 "Sink id = {}: succeeded mark pre commit metadata in epoch {} to deleted.",
1840 self.sink_id,
1841 epoch
1842 );
1843
1844 delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1845 }
1846 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
1847 && self.config.enable_compaction
1848 && iceberg_compact_stat_sender
1849 .send(IcebergSinkCompactionUpdate {
1850 sink_id: SinkId::new(self.sink_id),
1851 compaction_interval: self.config.compaction_interval_sec(),
1852 })
1853 .is_err()
1854 {
1855 warn!("failed to send iceberg compaction stats");
1856 }
1857
1858 Ok(())
1859 }
1860
1861 async fn is_snapshot_id_in_iceberg(
1865 &self,
1866 iceberg_config: &IcebergConfig,
1867 snapshot_id: i64,
1868 ) -> Result<bool> {
1869 let iceberg_common = iceberg_config.common.clone();
1870 let table = iceberg_common
1871 .load_table(&iceberg_config.java_catalog_props)
1872 .await?;
1873 if table.metadata().snapshot_by_id(snapshot_id).is_some() {
1874 Ok(true)
1875 } else {
1876 Ok(false)
1877 }
1878 }
1879}
1880
1881const MAP_KEY: &str = "key";
1882const MAP_VALUE: &str = "value";
1883
1884fn get_fields<'a>(
1885 our_field_type: &'a risingwave_common::types::DataType,
1886 data_type: &ArrowDataType,
1887 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
1888) -> Option<ArrowFields> {
1889 match data_type {
1890 ArrowDataType::Struct(fields) => {
1891 match our_field_type {
1892 risingwave_common::types::DataType::Struct(struct_fields) => {
1893 struct_fields.iter().for_each(|(name, data_type)| {
1894 let res = schema_fields.insert(name, data_type);
1895 assert!(res.is_none())
1897 });
1898 }
1899 risingwave_common::types::DataType::Map(map_fields) => {
1900 schema_fields.insert(MAP_KEY, map_fields.key());
1901 schema_fields.insert(MAP_VALUE, map_fields.value());
1902 }
1903 risingwave_common::types::DataType::List(list_field) => {
1904 list_field.as_struct().iter().for_each(|(name, data_type)| {
1905 let res = schema_fields.insert(name, data_type);
1906 assert!(res.is_none())
1908 });
1909 }
1910 _ => {}
1911 };
1912 Some(fields.clone())
1913 }
1914 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
1915 get_fields(our_field_type, field.data_type(), schema_fields)
1916 }
1917 _ => None, }
1919}
1920
fn check_compatibility(
    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
    fields: &ArrowFields,
) -> anyhow::Result<bool> {
    for arrow_field in fields {
        let our_field_type = schema_fields
            .get(arrow_field.name().as_str())
            .ok_or_else(|| {
                anyhow!("Field {} not found in RisingWave schema", arrow_field.name())
            })?;

        let converted_arrow_data_type = IcebergArrowConvert
            .to_arrow_field("", our_field_type)
            .map_err(|e| anyhow!(e))?
            .data_type()
            .clone();

        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
            (ArrowDataType::List(_), ArrowDataType::List(field))
            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
                let mut schema_fields = HashMap::new();
                get_fields(our_field_type, field.data_type(), &mut schema_fields)
                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
            }
            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
                let mut schema_fields = HashMap::new();
                our_field_type
                    .as_struct()
                    .iter()
                    .for_each(|(name, data_type)| {
                        let res = schema_fields.insert(name, data_type);
                        assert!(res.is_none())
                    });
                check_compatibility(schema_fields, fields)?
            }
            (left, right) => left.equals_datatype(right),
        };
        if !compatible {
            bail!(
                "field {}'s type is incompatible\nRisingWave converted data type: {}\nIceberg's data type: {}",
                arrow_field.name(),
                converted_arrow_data_type,
                arrow_field.data_type()
            );
        }
    }
    Ok(true)
}

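/// Verifies that `rw_schema` is write-compatible with the Iceberg table's Arrow schema:
/// both must have the same number of fields, and every Arrow field must have a same-named
/// RisingWave field whose converted Arrow type is compatible (a few lossy mappings, such as
/// `Binary` vs. `LargeBinary` and differing decimal precision/scale, are tolerated).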
pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
    if rw_schema.fields.len() != arrow_schema.fields().len() {
        bail!(
            "Schema length mismatch: RisingWave schema has {} fields, Iceberg schema has {} fields",
            rw_schema.fields.len(),
            arrow_schema.fields.len()
        );
    }

    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
        assert!(res.is_none())
    });

    check_compatibility(schema_fields, &arrow_schema.fields)?;
    Ok(())
}

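/// Serializes the per-writer metadata buffers into a single JSON byte vector.
///
/// Panics if JSON serialization fails, which should not happen for `Vec<Vec<u8>>`.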
pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    serde_json::to_vec(&metadata).unwrap()
}

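/// Inverse of [`serialize_metadata`]: decodes the JSON byte vector back into the per-writer
/// metadata buffers. Panics if the bytes are not valid JSON produced by [`serialize_metadata`].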
pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    serde_json::from_slice(&bytes).unwrap()
}

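/// Parses the `partition_by` sink option into `(column, transform)` pairs.
///
/// Supported formats are `column`, `transform(column)`, `transform(n,column)` and
/// `transform(n, column)`, e.g. `v1, identity(v1), bucket(16,v2), truncate(4, v3)`;
/// the numeric parameter `n` is required for `bucket` and `truncate`.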
pub fn parse_partition_by_exprs(
    expr: String,
) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
    if !re.is_match(&expr) {
        bail!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        );
    }
    let caps = re.captures_iter(&expr);

    let mut partition_columns = vec![];

    for mat in caps {
        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
            // A bare column name partitions by identity.
            (&mat["transform"], Transform::Identity)
        } else {
            let mut func = mat["transform"].to_owned();
            if func == "bucket" || func == "truncate" {
                let n = &mat
                    .name("n")
                    .ok_or_else(|| anyhow!("`n` must be set for `bucket` and `truncate`"))?
                    .as_str();
                func = format!("{func}[{n}]");
            }
            (
                &mat["field"],
                Transform::from_str(&func)
                    .with_context(|| format!("invalid transform function {}", func))?,
            )
        };
        partition_columns.push((column.to_owned(), transform));
    }
    Ok(partition_columns)
}

#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::{DataType, MapType, StructType};

    use crate::connector_common::IcebergCommon;
    use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
    use crate::sink::iceberg::IcebergConfig;

    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
    fn test_compatible_arrow_schema() {
        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};

        use super::*;
        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
            Field::with_name(DataType::Int32, "c"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);

        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "d"),
            Field::with_name(DataType::Int32, "c"),
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("d", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(
                DataType::Struct(StructType::new(vec![
                    ("a1", DataType::Int32),
                    (
                        "a2",
                        DataType::Struct(StructType::new(vec![
                            ("a21", DataType::Bytea),
                            (
                                "a22",
                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                            ),
                        ])),
                    ),
                ])),
                "a",
            ),
            Field::with_name(
                DataType::List(Box::new(DataType::Struct(StructType::new(vec![
                    ("b1", DataType::Int32),
                    ("b2", DataType::Bytea),
                    (
                        "b3",
                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                    ),
                ])))),
                "b",
            ),
            Field::with_name(
                DataType::Map(MapType::from_kv(
                    DataType::Varchar,
                    DataType::List(Box::new(DataType::Struct(StructType::new([
                        ("c1", DataType::Int32),
                        ("c2", DataType::Bytea),
                        (
                            "c3",
                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                        ),
                    ])))),
                )),
                "c",
            ),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "a",
                ArrowDataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("a1", ArrowDataType::Int32, false),
                    ArrowField::new(
                        "a2",
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "a22",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ),
                ])),
                false,
            ),
            ArrowField::new(
                "b",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::Struct(ArrowFields::from(vec![
                        ArrowField::new("b1", ArrowDataType::Int32, false),
                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
                        ArrowField::new_map(
                            "b3",
                            "entries",
                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                            ArrowFieldRef::new(ArrowField::new(
                                "value",
                                ArrowDataType::Utf8,
                                false,
                            )),
                            false,
                            false,
                        ),
                    ])),
                    false,
                ))),
                false,
            ),
            ArrowField::new_map(
                "c",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new(
                    "value",
                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("c1", ArrowDataType::Int32, false),
                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "c3",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ))),
                    false,
                )),
                false,
                false,
            ),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
    }

    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("enable_compaction", "true"),
            ("compaction_interval_sec", "1800"),
            ("enable_snapshot_expiration", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
                region: Some("us-east-1".to_owned()),
                endpoint: Some("http://127.0.0.1:9301".to_owned()),
                access_key: Some("hummockadmin".to_owned()),
                secret_key: Some("hummockadmin".to_owned()),
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                catalog_name: Some("demo".to_owned()),
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
                path_style_access: Some(true),
                credential: None,
                oauth2_server_uri: None,
                scope: None,
                token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
            create_table_if_not_exists: false,
            is_exactly_once: None,
            commit_retry_num: 8,
            enable_compaction: true,
            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
            enable_snapshot_expiration: true,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        assert_eq!(
            &iceberg_config.common.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }
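
    // A minimal sketch of a unit test for `parse_partition_by_exprs`. It assumes
    // `iceberg::spec::Transform` exposes the `Bucket(u32)`/`Truncate(u32)` tuple variants used
    // above and implements `PartialEq`/`Debug`; the column names are arbitrary examples.
    #[test]
    fn test_parse_partition_by_exprs_sketch() {
        use iceberg::spec::Transform;

        use super::parse_partition_by_exprs;

        // A bare column defaults to identity; bucket/truncate carry their numeric parameter.
        let parsed =
            parse_partition_by_exprs("c1, bucket(16,c2), truncate(4,c3)".to_owned()).unwrap();
        assert_eq!(
            parsed,
            vec![
                ("c1".to_owned(), Transform::Identity),
                ("c2".to_owned(), Transform::Bucket(16)),
                ("c3".to_owned(), Transform::Truncate(4)),
            ]
        );
    }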

    async fn test_create_catalog(configs: BTreeMap<String, String>) {
        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();

        let table = iceberg_config.load_table().await.unwrap();

        println!("{:?}", table.identifier());
    }

    #[tokio::test]
    #[ignore]
    async fn test_storage_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_rest_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "rest"),
            ("catalog.uri", "http://192.168.167.4:8181"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_jdbc_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "jdbc"),
            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_hive_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "hive"),
            ("catalog.uri", "thrift://localhost:9083"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }
}