mod compaction;
mod prometheus;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
use iceberg::spec::{
    DataFile, SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::sort_position_delete_writer::{
    POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::function_writer::equality_delta_writer::{
    DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
};
use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
use regex::Regex;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
    Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_meta_model::exactly_once_iceberg_sink::{self, Column, Entity, Model};
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use sea_orm::{
    ColumnTrait, DatabaseConnection, EntityTrait, Order, PaginatorTrait, QueryFilter, QueryOrder,
    Set,
};
use serde_derive::Deserialize;
use serde_json::from_value;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{self};
use tokio::sync::oneshot;
use tokio_retry::Retry;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tracing::warn;
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use super::{
    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
    SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
};
use crate::connector_common::IcebergCommon;
use crate::enforce_secret::EnforceSecret;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_SINK: &str = "iceberg";

fn default_commit_retry_num() -> u32 {
    8
}

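/// Configuration of the iceberg sink, deserialized from the sink's `WITH` options.
///
/// `type` selects append-only or upsert mode, `primary_key` is required for upsert,
/// and the flattened [`IcebergCommon`] carries the catalog/table connection settings.
/// `catalog.*` keys other than `catalog.uri`, `catalog.type` and `catalog.name` are
/// copied (with the prefix stripped) into `java_catalog_props` and forwarded to the
/// Java catalog.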
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    #[serde(flatten)]
    common: IcebergCommon,

    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    #[serde(default)]
    pub partition_by: Option<String>,

    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,

    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,
}
149
150impl EnforceSecret for IcebergConfig {
151 fn enforce_secret<'a>(
152 prop_iter: impl Iterator<Item = &'a str>,
153 ) -> crate::error::ConnectorResult<()> {
154 for prop in prop_iter {
155 IcebergCommon::enforce_one(prop)?;
156 }
157 Ok(())
158 }
159
160 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
161 IcebergCommon::enforce_one(prop)
162 }
163}
164
165impl IcebergConfig {
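    /// Builds an [`IcebergConfig`] from the raw `WITH` option map of the sink and
    /// validates it (`type`, `primary_key` for upsert, `commit_checkpoint_interval`).
    ///
    /// A minimal sketch of the expected input, for illustration only; the exact set of
    /// catalog/storage keys is defined by the flattened [`IcebergCommon`] options and
    /// is omitted here, and `catalog.jdbc.user` is just a hypothetical pass-through key:
    ///
    /// ```ignore
    /// use std::collections::BTreeMap;
    ///
    /// let mut props = BTreeMap::new();
    /// props.insert("type".to_owned(), "upsert".to_owned());
    /// props.insert("primary_key".to_owned(), "id".to_owned());
    /// // `catalog.*` keys (except `catalog.uri`, `catalog.type`, `catalog.name`) are
    /// // stripped of the prefix and forwarded to the Java catalog.
    /// props.insert("catalog.jdbc.user".to_owned(), "admin".to_owned());
    /// // ... plus the `IcebergCommon` options (catalog, database, table, warehouse, ...)
    /// let config = IcebergConfig::from_btreemap(props)?;
    /// ```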
166 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
167 let mut config =
168 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
169 .map_err(|e| SinkError::Config(anyhow!(e)))?;
170
171 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
172 return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
174 SINK_TYPE_OPTION,
175 SINK_TYPE_APPEND_ONLY,
176 SINK_TYPE_UPSERT
177 )));
178 }
179
180 if config.r#type == SINK_TYPE_UPSERT {
181 if let Some(primary_key) = &config.primary_key {
182 if primary_key.is_empty() {
183 return Err(SinkError::Config(anyhow!(
184 "`primary_key` must not be empty in {}",
185 SINK_TYPE_UPSERT
186 )));
187 }
188 } else {
189 return Err(SinkError::Config(anyhow!(
190 "Must set `primary_key` in {}",
191 SINK_TYPE_UPSERT
192 )));
193 }
194 }
195
196 config.java_catalog_props = values
198 .iter()
199 .filter(|(k, _v)| {
200 k.starts_with("catalog.")
201 && k != &"catalog.uri"
202 && k != &"catalog.type"
203 && k != &"catalog.name"
204 })
205 .map(|(k, v)| (k[8..].to_string(), v.to_string()))
206 .collect();
207
208 if config.commit_checkpoint_interval == 0 {
209 return Err(SinkError::Config(anyhow!(
210 "`commit_checkpoint_interval` must be greater than 0"
211 )));
212 }
213
214 Ok(config)
215 }
216
217 pub fn catalog_type(&self) -> &str {
218 self.common.catalog_type()
219 }
220
221 pub async fn load_table(&self) -> Result<Table> {
222 self.common
223 .load_table(&self.java_catalog_props)
224 .await
225 .map_err(Into::into)
226 }
227
228 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
229 self.common
230 .create_catalog(&self.java_catalog_props)
231 .await
232 .map_err(Into::into)
233 }
234
235 pub fn full_table_name(&self) -> Result<TableIdent> {
236 self.common.full_table_name().map_err(Into::into)
237 }
238}
239
240pub struct IcebergSink {
241 config: IcebergConfig,
242 param: SinkParam,
243 unique_column_ids: Option<Vec<usize>>,
245}
246
247impl EnforceSecret for IcebergSink {
248 fn enforce_secret<'a>(
249 prop_iter: impl Iterator<Item = &'a str>,
250 ) -> crate::error::ConnectorResult<()> {
251 for prop in prop_iter {
252 IcebergConfig::enforce_one(prop)?;
253 }
254 Ok(())
255 }
256}
257
258impl TryFrom<SinkParam> for IcebergSink {
259 type Error = SinkError;
260
261 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
262 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
263 IcebergSink::new(config, param)
264 }
265}
266
267impl Debug for IcebergSink {
268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 f.debug_struct("IcebergSink")
270 .field("config", &self.config)
271 .finish()
272 }
273}
274
275impl IcebergSink {
276 async fn create_and_validate_table(&self) -> Result<Table> {
277 if self.config.create_table_if_not_exists {
278 self.create_table_if_not_exists().await?;
279 }
280
281 let table = self
282 .config
283 .load_table()
284 .await
285 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
286
287 let sink_schema = self.param.schema();
288 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
289 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
290
291 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
292 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
293
294 Ok(table)
295 }
296
297 async fn create_table_if_not_exists(&self) -> Result<()> {
298 let catalog = self.config.create_catalog().await?;
299 let namespace = if let Some(database_name) = &self.config.common.database_name {
300 let namespace = NamespaceIdent::new(database_name.clone());
301 if !catalog
302 .namespace_exists(&namespace)
303 .await
304 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
305 {
306 catalog
307 .create_namespace(&namespace, HashMap::default())
308 .await
309 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
310 .context("failed to create iceberg namespace")?;
311 }
312 namespace
313 } else {
            bail!("database name must be set in order to create the iceberg table")
315 };
316
317 let table_id = self
318 .config
319 .full_table_name()
320 .context("Unable to parse table name")?;
321 if !catalog
322 .table_exists(&table_id)
323 .await
324 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
325 {
326 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
327 let arrow_fields = self
329 .param
330 .columns
331 .iter()
332 .map(|column| {
333 Ok(iceberg_create_table_arrow_convert
334 .to_arrow_field(&column.name, &column.data_type)
335 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
336 .context(format!(
337 "failed to convert {}: {} to arrow type",
338 &column.name, &column.data_type
339 ))?)
340 })
341 .collect::<Result<Vec<ArrowField>>>()?;
342 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
343 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
344 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
345 .context("failed to convert arrow schema to iceberg schema")?;
346
347 let location = {
348 let mut names = namespace.clone().inner();
349 names.push(self.config.common.table_name.clone());
350 match &self.config.common.warehouse_path {
351 Some(warehouse_path) => {
352 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
353 let url = Url::parse(warehouse_path);
354 if url.is_err() || is_s3_tables {
355 if self.config.common.catalog_type() == "rest"
358 || self.config.common.catalog_type() == "rest_rust"
359 {
360 None
361 } else {
362 bail!(format!("Invalid warehouse path: {}", warehouse_path))
363 }
364 } else if warehouse_path.ends_with('/') {
365 Some(format!("{}{}", warehouse_path, names.join("/")))
366 } else {
367 Some(format!("{}/{}", warehouse_path, names.join("/")))
368 }
369 }
370 None => None,
371 }
372 };
373
374 let partition_spec = match &self.config.partition_by {
375 Some(partition_field) => {
376 let mut partition_fields = Vec::<UnboundPartitionField>::new();
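                // Accepts `column`, `transform(column)` and `transform(n, column)`,
                // e.g. `id`, `bucket(16, id)` or `truncate(4, name)`; `n` is only
                // meaningful for the `bucket` and `truncate` transforms.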
377 let re = Regex::new(
379 r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?",
380 )
381 .unwrap();
382 if !re.is_match(partition_field) {
383 bail!(format!(
384 "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
385 partition_field
386 ))
387 }
388 let caps = re.captures_iter(partition_field);
389 for (i, mat) in caps.enumerate() {
390 let (column, transform) =
391 if mat.name("n").is_none() && mat.name("field").is_none() {
392 (&mat["transform"], Transform::Identity)
393 } else {
394 let mut func = mat["transform"].to_owned();
395 if func == "bucket" || func == "truncate" {
396 let n = &mat
397 .name("n")
398 .ok_or_else(|| {
399 SinkError::Iceberg(anyhow!(
                                            "`n` must be specified for the `bucket` and `truncate` transforms"
401 ))
402 })?
403 .as_str();
404 func = format!("{func}[{n}]");
405 }
406 (
407 &mat["field"],
408 Transform::from_str(&func)
409 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?,
410 )
411 };
412
413 match iceberg_schema.field_id_by_name(column) {
414 Some(id) => partition_fields.push(
415 UnboundPartitionField::builder()
416 .source_id(id)
417 .transform(transform)
418 .name(format!("_p_{}", column))
419 .field_id(i as i32)
420 .build(),
421 ),
422 None => bail!(format!(
423 "Partition source column does not exist in schema: {}",
424 column
425 )),
426 };
427 }
428 Some(
429 UnboundPartitionSpec::builder()
430 .with_spec_id(0)
431 .add_partition_fields(partition_fields)
432 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
433 .context("failed to add partition columns")?
434 .build(),
435 )
436 }
437 None => None,
438 };
439
440 let table_creation_builder = TableCreation::builder()
441 .name(self.config.common.table_name.clone())
442 .schema(iceberg_schema);
443
444 let table_creation = match (location, partition_spec) {
445 (Some(location), Some(partition_spec)) => table_creation_builder
446 .location(location)
447 .partition_spec(partition_spec)
448 .build(),
449 (Some(location), None) => table_creation_builder.location(location).build(),
450 (None, Some(partition_spec)) => table_creation_builder
451 .partition_spec(partition_spec)
452 .build(),
453 (None, None) => table_creation_builder.build(),
454 };
455
456 catalog
457 .create_table(&namespace, table_creation)
458 .await
459 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
460 .context("failed to create iceberg table")?;
461 }
462 Ok(())
463 }
464
465 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
466 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
467 if let Some(pk) = &config.primary_key {
468 let mut unique_column_ids = Vec::with_capacity(pk.len());
469 for col_name in pk {
470 let id = param
471 .columns
472 .iter()
473 .find(|col| col.name.as_str() == col_name)
474 .ok_or_else(|| {
475 SinkError::Config(anyhow!(
476 "Primary key column {} not found in sink schema",
477 col_name
478 ))
479 })?
480 .column_id
481 .get_id() as usize;
482 unique_column_ids.push(id);
483 }
484 Some(unique_column_ids)
485 } else {
486 unreachable!()
487 }
488 } else {
489 None
490 };
491 Ok(Self {
492 config,
493 param,
494 unique_column_ids,
495 })
496 }
497}
498
499impl Sink for IcebergSink {
500 type Coordinator = IcebergSinkCommitter;
501 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
502
503 const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &["commit_checkpoint_interval"];
504 const SINK_NAME: &'static str = ICEBERG_SINK;
505
506 async fn validate(&self) -> Result<()> {
507 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
508 bail!("Snowflake catalog only supports iceberg sources");
509 }
510 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
511 risingwave_common::license::Feature::IcebergSinkWithGlue
512 .check_available()
513 .map_err(|e| anyhow::anyhow!(e))?;
514 }
515 let _ = self.create_and_validate_table().await?;
516 Ok(())
517 }
518
519 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
520 IcebergConfig::from_btreemap(config.clone())?;
521 Ok(())
522 }
523
524 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
525 let table = self.create_and_validate_table().await?;
526 let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
527 IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
528 } else {
529 IcebergSinkWriter::new_append_only(table, &writer_param).await?
530 };
531
532 let commit_checkpoint_interval =
533 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
534 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
535 );
536 let writer = CoordinatedLogSinker::new(
537 &writer_param,
538 self.param.clone(),
539 inner,
540 commit_checkpoint_interval,
541 )
542 .await?;
543
544 Ok(writer)
545 }
546
547 fn is_coordinated_sink(&self) -> bool {
548 true
549 }
550
551 async fn new_coordinator(&self, db: DatabaseConnection) -> Result<Self::Coordinator> {
552 let catalog = self.config.create_catalog().await?;
553 let table = self.create_and_validate_table().await?;
554 Ok(IcebergSinkCommitter {
556 catalog,
557 table,
558 is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
559 last_commit_epoch: 0,
560 sink_id: self.param.sink_id.sink_id(),
561 config: self.config.clone(),
562 param: self.param.clone(),
563 db,
564 commit_notifier: None,
565 _compact_task_guard: None,
566 commit_retry_num: self.config.commit_retry_num,
567 committed_epoch_subscriber: None,
568 })
569 }
570}
571
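/// Streaming writer of the iceberg sink.
///
/// It converts stream chunks into iceberg data files (and, in upsert mode, equality and
/// position delete files) and hands the resulting file metadata to the commit
/// coordinator at every checkpoint barrier.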
pub struct IcebergSinkWriter {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    metrics: IcebergWriterMetrics,
    table: Table,
}
580
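/// Concrete writer stack used by [`IcebergSinkWriter`], chosen once at creation time
/// depending on whether the table is partitioned and whether the sink is append-only or
/// upsert. The inner `writer` is rebuilt from `writer_builder` after every checkpoint.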
581#[allow(clippy::type_complexity)]
582enum IcebergWriterDispatch {
583 PartitionAppendOnly {
584 writer: Option<Box<dyn IcebergWriter>>,
585 writer_builder: MonitoredGeneralWriterBuilder<
586 FanoutPartitionWriterBuilder<
587 DataFileWriterBuilder<
588 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
589 >,
590 >,
591 >,
592 },
593 NonpartitionAppendOnly {
594 writer: Option<Box<dyn IcebergWriter>>,
595 writer_builder: MonitoredGeneralWriterBuilder<
596 DataFileWriterBuilder<
597 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
598 >,
599 >,
600 },
601 PartitionUpsert {
602 writer: Option<Box<dyn IcebergWriter>>,
603 writer_builder: MonitoredGeneralWriterBuilder<
604 FanoutPartitionWriterBuilder<
605 EqualityDeltaWriterBuilder<
606 DataFileWriterBuilder<
607 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
608 >,
609 MonitoredPositionDeleteWriterBuilder<
610 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
611 >,
612 EqualityDeleteFileWriterBuilder<
613 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
614 >,
615 >,
616 >,
617 >,
618 arrow_schema_with_op_column: SchemaRef,
619 },
620 NonpartitionUpsert {
621 writer: Option<Box<dyn IcebergWriter>>,
622 writer_builder: MonitoredGeneralWriterBuilder<
623 EqualityDeltaWriterBuilder<
624 DataFileWriterBuilder<
625 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
626 >,
627 MonitoredPositionDeleteWriterBuilder<
628 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
629 >,
630 EqualityDeleteFileWriterBuilder<
631 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
632 >,
633 >,
634 >,
635 arrow_schema_with_op_column: SchemaRef,
636 },
637}
638
639impl IcebergWriterDispatch {
640 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
641 match self {
642 IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
643 | IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
644 | IcebergWriterDispatch::PartitionUpsert { writer, .. }
645 | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
646 }
647 }
648}
649
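/// Metrics handles of the iceberg writer. The underscore-prefixed fields are not read
/// directly; they are kept so that the label-guarded metrics stay registered for the
/// lifetime of the writer.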
pub struct IcebergWriterMetrics {
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}
659
660impl IcebergSinkWriter {
661 pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
662 let SinkWriterParam {
663 extra_partition_col_idx,
664 actor_id,
665 sink_id,
666 sink_name,
667 ..
668 } = writer_param;
669 let metrics_labels = [
670 &actor_id.to_string(),
671 &sink_id.to_string(),
672 sink_name.as_str(),
673 ];
674
675 let write_qps = GLOBAL_SINK_METRICS
677 .iceberg_write_qps
678 .with_guarded_label_values(&metrics_labels);
679 let write_latency = GLOBAL_SINK_METRICS
680 .iceberg_write_latency
681 .with_guarded_label_values(&metrics_labels);
682 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
685 .iceberg_rolling_unflushed_data_file
686 .with_guarded_label_values(&metrics_labels);
687 let write_bytes = GLOBAL_SINK_METRICS
688 .iceberg_write_bytes
689 .with_guarded_label_values(&metrics_labels);
690
691 let schema = table.metadata().current_schema();
692 let partition_spec = table.metadata().default_partition_spec();
693
694 let unique_uuid_suffix = Uuid::now_v7();
696
697 let parquet_writer_builder = ParquetWriterBuilder::new(
698 WriterProperties::new(),
699 schema.clone(),
700 table.file_io().clone(),
701 DefaultLocationGenerator::new(table.metadata().clone())
702 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
703 DefaultFileNameGenerator::new(
704 writer_param.actor_id.to_string(),
705 Some(unique_uuid_suffix.to_string()),
706 iceberg::spec::DataFileFormat::Parquet,
707 ),
708 );
709 let data_file_builder =
710 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
711 if let Some(_extra_partition_col_idx) = extra_partition_col_idx {
712 Err(SinkError::Iceberg(anyhow!(
713 "Extra partition column is not supported in append-only mode"
714 )))
715 } else if partition_spec.fields().is_empty() {
716 let writer_builder = MonitoredGeneralWriterBuilder::new(
717 data_file_builder,
718 write_qps.clone(),
719 write_latency.clone(),
720 );
721 let inner_writer = Some(Box::new(
722 writer_builder
723 .clone()
724 .build()
725 .await
726 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
727 ) as Box<dyn IcebergWriter>);
728 Ok(Self {
729 arrow_schema: Arc::new(
730 schema_to_arrow_schema(table.metadata().current_schema())
731 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
732 ),
733 metrics: IcebergWriterMetrics {
734 _write_qps: write_qps,
735 _write_latency: write_latency,
736 write_bytes,
737 },
738 writer: IcebergWriterDispatch::NonpartitionAppendOnly {
739 writer: inner_writer,
740 writer_builder,
741 },
742 table,
743 })
744 } else {
745 let partition_builder = MonitoredGeneralWriterBuilder::new(
746 FanoutPartitionWriterBuilder::new(
747 data_file_builder,
748 partition_spec.clone(),
749 schema.clone(),
750 )
751 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
752 write_qps.clone(),
753 write_latency.clone(),
754 );
755 let inner_writer = Some(Box::new(
756 partition_builder
757 .clone()
758 .build()
759 .await
760 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
761 ) as Box<dyn IcebergWriter>);
762 Ok(Self {
763 arrow_schema: Arc::new(
764 schema_to_arrow_schema(table.metadata().current_schema())
765 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
766 ),
767 metrics: IcebergWriterMetrics {
768 _write_qps: write_qps,
769 _write_latency: write_latency,
770 write_bytes,
771 },
772 writer: IcebergWriterDispatch::PartitionAppendOnly {
773 writer: inner_writer,
774 writer_builder: partition_builder,
775 },
776 table,
777 })
778 }
779 }
780
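    /// Creates an upsert writer that emits equality-delete and position-delete files in
    /// addition to data files, using `unique_column_ids` as the equality-delete key.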
781 pub async fn new_upsert(
782 table: Table,
783 unique_column_ids: Vec<usize>,
784 writer_param: &SinkWriterParam,
785 ) -> Result<Self> {
786 let SinkWriterParam {
787 extra_partition_col_idx,
788 actor_id,
789 sink_id,
790 sink_name,
791 ..
792 } = writer_param;
793 let metrics_labels = [
794 &actor_id.to_string(),
795 &sink_id.to_string(),
796 sink_name.as_str(),
797 ];
798 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
799
800 let write_qps = GLOBAL_SINK_METRICS
802 .iceberg_write_qps
803 .with_guarded_label_values(&metrics_labels);
804 let write_latency = GLOBAL_SINK_METRICS
805 .iceberg_write_latency
806 .with_guarded_label_values(&metrics_labels);
807 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
810 .iceberg_rolling_unflushed_data_file
811 .with_guarded_label_values(&metrics_labels);
812 let position_delete_cache_num = GLOBAL_SINK_METRICS
813 .iceberg_position_delete_cache_num
814 .with_guarded_label_values(&metrics_labels);
815 let write_bytes = GLOBAL_SINK_METRICS
816 .iceberg_write_bytes
817 .with_guarded_label_values(&metrics_labels);
818
819 let schema = table.metadata().current_schema();
821 let partition_spec = table.metadata().default_partition_spec();
822
823 let unique_uuid_suffix = Uuid::now_v7();
825
826 let data_file_builder = {
827 let parquet_writer_builder = ParquetWriterBuilder::new(
828 WriterProperties::new(),
829 schema.clone(),
830 table.file_io().clone(),
831 DefaultLocationGenerator::new(table.metadata().clone())
832 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
833 DefaultFileNameGenerator::new(
834 writer_param.actor_id.to_string(),
835 Some(unique_uuid_suffix.to_string()),
836 iceberg::spec::DataFileFormat::Parquet,
837 ),
838 );
839 DataFileWriterBuilder::new(
840 parquet_writer_builder.clone(),
841 None,
842 partition_spec.spec_id(),
843 )
844 };
845 let position_delete_builder = {
846 let parquet_writer_builder = ParquetWriterBuilder::new(
847 WriterProperties::new(),
848 POSITION_DELETE_SCHEMA.clone(),
849 table.file_io().clone(),
850 DefaultLocationGenerator::new(table.metadata().clone())
851 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
852 DefaultFileNameGenerator::new(
853 writer_param.actor_id.to_string(),
854 Some(format!("pos-del-{}", unique_uuid_suffix)),
855 iceberg::spec::DataFileFormat::Parquet,
856 ),
857 );
858 MonitoredPositionDeleteWriterBuilder::new(
859 SortPositionDeleteWriterBuilder::new(
860 parquet_writer_builder.clone(),
861 writer_param
862 .streaming_config
863 .developer
864 .iceberg_sink_positional_delete_cache_size,
865 None,
866 None,
867 ),
868 position_delete_cache_num,
869 )
870 };
871 let equality_delete_builder = {
872 let config = EqualityDeleteWriterConfig::new(
873 unique_column_ids.clone(),
874 table.metadata().current_schema().clone(),
875 None,
876 partition_spec.spec_id(),
877 )
878 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
879 let parquet_writer_builder = ParquetWriterBuilder::new(
880 WriterProperties::new(),
881 Arc::new(
882 arrow_schema_to_schema(config.projected_arrow_schema_ref())
883 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
884 ),
885 table.file_io().clone(),
886 DefaultLocationGenerator::new(table.metadata().clone())
887 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
888 DefaultFileNameGenerator::new(
889 writer_param.actor_id.to_string(),
890 Some(format!("eq-del-{}", unique_uuid_suffix)),
891 iceberg::spec::DataFileFormat::Parquet,
892 ),
893 );
894
895 EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
896 };
897 let delta_builder = EqualityDeltaWriterBuilder::new(
898 data_file_builder,
899 position_delete_builder,
900 equality_delete_builder,
901 unique_column_ids,
902 );
903 if let Some(_extra_partition_col_idx) = extra_partition_col_idx {
904 Err(SinkError::Iceberg(anyhow!(
905 "Extra partition column is not supported in upsert mode"
906 )))
907 } else if partition_spec.fields().is_empty() {
908 let writer_builder = MonitoredGeneralWriterBuilder::new(
909 delta_builder,
910 write_qps.clone(),
911 write_latency.clone(),
912 );
913 let inner_writer = Some(Box::new(
914 writer_builder
915 .clone()
916 .build()
917 .await
918 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
919 ) as Box<dyn IcebergWriter>);
920 let original_arrow_schema = Arc::new(
921 schema_to_arrow_schema(table.metadata().current_schema())
922 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
923 );
924 let schema_with_extra_op_column = {
925 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
926 new_fields.push(Arc::new(ArrowField::new(
927 "op".to_owned(),
928 ArrowDataType::Int32,
929 false,
930 )));
931 Arc::new(ArrowSchema::new(new_fields))
932 };
933 Ok(Self {
934 arrow_schema: original_arrow_schema,
935 metrics: IcebergWriterMetrics {
936 _write_qps: write_qps,
937 _write_latency: write_latency,
938 write_bytes,
939 },
940 table,
941 writer: IcebergWriterDispatch::NonpartitionUpsert {
942 writer: inner_writer,
943 writer_builder,
944 arrow_schema_with_op_column: schema_with_extra_op_column,
945 },
946 })
947 } else {
948 let original_arrow_schema = Arc::new(
949 schema_to_arrow_schema(table.metadata().current_schema())
950 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
951 );
952 let schema_with_extra_op_column = {
953 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
954 new_fields.push(Arc::new(ArrowField::new(
955 "op".to_owned(),
956 ArrowDataType::Int32,
957 false,
958 )));
959 Arc::new(ArrowSchema::new(new_fields))
960 };
961 let partition_builder = MonitoredGeneralWriterBuilder::new(
962 FanoutPartitionWriterBuilder::new_with_custom_schema(
963 delta_builder,
964 schema_with_extra_op_column.clone(),
965 partition_spec.clone(),
966 table.metadata().current_schema().clone(),
967 ),
968 write_qps.clone(),
969 write_latency.clone(),
970 );
971 let inner_writer = Some(Box::new(
972 partition_builder
973 .clone()
974 .build()
975 .await
976 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
977 ) as Box<dyn IcebergWriter>);
978 Ok(Self {
979 arrow_schema: original_arrow_schema,
980 metrics: IcebergWriterMetrics {
981 _write_qps: write_qps,
982 _write_latency: write_latency,
983 write_bytes,
984 },
985 table,
986 writer: IcebergWriterDispatch::PartitionUpsert {
987 writer: inner_writer,
988 writer_builder: partition_builder,
989 arrow_schema_with_op_column: schema_with_extra_op_column,
990 },
991 })
992 }
993 }
994}
995
996#[async_trait]
997impl SinkWriter for IcebergSinkWriter {
998 type CommitMetadata = Option<SinkMetadata>;
999
1000 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1002 Ok(())
1004 }
1005
1006 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1008 match &mut self.writer {
1010 IcebergWriterDispatch::PartitionAppendOnly {
1011 writer,
1012 writer_builder,
1013 } => {
1014 if writer.is_none() {
1015 *writer = Some(Box::new(
1016 writer_builder
1017 .clone()
1018 .build()
1019 .await
1020 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1021 ));
1022 }
1023 }
1024 IcebergWriterDispatch::NonpartitionAppendOnly {
1025 writer,
1026 writer_builder,
1027 } => {
1028 if writer.is_none() {
1029 *writer = Some(Box::new(
1030 writer_builder
1031 .clone()
1032 .build()
1033 .await
1034 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1035 ));
1036 }
1037 }
1038 IcebergWriterDispatch::PartitionUpsert {
1039 writer,
1040 writer_builder,
1041 ..
1042 } => {
1043 if writer.is_none() {
1044 *writer = Some(Box::new(
1045 writer_builder
1046 .clone()
1047 .build()
1048 .await
1049 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1050 ));
1051 }
1052 }
1053 IcebergWriterDispatch::NonpartitionUpsert {
1054 writer,
1055 writer_builder,
1056 ..
1057 } => {
1058 if writer.is_none() {
1059 *writer = Some(Box::new(
1060 writer_builder
1061 .clone()
1062 .build()
1063 .await
1064 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1065 ));
1066 }
1067 }
1068 };
1069
1070 let (mut chunk, ops) = chunk.compact().into_parts();
1072 if ops.is_empty() {
1073 return Ok(());
1074 }
1075 let write_batch_size = chunk.estimated_heap_size();
1076 let batch = match &self.writer {
1077 IcebergWriterDispatch::PartitionAppendOnly { .. }
1078 | IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
1079 let filters =
1081 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1082 chunk.set_visibility(filters);
1083 IcebergArrowConvert
1084 .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1085 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1086 }
1087 IcebergWriterDispatch::PartitionUpsert {
1088 arrow_schema_with_op_column,
1089 ..
1090 }
1091 | IcebergWriterDispatch::NonpartitionUpsert {
1092 arrow_schema_with_op_column,
1093 ..
1094 } => {
1095 let chunk = IcebergArrowConvert
1096 .to_record_batch(self.arrow_schema.clone(), &chunk)
1097 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1098 let ops = Arc::new(Int32Array::from(
1099 ops.iter()
1100 .map(|op| match op {
1101 Op::UpdateInsert | Op::Insert => INSERT_OP,
1102 Op::UpdateDelete | Op::Delete => DELETE_OP,
1103 })
1104 .collect_vec(),
1105 ));
1106 let mut columns = chunk.columns().to_vec();
1107 columns.push(ops);
1108 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1109 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1110 }
1111 };
1112
1113 let writer = self.writer.get_writer().unwrap();
1114 writer
1115 .write(batch)
1116 .await
1117 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1118 self.metrics.write_bytes.inc_by(write_batch_size as _);
1119 Ok(())
1120 }
1121
1122 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1125 if !is_checkpoint {
1127 return Ok(None);
1128 }
1129
1130 let close_result = match &mut self.writer {
1131 IcebergWriterDispatch::PartitionAppendOnly {
1132 writer,
1133 writer_builder,
1134 } => {
1135 let close_result = match writer.take() {
1136 Some(mut writer) => Some(writer.close().await),
1137 _ => None,
1138 };
1139 match writer_builder.clone().build().await {
1140 Ok(new_writer) => {
1141 *writer = Some(Box::new(new_writer));
1142 }
1143 _ => {
1144 warn!("Failed to build new writer after close");
1147 }
1148 }
1149 close_result
1150 }
1151 IcebergWriterDispatch::NonpartitionAppendOnly {
1152 writer,
1153 writer_builder,
1154 } => {
1155 let close_result = match writer.take() {
1156 Some(mut writer) => Some(writer.close().await),
1157 _ => None,
1158 };
1159 match writer_builder.clone().build().await {
1160 Ok(new_writer) => {
1161 *writer = Some(Box::new(new_writer));
1162 }
1163 _ => {
1164 warn!("Failed to build new writer after close");
1167 }
1168 }
1169 close_result
1170 }
1171 IcebergWriterDispatch::PartitionUpsert {
1172 writer,
1173 writer_builder,
1174 ..
1175 } => {
1176 let close_result = match writer.take() {
1177 Some(mut writer) => Some(writer.close().await),
1178 _ => None,
1179 };
1180 match writer_builder.clone().build().await {
1181 Ok(new_writer) => {
1182 *writer = Some(Box::new(new_writer));
1183 }
1184 _ => {
1185 warn!("Failed to build new writer after close");
1188 }
1189 }
1190 close_result
1191 }
1192 IcebergWriterDispatch::NonpartitionUpsert {
1193 writer,
1194 writer_builder,
1195 ..
1196 } => {
1197 let close_result = match writer.take() {
1198 Some(mut writer) => Some(writer.close().await),
1199 _ => None,
1200 };
1201 match writer_builder.clone().build().await {
1202 Ok(new_writer) => {
1203 *writer = Some(Box::new(new_writer));
1204 }
1205 _ => {
1206 warn!("Failed to build new writer after close");
1209 }
1210 }
1211 close_result
1212 }
1213 };
1214
1215 match close_result {
1216 Some(Ok(result)) => {
1217 let version = self.table.metadata().format_version() as u8;
1218 let partition_type = self.table.metadata().default_partition_type();
1219 let data_files = result
1220 .into_iter()
1221 .map(|f| {
1222 SerializedDataFile::try_from(f, partition_type, version == 1)
1223 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1224 })
1225 .collect::<Result<Vec<_>>>()?;
1226 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1227 data_files,
1228 schema_id: self.table.metadata().current_schema_id(),
1229 partition_spec_id: self.table.metadata().default_partition_spec_id(),
1230 })?))
1231 }
1232 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1233 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1234 }
1235 }
1236
1237 async fn abort(&mut self) -> Result<()> {
1239 Ok(())
1241 }
1242}
1243
const SCHEMA_ID: &str = "schema_id";
const PARTITION_SPEC_ID: &str = "partition_spec_id";
const DATA_FILES: &str = "data_files";
1247
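/// Per-checkpoint write result of one sink writer: the serialized data files plus the
/// schema id and partition spec id they were written against. It is serialized to JSON
/// and shipped to the commit coordinator inside [`SinkMetadata`].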
#[derive(Default, Clone)]
struct IcebergCommitResult {
    schema_id: i32,
    partition_spec_id: i32,
    data_files: Vec<SerializedDataFile>,
}
1254
1255impl IcebergCommitResult {
1256 fn try_from(value: &SinkMetadata) -> Result<Self> {
1257 if let Some(Serialized(v)) = &value.metadata {
1258 let mut values = if let serde_json::Value::Object(v) =
1259 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1260 .context("Can't parse iceberg sink metadata")?
1261 {
1262 v
1263 } else {
1264 bail!("iceberg sink metadata should be an object");
1265 };
1266
1267 let schema_id;
1268 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1269 schema_id = value
1270 .as_u64()
1271 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1272 } else {
1273 bail!("iceberg sink metadata should have schema_id");
1274 }
1275
1276 let partition_spec_id;
1277 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1278 partition_spec_id = value
1279 .as_u64()
1280 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1281 } else {
1282 bail!("iceberg sink metadata should have partition_spec_id");
1283 }
1284
1285 let data_files: Vec<SerializedDataFile>;
1286 if let serde_json::Value::Array(values) = values
1287 .remove(DATA_FILES)
1288 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1289 {
1290 data_files = values
1291 .into_iter()
1292 .map(from_value::<SerializedDataFile>)
1293 .collect::<std::result::Result<_, _>>()
1294 .unwrap();
1295 } else {
1296 bail!("iceberg sink metadata should have data_files object");
1297 }
1298
1299 Ok(Self {
1300 schema_id: schema_id as i32,
1301 partition_spec_id: partition_spec_id as i32,
1302 data_files,
1303 })
1304 } else {
1305 bail!("Can't create iceberg sink write result from empty data!")
1306 }
1307 }
1308
1309 fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1310 let mut values = if let serde_json::Value::Object(value) =
1311 serde_json::from_slice::<serde_json::Value>(&value)
1312 .context("Can't parse iceberg sink metadata")?
1313 {
1314 value
1315 } else {
1316 bail!("iceberg sink metadata should be an object");
1317 };
1318
1319 let schema_id;
1320 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1321 schema_id = value
1322 .as_u64()
1323 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1324 } else {
1325 bail!("iceberg sink metadata should have schema_id");
1326 }
1327
1328 let partition_spec_id;
1329 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1330 partition_spec_id = value
1331 .as_u64()
1332 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1333 } else {
1334 bail!("iceberg sink metadata should have partition_spec_id");
1335 }
1336
1337 let data_files: Vec<SerializedDataFile>;
1338 if let serde_json::Value::Array(values) = values
1339 .remove(DATA_FILES)
1340 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1341 {
1342 data_files = values
1343 .into_iter()
1344 .map(from_value::<SerializedDataFile>)
1345 .collect::<std::result::Result<_, _>>()
1346 .unwrap();
1347 } else {
1348 bail!("iceberg sink metadata should have data_files object");
1349 }
1350
1351 Ok(Self {
1352 schema_id: schema_id as i32,
1353 partition_spec_id: partition_spec_id as i32,
1354 data_files,
1355 })
1356 }
1357}
1358
1359impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1360 type Error = SinkError;
1361
1362 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1363 let json_data_files = serde_json::Value::Array(
1364 value
1365 .data_files
1366 .iter()
1367 .map(serde_json::to_value)
1368 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1369 .context("Can't serialize data files to json")?,
1370 );
1371 let json_value = serde_json::Value::Object(
1372 vec![
1373 (
1374 SCHEMA_ID.to_owned(),
1375 serde_json::Value::Number(value.schema_id.into()),
1376 ),
1377 (
1378 PARTITION_SPEC_ID.to_owned(),
1379 serde_json::Value::Number(value.partition_spec_id.into()),
1380 ),
1381 (DATA_FILES.to_owned(), json_data_files),
1382 ]
1383 .into_iter()
1384 .collect(),
1385 );
1386 Ok(SinkMetadata {
1387 metadata: Some(Serialized(SerializedMetadata {
1388 metadata: serde_json::to_vec(&json_value)
1389 .context("Can't serialize iceberg sink metadata")?,
1390 })),
1391 })
1392 }
1393}
1394
1395impl TryFrom<IcebergCommitResult> for Vec<u8> {
1396 type Error = SinkError;
1397
1398 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1399 let json_data_files = serde_json::Value::Array(
1400 value
1401 .data_files
1402 .iter()
1403 .map(serde_json::to_value)
1404 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1405 .context("Can't serialize data files to json")?,
1406 );
1407 let json_value = serde_json::Value::Object(
1408 vec![
1409 (
1410 SCHEMA_ID.to_owned(),
1411 serde_json::Value::Number(value.schema_id.into()),
1412 ),
1413 (
1414 PARTITION_SPEC_ID.to_owned(),
1415 serde_json::Value::Number(value.partition_spec_id.into()),
1416 ),
1417 (DATA_FILES.to_owned(), json_data_files),
1418 ]
1419 .into_iter()
1420 .collect(),
1421 );
1422 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1423 }
1424}
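
/// Commit coordinator of the iceberg sink. It collects the write results of all sink
/// writers for an epoch and appends the data files to the iceberg table in a single
/// transaction, optionally recording pre-commit metadata for exactly-once recovery.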
pub struct IcebergSinkCommitter {
    catalog: Arc<dyn Catalog>,
    table: Table,
    pub last_commit_epoch: u64,
    pub(crate) is_exactly_once: bool,
    pub(crate) sink_id: u32,
    pub(crate) config: IcebergConfig,
    pub(crate) param: SinkParam,
    pub(crate) db: DatabaseConnection,
    commit_notifier: Option<mpsc::UnboundedSender<()>>,
    commit_retry_num: u32,
    _compact_task_guard: Option<oneshot::Sender<()>>,
    pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
}
1439
1440impl IcebergSinkCommitter {
1441 async fn reload_table(
1444 catalog: &dyn Catalog,
1445 table_ident: &TableIdent,
1446 schema_id: i32,
1447 partition_spec_id: i32,
1448 ) -> Result<Table> {
1449 let table = catalog
1450 .load_table(table_ident)
1451 .await
1452 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1453 if table.metadata().current_schema_id() != schema_id {
1454 return Err(SinkError::Iceberg(anyhow!(
                "Schema evolution is not supported: expected schema id {}, but got {}",
1456 schema_id,
1457 table.metadata().current_schema_id()
1458 )));
1459 }
1460 if table.metadata().default_partition_spec_id() != partition_spec_id {
1461 return Err(SinkError::Iceberg(anyhow!(
                "Partition evolution is not supported: expected partition spec id {}, but got {}",
1463 partition_spec_id,
1464 table.metadata().default_partition_spec_id()
1465 )));
1466 }
1467 Ok(table)
1468 }
1469}
1470
1471#[async_trait::async_trait]
1472impl SinkCommitCoordinator for IcebergSinkCommitter {
1473 async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1474 if self.is_exactly_once {
1475 self.committed_epoch_subscriber = Some(subscriber);
1476 tracing::info!(
                "Sink id = {}: initializing iceberg sink coordinator.",
1478 self.param.sink_id.sink_id()
1479 );
1480 if self
1481 .iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id())
1482 .await?
1483 {
1484 let ordered_metadata_list_by_end_epoch = self
1485 .get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id())
1486 .await?;
1487
1488 let mut last_recommit_epoch = 0;
1489 for (end_epoch, sealized_bytes, snapshot_id, committed) in
1490 ordered_metadata_list_by_end_epoch
1491 {
1492 let write_results_bytes = deserialize_metadata(sealized_bytes);
1493 let mut write_results = vec![];
1494
1495 for each in write_results_bytes {
1496 let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1497 write_results.push(write_result);
1498 }
1499
1500 match (
1501 committed,
1502 self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1503 .await?,
1504 ) {
1505 (true, _) => {
1506 tracing::info!(
                            "Sink id = {}: all data in the log store has been written into the external sink; nothing to do during recovery.",
1508 self.param.sink_id.sink_id()
1509 );
1510 }
1511 (false, true) => {
1512 tracing::info!(
                            "Sink id = {}: all pre-commit files have already been committed into iceberg and do not need to be committed again; marking them as committed.",
1515 self.param.sink_id.sink_id()
1516 );
1517 self.mark_row_is_committed_by_sink_id_and_end_epoch(
1518 &self.db,
1519 self.sink_id,
1520 end_epoch,
1521 )
1522 .await?;
1523 }
1524 (false, false) => {
1525 tracing::info!(
1526 "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1527 self.param.sink_id.sink_id()
1528 );
1529 self.re_commit(end_epoch, write_results, snapshot_id)
1530 .await?;
1531 }
1532 }
1533
1534 last_recommit_epoch = end_epoch;
1535 }
1536 tracing::info!(
                    "Sink id = {}: iceberg commit coordinator initialized.",
1538 self.param.sink_id.sink_id()
1539 );
1540 return Ok(Some(last_recommit_epoch));
1541 } else {
1542 tracing::info!(
                    "Sink id = {}: iceberg coordinator initialized; the exactly-once system table is empty.",
1544 self.param.sink_id.sink_id()
1545 );
1546 return Ok(None);
1547 }
1548 }
1549
        tracing::info!("Iceberg commit coordinator initialized.");
1551 return Ok(None);
1552 }
1553
1554 async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
1555 tracing::info!("Starting iceberg commit in epoch {epoch}.");
1556 let write_results: Vec<IcebergCommitResult> = metadata
1557 .iter()
1558 .map(IcebergCommitResult::try_from)
1559 .collect::<Result<Vec<IcebergCommitResult>>>()?;
1560
1561 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1563 tracing::debug!(?epoch, "no data to commit");
1564 return Ok(());
1565 }
1566
1567 if write_results
1569 .iter()
1570 .any(|r| r.schema_id != write_results[0].schema_id)
1571 || write_results
1572 .iter()
1573 .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1574 {
1575 return Err(SinkError::Iceberg(anyhow!(
1576 "schema_id and partition_spec_id should be the same in all write results"
1577 )));
1578 }
1579
1580 if self.is_exactly_once {
1581 assert!(self.committed_epoch_subscriber.is_some());
1582 match self.committed_epoch_subscriber.clone() {
1583 Some(committed_epoch_subscriber) => {
1584 let (committed_epoch, mut rw_futures_utilrx) =
1586 committed_epoch_subscriber(self.param.sink_id).await?;
1587 if committed_epoch >= epoch {
1589 self.commit_iceberg_inner(epoch, write_results, None)
1590 .await?;
1591 } else {
1592 tracing::info!(
1593 "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1594 committed_epoch,
1595 epoch
1596 );
1597 while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1598 tracing::info!(
1599 "Received next committed epoch: {}",
1600 next_committed_epoch
1601 );
1602 if next_committed_epoch >= epoch {
1604 self.commit_iceberg_inner(epoch, write_results, None)
1605 .await?;
1606 break;
1607 }
1608 }
1609 }
1610 }
1611 None => unreachable!(
                    "Exactly-once sink must wait for the committed epoch before committing; committed_epoch_subscriber is not initialized."
1613 ),
1614 }
1615 } else {
1616 self.commit_iceberg_inner(epoch, write_results, None)
1617 .await?;
1618 }
1619
1620 Ok(())
1621 }
1622}
1623
1624impl IcebergSinkCommitter {
1626 async fn re_commit(
1627 &mut self,
1628 epoch: u64,
1629 write_results: Vec<IcebergCommitResult>,
1630 snapshot_id: i64,
1631 ) -> Result<()> {
        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1633
1634 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1636 tracing::debug!(?epoch, "no data to commit");
1637 return Ok(());
1638 }
1639 self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1640 .await?;
1641 Ok(())
1642 }
1643
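    /// Commits `write_results` for `epoch` to the iceberg table:
    /// 1. reload the table and check that the schema/partition spec ids still match,
    /// 2. for exactly-once sinks on the first attempt, persist the pre-commit metadata
    ///    (keyed by a pre-generated snapshot id) to the meta store,
    /// 3. fast-append the data files with retry, then mark and clean up the metadata rows.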
1644 async fn commit_iceberg_inner(
1645 &mut self,
1646 epoch: u64,
1647 write_results: Vec<IcebergCommitResult>,
1648 snapshot_id: Option<i64>,
1649 ) -> Result<()> {
1650 let is_first_commit = snapshot_id.is_none();
1654 if !is_first_commit {
            tracing::info!("Performing iceberg re-commit.");
1656 }
1657 self.last_commit_epoch = epoch;
1658 let expect_schema_id = write_results[0].schema_id;
1659 let expect_partition_spec_id = write_results[0].partition_spec_id;
1660
1661 self.table = Self::reload_table(
1663 self.catalog.as_ref(),
1664 self.table.identifier(),
1665 expect_schema_id,
1666 expect_partition_spec_id,
1667 )
1668 .await?;
1669 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1670 return Err(SinkError::Iceberg(anyhow!(
1671 "Can't find schema by id {}",
1672 expect_schema_id
1673 )));
1674 };
1675 let Some(partition_spec) = self
1676 .table
1677 .metadata()
1678 .partition_spec_by_id(expect_partition_spec_id)
1679 else {
1680 return Err(SinkError::Iceberg(anyhow!(
1681 "Can't find partition spec by id {}",
1682 expect_partition_spec_id
1683 )));
1684 };
1685 let partition_type = partition_spec
1686 .as_ref()
1687 .clone()
1688 .partition_type(schema)
1689 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1690
1691 let txn = Transaction::new(&self.table);
1692 let snapshot_id = match snapshot_id {
1694 Some(previous_snapshot_id) => previous_snapshot_id,
1695 None => txn.generate_unique_snapshot_id(),
1696 };
1697 if self.is_exactly_once && is_first_commit {
1698 let mut pre_commit_metadata_bytes = Vec::new();
1700 for each_parallelism_write_result in write_results.clone() {
1701 let each_parallelism_write_result_bytes: Vec<u8> =
1702 each_parallelism_write_result.try_into()?;
1703 pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1704 }
1705
1706 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1707
1708 self.persist_pre_commit_metadata(
1709 self.db.clone(),
1710 self.last_commit_epoch,
1711 epoch,
1712 pre_commit_metadata_bytes,
1713 snapshot_id,
1714 )
1715 .await?;
1716 }
1717
1718 let data_files = write_results
1719 .into_iter()
1720 .flat_map(|r| {
1721 r.data_files.into_iter().map(|f| {
1722 f.try_into(expect_partition_spec_id, &partition_type, schema)
1723 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1724 })
1725 })
1726 .collect::<Result<Vec<DataFile>>>()?;
1727 let retry_strategy = ExponentialBackoff::from_millis(10)
1732 .max_delay(Duration::from_secs(60))
1733 .map(jitter)
1734 .take(self.commit_retry_num as usize);
1735 let catalog = self.catalog.clone();
1736 let table_ident = self.table.identifier().clone();
1737 let table = Retry::spawn(retry_strategy, || async {
1738 let table = Self::reload_table(
1739 catalog.as_ref(),
1740 &table_ident,
1741 expect_schema_id,
1742 expect_partition_spec_id,
1743 )
1744 .await?;
1745 let txn = Transaction::new(&table);
1746 let mut append_action = txn
1747 .fast_append(Some(snapshot_id), None, vec![])
1748 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1749 append_action
1750 .add_data_files(data_files.clone())
1751 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1752 let tx = append_action.apply().await.map_err(|err| {
                tracing::error!(error = %err.as_report(), "Failed to apply append action to iceberg transaction");
1754 SinkError::Iceberg(anyhow!(err))
1755 })?;
1756 tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1757 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1758 SinkError::Iceberg(anyhow!(err))
1759 })
1760 })
1761 .await?;
1762 self.table = table;
1763
1764 if let Some(commit_notifier) = &mut self.commit_notifier {
1765 if commit_notifier.send(()).is_err() {
1766 warn!("failed to notify commit");
1767 }
1768 }
        tracing::info!("Successfully committed to iceberg table in epoch {epoch}.");
1770
1771 if self.is_exactly_once {
1772 self.mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch)
1773 .await?;
1774 tracing::info!(
                "Sink id = {}: marked pre-commit metadata in epoch {} as committed.",
1776 self.sink_id,
1777 epoch
1778 );
1779
1780 self.delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch)
1781 .await?;
1782 }
1783 Ok(())
1784 }
1785
1786 async fn persist_pre_commit_metadata(
1787 &self,
1788 db: DatabaseConnection,
1789 start_epoch: u64,
1790 end_epoch: u64,
1791 pre_commit_metadata: Vec<u8>,
1792 snapshot_id: i64,
1793 ) -> Result<()> {
1794 let m = exactly_once_iceberg_sink::ActiveModel {
1795 sink_id: Set(self.sink_id as i32),
1796 end_epoch: Set(end_epoch.try_into().unwrap()),
1797 start_epoch: Set(start_epoch.try_into().unwrap()),
1798 metadata: Set(pre_commit_metadata),
1799 committed: Set(false),
1800 snapshot_id: Set(snapshot_id),
1801 };
1802 match exactly_once_iceberg_sink::Entity::insert(m).exec(&db).await {
1803 Ok(_) => Ok(()),
1804 Err(e) => {
1805 tracing::error!("Error inserting into system table: {:?}", e.as_report());
1806 Err(e.into())
1807 }
1808 }
1809 }
1810
1811 async fn mark_row_is_committed_by_sink_id_and_end_epoch(
1812 &self,
1813 db: &DatabaseConnection,
1814 sink_id: u32,
1815 end_epoch: u64,
1816 ) -> Result<()> {
1817 match Entity::update(exactly_once_iceberg_sink::ActiveModel {
1818 sink_id: Set(sink_id as i32),
1819 end_epoch: Set(end_epoch.try_into().unwrap()),
1820 committed: Set(true),
1821 ..Default::default()
1822 })
1823 .exec(db)
1824 .await
1825 {
1826 Ok(_) => {
1827 tracing::info!(
                    "Sink id = {}: marked written data as committed, end_epoch = {}.",
1829 sink_id,
1830 end_epoch
1831 );
1832 Ok(())
1833 }
1834 Err(e) => {
1835 tracing::error!(
                    "Error marking item as committed in the iceberg exactly-once system table: {:?}",
1837 e.as_report()
1838 );
1839 Err(e.into())
1840 }
1841 }
1842 }
1843
1844 async fn delete_row_by_sink_id_and_end_epoch(
1845 &self,
1846 db: &DatabaseConnection,
1847 sink_id: u32,
1848 end_epoch: u64,
1849 ) -> Result<()> {
1850 let end_epoch_i64: i64 = end_epoch.try_into().unwrap();
1851 match Entity::delete_many()
1852 .filter(Column::SinkId.eq(sink_id))
1853 .filter(Column::EndEpoch.lt(end_epoch_i64))
1854 .exec(db)
1855 .await
1856 {
1857 Ok(result) => {
1858 let deleted_count = result.rows_affected;
1859
1860 if deleted_count == 0 {
1861 tracing::info!(
1862 "Sink id = {}: no item deleted in iceberg exactly once system table, end_epoch < {}.",
1863 sink_id,
1864 end_epoch
1865 );
1866 } else {
1867 tracing::info!(
1868 "Sink id = {}: deleted item in iceberg exactly once system table, end_epoch < {}.",
1869 sink_id,
1870 end_epoch
1871 );
1872 }
1873 Ok(())
1874 }
1875 Err(e) => {
1876 tracing::error!(
1877 "Sink id = {}: error deleting from iceberg exactly once system table: {:?}",
1878 sink_id,
1879 e.as_report()
1880 );
1881 Err(e.into())
1882 }
1883 }
1884 }
1885
1886 async fn iceberg_sink_has_pre_commit_metadata(
1887 &self,
1888 db: &DatabaseConnection,
1889 sink_id: u32,
1890 ) -> Result<bool> {
1891 match exactly_once_iceberg_sink::Entity::find()
1892 .filter(exactly_once_iceberg_sink::Column::SinkId.eq(sink_id as i32))
1893 .count(db)
1894 .await
1895 {
1896 Ok(count) => Ok(count > 0),
1897 Err(e) => {
1898 tracing::error!(
1899 "Error querying pre-commit metadata from system table: {:?}",
1900 e.as_report()
1901 );
1902 Err(e.into())
1903 }
1904 }
1905 }
1906
1907 async fn get_pre_commit_info_by_sink_id(
1908 &self,
1909 db: &DatabaseConnection,
1910 sink_id: u32,
1911 ) -> Result<Vec<(u64, Vec<u8>, i64, bool)>> {
1912 let models: Vec<Model> = Entity::find()
1913 .filter(Column::SinkId.eq(sink_id as i32))
1914 .order_by(Column::EndEpoch, Order::Asc)
1915 .all(db)
1916 .await?;
1917
1918 let mut result: Vec<(u64, Vec<u8>, i64, bool)> = Vec::new();
1919
1920 for model in models {
1921 result.push((
1922 model.end_epoch.try_into().unwrap(),
1923 model.metadata,
1924 model.snapshot_id,
1925 model.committed,
1926 ));
1927 }
1928
1929 Ok(result)
1930 }
1931
1932 async fn is_snapshot_id_in_iceberg(
1936 &self,
1937 iceberg_config: &IcebergConfig,
1938 snapshot_id: i64,
1939 ) -> Result<bool> {
1940 let iceberg_common = iceberg_config.common.clone();
1941 let table = iceberg_common
1942 .load_table(&iceberg_config.java_catalog_props)
1943 .await?;
1944 if table.metadata().snapshot_by_id(snapshot_id).is_some() {
1945 Ok(true)
1946 } else {
1947 Ok(false)
1948 }
1949 }
1950}
1951
const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";
1954
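/// Collects the child field types of a struct/map/list RisingWave type into
/// `schema_fields` and returns the corresponding arrow struct fields, so that nested
/// types can be checked recursively by `check_compatibility`.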
1955fn get_fields<'a>(
1956 our_field_type: &'a risingwave_common::types::DataType,
1957 data_type: &ArrowDataType,
1958 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
1959) -> Option<ArrowFields> {
1960 match data_type {
1961 ArrowDataType::Struct(fields) => {
1962 match our_field_type {
1963 risingwave_common::types::DataType::Struct(struct_fields) => {
1964 struct_fields.iter().for_each(|(name, data_type)| {
1965 let res = schema_fields.insert(name, data_type);
1966 assert!(res.is_none())
1968 });
1969 }
1970 risingwave_common::types::DataType::Map(map_fields) => {
1971 schema_fields.insert(MAP_KEY, map_fields.key());
1972 schema_fields.insert(MAP_VALUE, map_fields.value());
1973 }
1974 risingwave_common::types::DataType::List(list_field) => {
1975 list_field.as_struct().iter().for_each(|(name, data_type)| {
1976 let res = schema_fields.insert(name, data_type);
1977 assert!(res.is_none())
1979 });
1980 }
1981 _ => {}
1982 };
1983 Some(fields.clone())
1984 }
1985 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
1986 get_fields(our_field_type, field.data_type(), schema_fields)
1987 }
1988 _ => None, }
1990}

fn check_compatibility(
    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
    fields: &ArrowFields,
) -> anyhow::Result<bool> {
    for arrow_field in fields {
        let our_field_type = schema_fields
            .get(arrow_field.name().as_str())
            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;

        let converted_arrow_data_type = IcebergArrowConvert
            .to_arrow_field("", our_field_type)
            .map_err(|e| anyhow!(e))?
            .data_type()
            .clone();

        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
            (ArrowDataType::List(_), ArrowDataType::List(field))
            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
                let mut schema_fields = HashMap::new();
                get_fields(our_field_type, field.data_type(), &mut schema_fields)
                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
            }
            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
                let mut schema_fields = HashMap::new();
                our_field_type
                    .as_struct()
                    .iter()
                    .for_each(|(name, data_type)| {
                        let res = schema_fields.insert(name, data_type);
                        assert!(res.is_none())
                    });
                check_compatibility(schema_fields, fields)?
            }
            (left, right) => left.equals_datatype(right),
        };
        if !compatible {
            bail!(
                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
                arrow_field.name(),
                converted_arrow_data_type,
                arrow_field.data_type()
            );
        }
    }
    Ok(true)
}

pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
    if rw_schema.fields.len() != arrow_schema.fields().len() {
        bail!(
            "Schema length mismatch, risingwave is {}, and iceberg is {}",
            rw_schema.fields.len(),
            arrow_schema.fields.len()
        );
    }

    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
        assert!(res.is_none())
    });

    check_compatibility(schema_fields, &arrow_schema.fields)?;
    Ok(())
}

/// Serializes a set of pre-commit metadata entries into a single JSON byte buffer.
pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    serde_json::to_vec(&metadata).unwrap()
}

/// Inverse of [`serialize_metadata`]; panics if `bytes` is not valid JSON of the expected shape.
pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    serde_json::from_slice(&bytes).unwrap()
}

#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::{DataType, MapType, StructType};

    use crate::connector_common::IcebergCommon;
    use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
    use crate::sink::iceberg::IcebergConfig;

    #[test]
    fn test_compatible_arrow_schema() {
        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};

        use super::*;
        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
            Field::with_name(DataType::Int32, "c"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);

        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "d"),
            Field::with_name(DataType::Int32, "c"),
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("d", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(
                DataType::Struct(StructType::new(vec![
                    ("a1", DataType::Int32),
                    (
                        "a2",
                        DataType::Struct(StructType::new(vec![
                            ("a21", DataType::Bytea),
                            (
                                "a22",
                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                            ),
                        ])),
                    ),
                ])),
                "a",
            ),
            Field::with_name(
                DataType::List(Box::new(DataType::Struct(StructType::new(vec![
                    ("b1", DataType::Int32),
                    ("b2", DataType::Bytea),
                    (
                        "b3",
                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                    ),
                ])))),
                "b",
            ),
            Field::with_name(
                DataType::Map(MapType::from_kv(
                    DataType::Varchar,
                    DataType::List(Box::new(DataType::Struct(StructType::new([
                        ("c1", DataType::Int32),
                        ("c2", DataType::Bytea),
                        (
                            "c3",
                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                        ),
                    ])))),
                )),
                "c",
            ),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "a",
                ArrowDataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("a1", ArrowDataType::Int32, false),
                    ArrowField::new(
                        "a2",
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "a22",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ),
                ])),
                false,
            ),
            ArrowField::new(
                "b",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::Struct(ArrowFields::from(vec![
                        ArrowField::new("b1", ArrowDataType::Int32, false),
                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
                        ArrowField::new_map(
                            "b3",
                            "entries",
                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                            ArrowFieldRef::new(ArrowField::new(
                                "value",
                                ArrowDataType::Utf8,
                                false,
                            )),
                            false,
                            false,
                        ),
                    ])),
                    false,
                ))),
                false,
            ),
            ArrowField::new_map(
                "c",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new(
                    "value",
                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("c1", ArrowDataType::Int32, false),
                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "c3",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ))),
                    false,
                )),
                false,
                false,
            ),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
    }
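
    // A small negative-path sketch complementing the test above. It only assumes that
    // `try_matches_arrow_schema` keeps rejecting schemas whose field counts or field
    // types disagree; the two-column schema used here is arbitrary illustration data.
    #[test]
    fn test_incompatible_arrow_schema() {
        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};

        use super::*;
        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Varchar, "b"),
        ]);

        // Field count mismatch: RisingWave has two fields, iceberg only one.
        let arrow_schema =
            ArrowSchema::new(vec![ArrowField::new("a", ArrowDataType::Int32, false)]);
        assert!(try_matches_arrow_schema(&risingwave_schema, &arrow_schema).is_err());

        // Type mismatch: `b` is Varchar on the RisingWave side but Int64 in iceberg.
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int64, false),
        ]);
        assert!(try_matches_arrow_schema(&risingwave_schema, &arrow_schema).is_err());
    }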

    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
                region: Some("us-east-1".to_owned()),
                endpoint: Some("http://127.0.0.1:9301".to_owned()),
                access_key: Some("hummockadmin".to_owned()),
                secret_key: Some("hummockadmin".to_owned()),
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                catalog_name: Some("demo".to_owned()),
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
                path_style_access: Some(true),
                credential: None,
                oauth2_server_uri: None,
                scope: None,
                token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
            create_table_if_not_exists: false,
            is_exactly_once: None,
            commit_retry_num: 8,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        assert_eq!(
            &iceberg_config.common.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }
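
    // A minimal round-trip sketch for the JSON-based metadata helpers defined above
    // (`serialize_metadata` / `deserialize_metadata`); the sample payload is arbitrary.
    #[test]
    fn test_metadata_serde_roundtrip() {
        use super::{deserialize_metadata, serialize_metadata};

        let metadata: Vec<Vec<u8>> = vec![vec![1, 2, 3], vec![], vec![255]];
        let bytes = serialize_metadata(metadata.clone());
        assert_eq!(deserialize_metadata(bytes), metadata);
    }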

    async fn test_create_catalog(configs: BTreeMap<String, String>) {
        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();

        let table = iceberg_config.load_table().await.unwrap();

        println!("{:?}", table.identifier());
    }

    #[tokio::test]
    #[ignore]
    async fn test_storage_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_rest_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "rest"),
            ("catalog.uri", "http://192.168.167.4:8181"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_jdbc_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "jdbc"),
            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_hive_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "hive"),
            ("catalog.uri", "thrift://localhost:9083"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }
}